
Saturday, July 9, 2016

Explaining Hadoop by processing large files using Java

Why do we need Hadoop?

How to process large files in Java

This post explains the concept of Hadoop by processing large files in plain Java.

All the Java classes and source code :

Sample Data is from :

What does it do?

The CSV file contains records of baseball players, with the runs each player scored in each year.

Each Java class processes the CSV file and finds out who scored the maximum runs in each year.

Each Java class produces output in this form:

<year>  <player>  <MaxRunsScored>

The CsvCreator class is used to generate random data.
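CsvCreator itself is not shown in this post. As a rough idea of what such a generator could look like, here is a minimal sketch. The class name CsvCreatorSketch, the filler columns and the value ranges are all assumptions; only the column positions (player in column 0, year in column 1, runs in column 8) are taken from the mapper code further down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for CsvCreator (the real class is not shown in the post).
class CsvCreatorSketch {

    // Generates `rows` random CSV lines: player, year, six filler columns, runs.
    static List<String> generate(int rows, long seed) {
        Random random = new Random(seed);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < rows; i++) {
            String player = "player" + random.nextInt(1000); // assumed naming
            int year = 1950 + random.nextInt(60);            // assumed year range
            int runs = random.nextInt(200);                  // assumed run range
            // Filler columns must be non-empty: the mapper splits on ",+",
            // which would collapse empty columns and shift the runs column.
            lines.add(player + "," + year + ",f2,f3,f4,f5,f6,f7," + runs);
        }
        return lines;
    }

    public static void main(String[] args) {
        for (String line : generate(5, 42L)) {
            System.out.println(line);
        }
    }
}
```

To produce a large test file, the same lines could be written out in a loop with a BufferedWriter instead of being collected in a list.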

Java Code flow

1. Mapper

The map step is key-value based.

The output uses the year as the key and player + runs as the value.

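while (dis.available() != 0) {
    try {
        String[] tokens = dis.readLine().split("\\,+");

        String year = tokens[1];
        String player = tokens[0];
        String runs = tokens[8];
        String value = player + "," + runs;

        if (mapper.containsKey(year)) {
            // Year already seen: append this player to the existing value.
            String newvalue = mapper.get(year);
            newvalue = newvalue + "~" + value;
            mapper.put(year, newvalue);
        } else {
            // First record for this year.
            mapper.put(year, value);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}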



2. Reducer

The reducer finds the maximum runs scored for every key (year).

for (String key : mapper.keySet()) {
    String token1[] = mapper.get(key).split("\\~+");

    int maxRun = Integer.MIN_VALUE;
    String toPrint = "";

    for (String we : token1) {
        String token2[] = we.split("\\,+");
        int run = Integer.parseInt(token2[1]);

        // Track the running maximum; on a tie the later player wins.
        maxRun = Math.max(maxRun, run);

        if (maxRun == run) {
            toPrint = " " + key + " " + token2[0] + " " + token2[1];
        }
    }
}


The output is written to output.txt.
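Put together, the mapper and reducer steps above can be sketched as one small runnable class. This is only a minimal in-memory sketch: the class name SingleMapperSketch and the sample rows are made up, and it reads from a list instead of the CSV file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class name; a compact in-memory version of the flow above.
class SingleMapperSketch {

    // Map step: year -> "player,runs~player,runs~..."
    static Map<String, String> map(List<String> lines) {
        Map<String, String> mapper = new TreeMap<>();
        for (String line : lines) {
            String[] tokens = line.split(",+");
            String value = tokens[0] + "," + tokens[8];
            mapper.merge(tokens[1], value, (oldValue, newValue) -> oldValue + "~" + newValue);
        }
        return mapper;
    }

    // Reduce step: year -> the "player,runs" pair with the most runs.
    // On a tie this sketch keeps the first player seen.
    static Map<String, String> reduce(Map<String, String> mapper) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : mapper.entrySet()) {
            int maxRun = Integer.MIN_VALUE;
            String best = "";
            for (String pair : entry.getValue().split("~+")) {
                int run = Integer.parseInt(pair.split(",+")[1]);
                if (run > maxRun) {
                    maxRun = run;
                    best = pair;
                }
            }
            result.put(entry.getKey(), best);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample rows: player, year, six filler columns, runs.
        List<String> lines = Arrays.asList(
                "p1,2001,x,x,x,x,x,x,10",
                "p2,2001,x,x,x,x,x,x,30",
                "p3,2002,x,x,x,x,x,x,5");
        reduce(map(lines)).forEach((year, best) ->
                System.out.println(year + " " + best.replace(',', ' ')));
    }
}
```

Note that the map step keeps every player of every year in memory as one long string per year, so memory use grows with the size of the input.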

UseCase 1 :


(It can process files only up to 500 MB; beyond that it fails with an out-of-memory error.)

(All graphs: X axis = size of the file in MB, Y axis = time to process.)

UseCase 2 : List of Mappers (MultiMapper)

A new mapper is created for each chunk of lines; once it has done its mapping, it is added to a list of mappers. A combiner is then needed to combine all the mappers, and the reducer processes the list of mappers.

Here the mapping time is reduced, but the reducer takes longer.

Change in Mapper

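if (linecount % 10000 == 0) {
    // System.out.println("Splitting");
    // Every 10000 lines, store the current mapper and start a new one.
    mapperList.add(mapper);
    mapper = new TreeMap<String, String>();
}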

Change in Reducer

for (int k = 0; k < mapperList.size(); k++) {
    mapper = mapperList.get(k);

    for (String keyofi : mapper.keySet()) {
        String toPrint = gettheValid(mapper.get(keyofi));

        if (Finalmapper.containsKey(keyofi)) {
            String currenthigh = Finalmapper.get(keyofi);

            String token2[] = currenthigh.split("\\~+");
            int run = Integer.parseInt(token2[1]);

            String token3[] = toPrint.split("\\~+");
            int run2 = Integer.parseInt(token3[1]);

            // Assumed step: keep whichever entry has more runs.
            if (run2 > run) {
                Finalmapper.put(keyofi, toPrint);
            }
        } else {
            // First entry for this year.
            Finalmapper.put(keyofi, toPrint);
        }
    }
}

This code did not fail, and it processed a 3.3 GB file in 200 sec.
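The list-of-mappers idea can be sketched end to end as follows. This is a minimal sketch, not the code used for the measurements: the class name MultiMapperSketch and the helpers runs and bestOf are made up for illustration (bestOf plays the role that gettheValid seems to play above), and the input comes from a list instead of the CSV file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class name; sketches the "list of mappers" approach.
class MultiMapperSketch {

    // Map step: start a new mapper every chunkSize lines (the post uses 10000).
    static List<Map<String, String>> mapInChunks(List<String> lines, int chunkSize) {
        List<Map<String, String>> mapperList = new ArrayList<>();
        Map<String, String> mapper = new TreeMap<>();
        int lineCount = 0;
        for (String line : lines) {
            String[] t = line.split(",+");
            mapper.merge(t[1], t[0] + "," + t[8], (a, b) -> a + "~" + b);
            if (++lineCount % chunkSize == 0) {
                mapperList.add(mapper);
                mapper = new TreeMap<>();
            }
        }
        if (!mapper.isEmpty()) {
            mapperList.add(mapper); // last, partially filled mapper
        }
        return mapperList;
    }

    // Runs part of a "player,runs" pair.
    static int runs(String pair) {
        return Integer.parseInt(pair.split(",")[1]);
    }

    // Best "player,runs" pair inside one mapper value ("pair~pair~...").
    static String bestOf(String joined) {
        String best = null;
        for (String pair : joined.split("~+")) {
            if (best == null || runs(pair) > runs(best)) {
                best = pair;
            }
        }
        return best;
    }

    // Combine + reduce: keep the highest pair per year across all mappers.
    static Map<String, String> reduce(List<Map<String, String>> mapperList) {
        Map<String, String> finalMapper = new TreeMap<>();
        for (Map<String, String> mapper : mapperList) {
            for (Map.Entry<String, String> e : mapper.entrySet()) {
                finalMapper.merge(e.getKey(), bestOf(e.getValue()),
                        (current, candidate) -> runs(candidate) > runs(current) ? candidate : current);
            }
        }
        return finalMapper;
    }

    public static void main(String[] args) {
        // Made-up sample rows: player, year, six filler columns, runs.
        List<String> lines = Arrays.asList(
                "p1,2001,x,x,x,x,x,x,10",
                "p2,2001,x,x,x,x,x,x,30",
                "p3,2002,x,x,x,x,x,x,5",
                "p4,2002,x,x,x,x,x,x,50",
                "p5,2001,x,x,x,x,x,x,40");
        System.out.println(reduce(mapInChunks(lines, 2)));
    }
}
```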

UseCase 3 :

Use multiple threads to read the same CSV file. This is not a good design.
It failed after 300 MB.

UseCase 4 :

A new thread is created to do the mapping for each batch of lines.

List<RunnableDemo> RDL = new ArrayList<RunnableDemo>();

// For each batch of lines, hand the batch and a fresh mapper to a new thread.
{
    mapper = new TreeMap<String, String>();
    RDL.add(new RunnableDemo("Thread" + ThreadCount, lines, mapper));
    lines = new ArrayList<String>();
}

RunnableDemo is the class that starts and runs the threads.

It works fine for files smaller than 1 GB; beyond that, performance decreases.
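RunnableDemo is not shown in this post, so here is a minimal sketch of the same idea: each batch of lines is mapped on its own thread and the partial results are merged at the end. It swaps in a standard ExecutorService thread pool instead of hand-started threads; the class name ThreadedMapperSketch, the helper names and the batch size are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical class; uses a thread pool in place of the post's RunnableDemo.
class ThreadedMapperSketch {

    static int runs(String pair) {
        return Integer.parseInt(pair.split(",")[1]);
    }

    // Returns whichever "player,runs" pair has more runs.
    static String higher(String current, String candidate) {
        return runs(candidate) > runs(current) ? candidate : current;
    }

    // Work done by one thread: best pair per year within one batch.
    static Map<String, String> maxPerYear(List<String> batch) {
        Map<String, String> best = new TreeMap<>();
        for (String line : batch) {
            String[] t = line.split(",+");
            best.merge(t[1], t[0] + "," + t[8], ThreadedMapperSketch::higher);
        }
        return best;
    }

    static Map<String, String> process(List<String> lines, int batchSize, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Map<String, String>>> futures = new ArrayList<>();
            for (int i = 0; i < lines.size(); i += batchSize) {
                List<String> batch = lines.subList(i, Math.min(i + batchSize, lines.size()));
                Callable<Map<String, String>> task = () -> maxPerYear(batch);
                futures.add(pool.submit(task));
            }
            // Merge the partial results on the calling thread.
            Map<String, String> result = new TreeMap<>();
            for (Future<Map<String, String>> future : futures) {
                for (Map.Entry<String, String> e : future.get().entrySet()) {
                    result.merge(e.getKey(), e.getValue(), ThreadedMapperSketch::higher);
                }
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Made-up sample rows: player, year, six filler columns, runs.
        List<String> lines = Arrays.asList(
                "p1,2001,x,x,x,x,x,x,10",
                "p2,2001,x,x,x,x,x,x,30",
                "p3,2002,x,x,x,x,x,x,5",
                "p4,2002,x,x,x,x,x,x,50");
        System.out.println(process(lines, 2, 2));
    }
}
```

A fixed-size pool keeps the number of threads bounded no matter how many batches the file produces.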

java -Xms2048m -Xmx4048m NewThread

The Java -Xms and -Xmx options can increase the processing capacity, but the job will still fail at a point that depends on the size of the RAM.
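To check what heap the JVM actually got from these options, the standard Runtime class can be queried. A minimal sketch (the class name HeapInfoSketch is made up); maxMemory() reports roughly the -Xmx limit:

```java
// Hypothetical helper class that prints the heap limits of the running JVM.
class HeapInfoSketch {

    static long toMb(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        System.out.println("max heap (about -Xmx): " + toMb(rt.maxMemory()) + " MB");
        System.out.println("current heap size:     " + toMb(rt.totalMemory()) + " MB");
        System.out.println("free in current heap:  " + toMb(rt.freeMemory()) + " MB");
    }
}
```

Running it as java -Xms2048m -Xmx4048m HeapInfoSketch shows the limits the mapper has to fit into.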

UseCase 5 : Split the file and use RMI to access it

Split the file into two parts. Create an RMI object and run it as a server. Modify the code to process one part through RMI and the other part as in UseCase 2.

MapperRemoteInterface stub=(MapperRemoteInterface)Naming.lookup("rmi://localhost:5000/sonoo");

It works better for sizes around 1 GB; beyond that, performance is the same as in UseCase 2.
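The RMI side is not shown in this post, so the following is only a sketch of how such a remote mapper could be exported and called with the standard java.rmi API. The interface MaxRunsRemote, the class names and the binding name "mapper" are all hypothetical (the post's MapperRemoteInterface may look different). To keep it runnable in a single JVM, server and client live in the same process and the stub is looked up through the local registry handle; a client in another JVM would use Naming.lookup with an rmi:// URL as in the line above.

```java
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical remote interface: maps a batch of lines to the best pair per year.
interface MaxRunsRemote extends Remote {
    Map<String, String> maxPerYear(List<String> lines) throws RemoteException;
}

// Server-side implementation of the remote interface.
class MaxRunsServer implements MaxRunsRemote {
    @Override
    public Map<String, String> maxPerYear(List<String> lines) {
        Map<String, String> best = new TreeMap<>();
        for (String line : lines) {
            String[] t = line.split(",+");
            best.merge(t[1], t[0] + "," + t[8],
                    (current, candidate) -> runs(candidate) > runs(current) ? candidate : current);
        }
        return best;
    }

    private static int runs(String pair) {
        return Integer.parseInt(pair.split(",")[1]);
    }
}

class RmiSketch {

    // Exports the server, binds it in a registry, calls it through the stub,
    // then unexports everything so the JVM can exit.
    static Map<String, String> callOverRmi(int port, List<String> lines) throws Exception {
        System.setProperty("java.rmi.server.hostname", "127.0.0.1");
        MaxRunsServer server = new MaxRunsServer();
        Registry registry = null;
        try {
            MaxRunsRemote stub = (MaxRunsRemote) UnicastRemoteObject.exportObject(server, 0);
            registry = LocateRegistry.createRegistry(port);
            registry.rebind("mapper", stub);

            // The call below goes through the stub, i.e. over a local socket.
            MaxRunsRemote remote = (MaxRunsRemote) registry.lookup("mapper");
            return remote.maxPerYear(new ArrayList<>(lines));
        } finally {
            if (registry != null) {
                UnicastRemoteObject.unexportObject(registry, true);
            }
            UnicastRemoteObject.unexportObject(server, true);
        }
    }

    public static void main(String[] args) throws Exception {
        // Port 5000 as in the post; the sample rows are made up.
        System.out.println(callOverRmi(5000, Arrays.asList(
                "p1,2001,x,x,x,x,x,x,10",
                "p2,2001,x,x,x,x,x,x,30")));
    }
}
```

Arguments and return values travel by Java serialization, which is why the sketch only passes standard collections of strings.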

UseCase 5.b : 2 RMI objects (one local & one remote)

The file is split into two parts. One part is stored on the local server and the other on a remote server. An RMI server runs on both machines. When the program starts, one thread accesses the RMI object on the remote machine and the other accesses the one on the local machine.

Performance doubles compared with the previous use case.

UseCase 6 : 4 RMI objects in AWS

RDL.add(new RunnableDemo("Remote1",filename+".1","rmi://"));


RDL.add(new RunnableDemo("Remote2",filename+".2","rmi://"));


RDL.add(new RunnableDemo("Remote3",filename+".3","rmi://"));


RDL.add(new RunnableDemo("Remote4",filename+".4","rmi://"));


In AWS, due to memory limitations, this works fine up to an 800 MB file (200 MB on each node).
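The splitting step that produces the filename.1 to filename.4 parts is not shown. One way to do it is sketched below, under the assumption that line order does not matter (it does not for a per-year maximum), so lines can be dealt out round-robin in a single pass. The class name FileSplitterSketch is made up.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits source into source.1 ... source.N by whole lines.
class FileSplitterSketch {

    static List<Path> split(Path source, int parts) throws IOException {
        List<Path> outputs = new ArrayList<>();
        List<BufferedWriter> writers = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(source, StandardCharsets.UTF_8)) {
            for (int i = 1; i <= parts; i++) {
                Path part = Paths.get(source.toString() + "." + i);
                outputs.add(part);
                writers.add(Files.newBufferedWriter(part, StandardCharsets.UTF_8));
            }
            String line;
            long lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                // Deal lines out round-robin so the parts end up about equal in size.
                BufferedWriter writer = writers.get((int) (lineNumber++ % parts));
                writer.write(line);
                writer.newLine();
            }
        } finally {
            for (BufferedWriter writer : writers) {
                writer.close();
            }
        }
        return outputs;
    }

    public static void main(String[] args) throws IOException {
        // Made-up sample file in the temp directory.
        Path source = Files.createTempFile("players", ".csv");
        Files.write(source, Arrays.asList("l1", "l2", "l3", "l4", "l5", "l6"), StandardCharsets.UTF_8);
        for (Path part : split(source, 4)) {
            System.out.println(part + " : " + Files.readAllLines(part, StandardCharsets.UTF_8).size() + " lines");
        }
    }
}
```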

Summary : All Tests in

For 800 MB file :

This shows that when the number of RMI objects increases, both the speed and the processing capacity increase.
This is the basic concept of Hadoop.

HDFS splits a file and stores it as blocks on different nodes (datanodes). The metadata, meaning how the file was split and where the blocks are located, is stored on the master server (namenode).

MapReduce is a method in which the mapper maps every piece of data to a (key, value) pair and the reducer processes those (key, value) pairs.

Hadoop works in such a way that mapping and reducing can be done on different datanodes at the same time.

The storage space of HDFS can be increased by adding datanodes.
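The (key, value) flow described above can be imitated in plain Java to make the phases visible. This is not the Hadoop API, only a single-machine sketch with made-up names: mapRecord emits one (year, "player,runs") pair per record, shuffle groups the pairs by key, and reduce picks the maximum for one key. In Hadoop, the map and reduce calls would run on different datanodes.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical class; imitates the map, shuffle and reduce phases in one JVM.
class MapReducePhasesSketch {

    // Map phase: one CSV record -> one (year, "player,runs") pair.
    static Map.Entry<String, String> mapRecord(String line) {
        String[] t = line.split(",+");
        return new AbstractMap.SimpleEntry<>(t[1], t[0] + "," + t[8]);
    }

    // Shuffle phase: group all values that share the same key.
    static Map<String, List<String>> shuffle(List<Map.Entry<String, String>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                TreeMap::new,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    // Reduce phase: for one key, pick the value with the most runs.
    static String reduce(List<String> values) {
        return values.stream()
                .max(Comparator.comparingInt(v -> Integer.parseInt(v.split(",")[1])))
                .orElse("");
    }

    static Map<String, String> run(List<String> lines) {
        List<Map.Entry<String, String>> pairs = lines.stream()
                .map(MapReducePhasesSketch::mapRecord)
                .collect(Collectors.toList());
        Map<String, String> result = new TreeMap<>();
        shuffle(pairs).forEach((year, values) -> result.put(year, reduce(values)));
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample rows: player, year, six filler columns, runs.
        System.out.println(run(Arrays.asList(
                "p1,2001,x,x,x,x,x,x,10",
                "p2,2001,x,x,x,x,x,x,30",
                "p3,2002,x,x,x,x,x,x,5")));
    }
}
```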

What about the same query on an Oracle 11g database?

(4 GB RAM, VM)

Query : 

SELECT a.year,, a.run4 from table20M a  
Join (Select Year, Max(Run4) Run From table20M Group By Year ) B  
ON (a.year = b.year AND a.run4 = b.run);

1. Loading the 800 MB CSV file (8.5 M rows) into the database takes more than 1 hr.

2. The query itself runs in 13 sec in the DB, and the entire job (connection + output IO using Java) finishes in 15 sec.

The best use case we got on AWS (4 RMI objects) took 18 sec for the 800 MB file.

An RDBMS is still the best at processing linear queries, but it has limitations:

* It needs data in a structured format; it won't take semi-structured formats like XML or JSON.

* Uploading (ingesting) data takes more time.

Hadoop has solutions to these problems:

* Data can be uploaded to HDFS in any format, and tools like Hive or Pig can read semi-structured formats like XML or JSON.

* Tools like Flume can upload streaming data without any issue and without taking much time.

Hadoop is not a replacement for an RDBMS.

As of now, Hadoop can't do transaction-level operations such as bank account operations or ticket booking.

For example, Hadoop can be used to show the available tickets with their prices, but an RDBMS is used for the booking operation itself.

Hadoop is used as a distributed storage system and for parallel processing.

