Saturday, July 9, 2016

Explaining Hadoop by processing large files using Java

Why do we need Hadoop?

How do we process large files in Java?





In this blog, the concept of Hadoop is explained by processing large files in plain Java.


All the Java classes and source code:


Sample data is from:



http://hortonworks.com/hadoop-tutorial/how-to-process-data-with-apache-pig/




What does it do?



The CSV file contains records of baseball players and the runs they scored in each year.

Each Java class processes the CSV file and finds out, for each year, who scored the maximum runs.


The Java class produces output in the form:

<year>  <player>  <MaxRunsScored>



The CsvCreator class is used to generate random test data; a sketch of it follows.
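
A minimal sketch of what such a CsvCreator could look like, assuming the column layout used by the mapper below (player name in column 0, year in column 1, runs in column 8); everything else, including the file name, is an assumption:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.Random;

public class CsvCreator {
    public static void main(String[] args) throws Exception {
        int rows = 1000000;                           // number of rows to generate
        Random rnd = new Random();
        BufferedWriter bw = new BufferedWriter(new FileWriter("players.csv"));
        for (int i = 0; i < rows; i++) {
            String player = "player" + rnd.nextInt(10000);
            int year = 1970 + rnd.nextInt(47);
            int runs = rnd.nextInt(200);
            // Columns 2-7 are filler so that the runs value lands in column 8
            bw.write(player + "," + year + ",x,x,x,x,x,x," + runs + "\n");
        }
        bw.close();
    }
}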


Java Code flow


1. Mapper

The map step is key-value based.

The output will have the year as the key and "player,runs" as the value.



while (dis.available() != 0) {
    try {
        linecount++;
        String[] tokens = dis.readLine().split("\\,+");

        String year = tokens[1];
        String player = tokens[0];
        String runs = tokens[8];
        String value = player + "," + runs;

        // Append to this year's existing entry, or create a new one
        if (mapper.containsKey(year)) {
            String newvalue = mapper.get(year) + "~" + value;
            mapper.put(year, newvalue);
        } else {
            mapper.put(year, value);
        }
    } catch (Exception e) {
        // Skip malformed lines
    }
}



2. Reducer

It finds the maximum runs scored for each key (year).


for (String key : mapper.keySet()) {

    String token1[] = mapper.get(key).split("\\~+");

    int maxRun = Integer.MIN_VALUE;
    String toPrint = "";

    // Scan every "player,runs" entry for this year and keep the maximum
    for (String we : token1) {
        String token2[] = we.split("\\,+");
        int run = Integer.parseInt(token2[1]);

        maxRun = Math.max(maxRun, run);

        if (maxRun == run)
            toPrint = " " + key + " " + token2[0] + " " + token2[1];
    }

    bw.write(toPrint + "\n");
}



The output is written to output.txt.
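
For completeness, a minimal sketch of the driver around the two snippets above; the variable names dis, mapper, bw and linecount match the snippets, while the class name and file names are assumptions:

import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.util.TreeMap;

public class MaxRunsPerYear {
    public static void main(String[] args) throws Exception {
        TreeMap<String, String> mapper = new TreeMap<String, String>();
        int linecount = 0;

        DataInputStream dis = new DataInputStream(new FileInputStream("players.csv"));
        BufferedWriter bw = new BufferedWriter(new FileWriter("output.txt"));

        // ... mapper while-loop from above fills 'mapper' ...

        // ... reducer for-loop from above writes the maximum per year to 'bw' ...

        dis.close();
        bw.close();
    }
}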



UseCase 1 : Single Mapper and Reducer


Performance

It can process only up to 500 MB; after that it fails with an OutOfMemoryError.

(In all graphs:
X : size of the file in MB
Y : time to process)


UseCase 2 : List of Mappers (MultiMapper)


A new mapper is created for each chunk of lines; once it finishes mapping, it is added to a list of mappers. A combiner is needed to combine all the mappers, and the reducer then processes the list of mappers.

Here the mapping time is reduced, but the reducer takes more time.



Change in Mapper

// Every 10,000 lines, push the current map into the list and start a fresh one
if (linecount % 10000 == 0) {
    mapperList.add(mapper);
    mapper = new TreeMap<String, String>();
}
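
One detail worth noting: if the total line count is not an exact multiple of 10,000, the last partially filled mapper is never added to the list. A small hedged addition after the read loop (the original code may well handle this in a part not shown here):

// After the read loop: add the last, partially filled mapper, if any
if (!mapper.isEmpty()) {
    mapperList.add(mapper);
}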

Change in Reducer

for (int k = 0; k < mapperList.size(); k++) {
    mapper = mapperList.get(k);

    for (String keyofi : mapper.keySet()) {
        // Best "player~runs" entry for this year within this mapper
        String toPrint = gettheValid(mapper.get(keyofi));

        if (Finalmapper.containsKey(keyofi)) {
            String currenthigh = Finalmapper.get(keyofi);

            String token2[] = currenthigh.split("\\~+");
            int run = Integer.parseInt(token2[1]);

            String token3[] = toPrint.split("\\~+");
            int run2 = Integer.parseInt(token3[1]);

            // Keep whichever candidate has the higher run count
            if (run2 < run)
                Finalmapper.put(keyofi, currenthigh);
            else
                Finalmapper.put(keyofi, toPrint);
        } else {
            Finalmapper.put(keyofi, toPrint);
        }
    }
}
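
The gettheValid helper used above is not shown in the original snippet. A minimal sketch, assuming it scans one year's "player,runs~player,runs~..." string and returns the entry with the maximum runs as "player~runs" (which matches how its result is parsed above):

// Assumed behaviour: pick the entry with the maximum runs for one year
static String gettheValid(String yearValues) {
    int maxRun = Integer.MIN_VALUE;
    String best = "";
    for (String entry : yearValues.split("\\~+")) {
        String[] parts = entry.split("\\,+");
        int run = Integer.parseInt(parts[1]);
        if (run > maxRun) {
            maxRun = run;
            best = parts[0] + "~" + parts[1];   // player~runs
        }
    }
    return best;
}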


There is no failure in this code; it processed 3.3 GB in 200 seconds.





UseCase 3 : Multiple threads reading the same file

Use multiple threads to read the same CSV file. This is not a good design (a sketch of the idea follows). It failed after 300 MB.
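
For illustration only, a minimal sketch of the shared-reader idea (all names here are assumptions): every thread has to take the same lock to read a line, so the threads mostly wait on each other and the approach does not scale.

import java.io.BufferedReader;
import java.io.FileReader;

public class SharedReaderDemo {
    public static void main(String[] args) throws Exception {
        final BufferedReader reader = new BufferedReader(new FileReader("players.csv"));

        Runnable worker = () -> {
            try {
                String line;
                while (true) {
                    synchronized (reader) {          // all threads block on this one lock
                        line = reader.readLine();
                    }
                    if (line == null) break;
                    // ... same per-line mapping as in the single-threaded version ...
                }
            } catch (Exception e) {
                // ignore in this sketch
            }
        };

        for (int i = 0; i < 4; i++) {
            new Thread(worker, "reader-" + i).start();
        }
    }
}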





UseCase 4 : A new thread per mapping chunk

A new thread is created to do the mapping for each chunk of lines.


List<RunnableDemo> RDL = new ArrayList<RunnableDemo>();
..............
............
// Every 50,000 lines, hand the collected lines to a new mapper thread
if (apxL % 50000 == 0) {
    mapper = new TreeMap<String, String>();
    RDL.add(new RunnableDemo("Thread" + ThreadCount, lines, mapper));
    RDL.get(ThreadCount).start();
    lines = new ArrayList<String>();
    ThreadCount++;
}




RunnableDemo is the class used to start the threads and process each chunk of lines; a sketch is shown below.
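
A minimal sketch of RunnableDemo for this use case, assuming it maps its own list of lines into the TreeMap it was given; only the constructor signature and start() appear in the code above, everything else is an assumption:

import java.util.List;
import java.util.TreeMap;

class RunnableDemo implements Runnable {
    private final Thread thread;
    private final List<String> lines;
    private final TreeMap<String, String> mapper;

    RunnableDemo(String name, List<String> lines, TreeMap<String, String> mapper) {
        this.lines = lines;
        this.mapper = mapper;
        this.thread = new Thread(this, name);
    }

    public void start() { thread.start(); }

    public void join() throws InterruptedException { thread.join(); }

    @Override
    public void run() {
        // Same per-line mapping logic as the single-threaded mapper
        for (String line : lines) {
            String[] tokens = line.split("\\,+");
            String year = tokens[1];
            String value = tokens[0] + "," + tokens[8];
            String old = mapper.get(year);
            mapper.put(year, old == null ? value : old + "~" + value);
        }
    }
}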





It works fine for files smaller than 1 GB; after that, performance decreases.

java -Xms2048m -Xmx4048m NewThread

The Java -Xms and -Xmx options can increase the processing capacity, but the program will still fail at some point depending on the size of the RAM.






UseCase 5 : Split the file and use RMI to access it .



Split the file into two parts. Create an RMI object and run it as a server. Modify the code to access one part using RMI and the other part locally, as in UseCase 2.

MapperRemoteInterface stub = (MapperRemoteInterface) Naming.lookup("rmi://localhost:5000/sonoo");
mapperList = stub.printOne(FileName);
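
On the server side, a minimal sketch of what the remote object could look like; only MapperRemoteInterface, printOne and the rmi://...:5000/sonoo name appear in the original, so the return type, implementation class and registry setup are assumptions:

import java.rmi.Naming;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

interface MapperRemoteInterface extends Remote {
    // Maps one split of the CSV file and returns that split's list of maps
    List<TreeMap<String, String>> printOne(String fileName) throws RemoteException;
}

class MapperRemoteImpl extends UnicastRemoteObject implements MapperRemoteInterface {
    protected MapperRemoteImpl() throws RemoteException {
        super();
    }

    public List<TreeMap<String, String>> printOne(String fileName) throws RemoteException {
        List<TreeMap<String, String>> mapperList = new ArrayList<TreeMap<String, String>>();
        // ... run the UseCase 2 style mapper over fileName and fill mapperList ...
        return mapperList;
    }
}

public class MapperServer {
    public static void main(String[] args) throws Exception {
        LocateRegistry.createRegistry(5000);                       // registry on port 5000
        Naming.rebind("rmi://localhost:5000/sonoo", new MapperRemoteImpl());
        System.out.println("RMI mapper server ready");
    }
}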



It works better for sizes around 1 GB; beyond that, performance is the same as UseCase 2.

UseCase 5.b: 2 RMI objects (one local & one remote )

The file is split into two parts. One part is stored on the local server and the other on a remote server, with an RMI server running on both. When the program starts, one thread accesses the RMI object on the remote machine and the other accesses the local machine.



Performance is roughly doubled compared with the previous use case.



UseCase 6 : 4 RMI objects in AWS



RDL.add(new RunnableDemo("Remote1",filename+".1","rmi://172.31.27.147:5000/sonoo"));

RDL.get(0).start();

RDL.add(new RunnableDemo("Remote2",filename+".2","rmi://172.31.27.146:5000/sonoo"));

RDL.get(1).start();


RDL.add(new RunnableDemo("Remote3",filename+".3","rmi://172.31.27.144:5000/sonoo"));

RDL.get(2).start();

RDL.add(new RunnableDemo("Remote4",filename+".4","rmi://172.31.27.145:5000/sonoo"));

RDL.get(3).start();
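
After starting the four threads, the results still have to be collected and reduced. A minimal sketch of that step, where join() and getMapperList() on RunnableDemo are assumptions and the original combine code is not shown:

// Wait for the four remote mappers and merge their results before the final reduce
List<TreeMap<String, String>> allMappers = new ArrayList<TreeMap<String, String>>();
for (RunnableDemo rd : RDL) {
    rd.join();                               // wait for the remote call to finish
    allMappers.addAll(rd.getMapperList());   // collect this node's list of maps
}
// allMappers is then reduced exactly as in UseCase 2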



On AWS, due to memory limitations, this works fine up to an 800 MB file (200 MB on each node).


Summary : All tests


For 800 MB file :


This shows that as the number of RMI objects increases, the speed and processing capacity increase.

This is the basic concept of Hadoop.

HDFS splits the file and stores it as blocks on different nodes (datanodes). The splitting information and file locations (metadata) are stored on the master server (namenode).


MapReduce is the method that maps every record as a (key, value) pair; the reducer then processes those (key, value) pairs.

Hadoop works in such a way that mapping and reducing can be done on different datanodes at the same time.

The storage space of HDFS can be increased by adding datanodes.
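
To connect this back to Hadoop itself, here is a minimal sketch of the same max-runs-per-year job written against the Hadoop MapReduce API; the class names and job wiring are assumptions and are not from the original code, while the column layout matches the CSV above:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxRunsJob {

    public static class RunsMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\\,+");
            // Year as key, "player,runs" as value, same as the plain-Java mapper
            context.write(new Text(tokens[1]), new Text(tokens[0] + "," + tokens[8]));
        }
    }

    public static class MaxRunsReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text year, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int maxRun = Integer.MIN_VALUE;
            String best = "";
            for (Text v : values) {
                String[] parts = v.toString().split("\\,+");
                int run = Integer.parseInt(parts[1]);
                if (run > maxRun) {
                    maxRun = run;
                    best = parts[0] + " " + parts[1];
                }
            }
            context.write(year, new Text(best));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "max runs per year");
        job.setJarByClass(MaxRunsJob.class);
        job.setMapperClass(RunsMapper.class);
        job.setReducerClass(MaxRunsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}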


The same query on an Oracle 11g database?




(4 GB RAM, VM )


Query : 


SELECT a.year, a.name, a.run4 FROM table20M a
JOIN (SELECT year, MAX(run4) run FROM table20M GROUP BY year) b
ON (a.year = b.year AND a.run4 = b.run);





1. To push the 800 MB CSV file (8.5 M rows) into the database, it took more than 1 hour.

2. The same query runs in 13 s in the DB, and 15 s to finish the entire job (connection + output I/O using Java).


The best use case we got (4 RMI objects on AWS) was 18 s for the 800 MB file.



An RDBMS is still the best at processing linear queries. However:


* It needs data in a structured format; it won't take semi-structured formats like XML or JSON.

* Uploading (ingesting) data takes more time.


Hadoop has solutions to these problems:

* It can upload data to HDFS in any format, and tools like Hive or Pig can read semi-structured formats like XML or JSON.

* Tools like Flume can ingest streaming data without any issue and without much time.





Hadoop is not a replacement for an RDBMS.

As of now, Hadoop can't do transactional operations like bank account operations or ticket booking. For example, Hadoop can be used for showing the available tickets with prices, but the booking operation itself is done with an RDBMS.

Hadoop is used as a distributed storage system and for parallel processing.
