Dissecting MapReduce Program (Part 2)
In the last post we went over the driver program of a MapReduce job in detail, along with InputFormat, OutputFormat and Writables. In this post we will look at the Mapper and the Reducer, and we will finish by executing the MapReduce job on our cluster. Let's start with the Mapper.
Before you go on reading this post, please note that this post is from our free course named Hadoop Starter Kit, a 100% free introductory course on Hadoop. Click here to enroll in Hadoop Starter Kit. You will also get free access to our 3 node Hadoop cluster hosted on Amazon Web Services (AWS) – also free!
Mapper Program
Can you remember when a Mapper gets called?
The map method is called once for every record in your dataset, so if your dataset has 100 records, the map method will be called 100 times. How many Mappers do you think will be created? The number of Mappers depends on the number of Input Splits. To learn about the difference between an Input Split and a Block, look at the Dissecting MapReduce Components post. In our dataset each line is a record, so the map method is called once for each line. What is going to be the output of our Mapper?
We are trying to find the maximum closing price for each stock symbol. This means we have to group the records by symbol so that we can calculate the maximum closing price per symbol. So we will output the stock symbol as the key and the close price as the value for each record. We now know what the Mapper's input and output are going to be. Here is the Mapper program.
package com.hirw.maxcloseprice;

/**
 * MaxClosePriceMapper.java
 * www.hadoopinrealworld.com
 * This is a Mapper program to calculate Max Close Price from stock dataset using MapReduce
 */

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxClosePriceMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();
        String[] items = line.split(",");

        String stock = items[1];
        Float closePrice = Float.parseFloat(items[6]);

        context.write(new Text(stock), new FloatWritable(closePrice));
    }
}
We extend the Mapper class to make our class a Mapper. Make sure you have the right import statements: older versions of the Hadoop library use mapred in the package name, whereas the newer API uses mapreduce. Make sure you import from the mapreduce packages to avoid any class conflicts later.
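As a quick reference, these are the two package names in play; the sketch below simply shows the two imports side by side.

// Old MapReduce v1 API - do not mix this with the new API
// import org.apache.hadoop.mapred.Mapper;

// New API - this is the one used in this post
import org.apache.hadoop.mapreduce.Mapper;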
Look at the signature of the Mapper class; there are 4 type parameters listed. The first 2 parameters dictate the input to the Mapper as key and value. The third and the fourth parameters tell us the output key and value from the Mapper. So the input key to the Mapper is of type LongWritable and the input value is of type Text. The output key from the Mapper is of type Text and the output value from the Mapper is of type FloatWritable.
In the program, we override the map method. This method will be called for every record that is passed to the Mapper. The types of the first two arguments to the map method vary based on the InputFormat that defines your dataset. The second argument is the Text, which is the actual line from the file. The first argument is a LongWritable, but what does it represent?
ABCSE,B7J,2010-01-25,8.98,9.00,8.73,8.83,131500,8.83
ABCSE,B7J,2010-01-22,9.03,9.04,8.94,8.95,77800,8.95
ABCSE,B7J,2010-01-21,9.06,9.13,8.98,9.04,102200,9.04
ABCSE,B7J,2010-01-20,8.93,9.09,8.93,9.09,119300,9.09
ABCSE,B7J,2010-01-19,8.99,9.04,8.91,9.02,132300,9.02
ABCSE,B7J,2010-01-15,8.98,9.00,8.90,8.99,78000,8.99
The first argument is the byte offset within the file of the beginning of each line, so in this case it is not of much importance to us. We will just ignore it, because we are more interested in the second argument, which is the actual record from the file. Since the record is delimited by commas, we use the String split method to get all the columns into an array. We know our dataset very well, so we know that the 2nd column is the symbol and the 7th column is the closing price.
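To make the indexing concrete, here is a tiny standalone Java sketch (not part of the MapReduce job) that splits one of the sample records shown above and prints which array index holds which column:

public class SplitExample {

    public static void main(String[] args) {
        // One record taken from the sample data above
        String line = "ABCSE,B7J,2010-01-25,8.98,9.00,8.73,8.83,131500,8.83";
        String[] items = line.split(",");

        System.out.println(items[1]); // B7J  -> stock symbol (2nd column)
        System.out.println(items[6]); // 8.83 -> closing price (7th column)
    }
}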
Now we have the two values from the record that we are most interested in. We know that the output of the Mapper should be a key value pair; in our case the symbol will be the key and the closing price will be the value. But how do we submit the output from the Mapper? We use the context object to write the key value pair. That is it. From here on, Hadoop will take care of your output. The output, along with the key value pairs from the other Mappers, will be sorted and partitioned by key and then copied to the appropriate Reducer. At each Reducer the output from several Mappers will be merged, the values for a key will be grouped, and the input to the Reducer will be a key and a list of values for that key. The reduce method will be called once per key.
So in our problem, let's say you have 100 stock symbols. All the closing prices for each stock symbol will be grouped together, and the reduce method will be called 100 times, once for each stock symbol as the key with the list of corresponding close prices for that symbol as the values. All you have to do is write the output to the context object in the Mapper and the MapReduce framework takes care of the rest; that is the simplicity and beauty of this amazing framework.
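Purely as an illustration of that grouping, here is a small standalone Java sketch (the symbols and prices are made up, and this is not how Hadoop is implemented internally) that mimics what happens between the Mapper and the Reducer: values are collected per key, and each key is handed its list of values.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {

    public static void main(String[] args) {
        // Hypothetical (symbol, closePrice) pairs as they might come out of the Mappers
        String[][] mapperOutput = {
            {"B7J", "8.83"}, {"XYZ", "12.10"}, {"B7J", "8.95"},
            {"XYZ", "11.90"}, {"B7J", "9.04"}
        };

        // Shuffle & sort (simplified): group all the values by key
        Map<String, List<Float>> grouped = new HashMap<String, List<Float>>();
        for (String[] pair : mapperOutput) {
            List<Float> prices = grouped.get(pair[0]);
            if (prices == null) {
                prices = new ArrayList<Float>();
                grouped.put(pair[0], prices);
            }
            prices.add(Float.parseFloat(pair[1]));
        }

        // The reduce method is then called once per key with that key's list of values
        for (Map.Entry<String, List<Float>> entry : grouped.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        // Prints something like: B7J -> [8.83, 8.95, 9.04] and XYZ -> [12.1, 11.9]
    }
}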
Reducer Program
Now let's look at the Reducer program. Having seen the Mapper already, following the Reducer will be much easier. Start by extending the Reducer class. You have to specify 4 type parameters: the first 2 define the input to the Reducer and the 3rd and 4th define the output from the Reducer. Next, override the reduce method. It takes as arguments a Text key, which is nothing but a stock symbol, and an Iterable of FloatWritables, which is nothing but the list of closing prices for that particular stock symbol. Once you have the list of closing prices, it is easy to calculate the maximum closing price using a for loop.
package com.hirw.maxcloseprice;

/**
 * MaxClosePriceReducer.java
 * www.hadoopinrealworld.com
 * This is a Reducer program to calculate Max Close Price from stock dataset using MapReduce
 */

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxClosePriceReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {

    @Override
    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {

        float maxClosePrice = Float.MIN_VALUE;

        //Iterate all closing prices and calculate maximum
        for (FloatWritable value : values) {
            maxClosePrice = Math.max(maxClosePrice, value.get());
        }

        //Write output
        context.write(key, new FloatWritable(maxClosePrice));
    }
}
Finally, once you have the result, simply submit the output by calling the write method on the context. Make sure the types of the arguments to the reduce method and the types that you write to the context match the type parameters defined on the class. Now we have the program ready; it is time to package this project as a jar and execute the job on our Hadoop cluster.
We have enabled Maven on this project, so when we build it all the dependent jars will be bundled with it. Here is the pom.xml file defining the dependencies for the project.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <repositories>
    <repository>
      <id>cloudera-releases</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <modelVersion>4.0.0</modelVersion>
  <groupId>MaxClosePrice</groupId>
  <artifactId>MaxClosePrice</artifactId>
  <version>1.0</version>

  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>2.3.0-mr1-cdh5.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.3.0-mr1-cdh5.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
    </dependency>
    <!-- uncomment if jdk tools not found -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7.0_05</version>
      <scope>system</scope>
      <systemPath>C:/Program Files/Java/jdk1.8.0_65/lib/tools.jar</systemPath>
    </dependency>
  </dependencies>
</project>
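Since the pom already pulls in MRUnit (and assuming JUnit is also on the test classpath), here is a minimal sketch of how the Mapper and Reducer could be unit tested before running the job on the cluster. The test class name is just an assumption for illustration; adjust it to your project layout.

package com.hirw.maxcloseprice;

import java.util.Arrays;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxClosePriceTest {

    @Test
    public void mapperEmitsSymbolAndClosePrice() throws Exception {
        // Feed one record from the sample data and expect (symbol, closePrice) back
        MapDriver.newMapDriver(new MaxClosePriceMapper())
            .withInput(new LongWritable(0),
                new Text("ABCSE,B7J,2010-01-25,8.98,9.00,8.73,8.83,131500,8.83"))
            .withOutput(new Text("B7J"), new FloatWritable(8.83f))
            .runTest();
    }

    @Test
    public void reducerPicksMaximumClosePrice() throws Exception {
        // Feed a key with several close prices and expect the maximum back
        ReduceDriver.newReduceDriver(new MaxClosePriceReducer())
            .withInput(new Text("B7J"), Arrays.asList(
                new FloatWritable(8.83f),
                new FloatWritable(8.95f),
                new FloatWritable(9.04f)))
            .withOutput(new Text("B7J"), new FloatWritable(9.04f))
            .runTest();
    }
}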
Execution & results
Note the command used to execute the job: we are using hadoop to run the jar instead of java, because the hadoop command conveniently adds all the Hadoop libraries to the classpath. If we decided to use java to execute it, we would have to manually add all the required classes to the classpath.
hadoop jar /hirw-starterkit/mapreduce/stocks/MaxClosePrice-1.0.jar com.hirw.maxcloseprice.MaxClosePrice /user/hirw/input/stocks output/mapreduce/stocks
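For comparison, if you did want to launch the job with plain java, one way to build the classpath by hand is to lean on the hadoop classpath command; treat the line below as a sketch rather than a recommended way to run the job.

java -cp "/hirw-starterkit/mapreduce/stocks/MaxClosePrice-1.0.jar:$(hadoop classpath)" com.hirw.maxcloseprice.MaxClosePrice /user/hirw/input/stocks output/mapreduce/stocks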
Finally, here is the output of the job execution. The actual output of the MapReduce job will be stored under output/mapreduce/stocks.
In the below screenshot you can see that the MapReduce job executed 4 Map tasks and 1 Reduce task. The Map tasks read a total of 7,461,349 records.
Here is the output of the MapReduce job: the maximum closing price, by symbol, for all the symbols in the stocks dataset.
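To view the result from the command line you can list and print the files in the output directory; the part file name below (part-r-00000) is the usual default for a job with a single reducer, so treat it as an assumption.

hadoop fs -ls output/mapreduce/stocks
hadoop fs -cat output/mapreduce/stocks/part-r-00000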