15 December 2014

Creating a Simple Map Reduce Program for Cloudera Hadoop

The Hadoop Definitive Guide has a pretty good tutorial on creating simple map reduce programs.

The first thing to learn is that the typical map reduce program is made up of at least 3 different classes:

  • The Driver class - this is the program entry point, and is used to set up the flow of the job
  • The Mapper class - this implements the map phase of the map reduce task
  • The Reducer class - this implements the reduce phase of the job

Generally, a driver class should implement the Tool interface and extend the Configured class - this seems to be a fairly common pattern to bootstrap a map reduce program.

The Hadoop Definitive Guide provides a very simple example that prints out the configuration of the Hadoop cluster you are running on:

package com.sodonnel.MapReduce;

import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class ConfigurationPrinter extends Configured implements Tool {

    static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        for (Entry<String, String> entry: conf) {
            System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
        System.exit(exitCode);
    }       

}

Notice that the main method does not invoke its own run method directly. Instead it hands the program to ToolRunner, which parses the standard Hadoop command line options into the job configuration before calling run.

If you compile this code into a JAR and then run it against a cluster, it will print a rather long list of configuration variables:

$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.ConfigurationPrinter

mapreduce.shuffle.ssl.enabled=false
mapreduce.tasktracker.report.address=127.0.0.1:0
mapreduce.tasktracker.http.threads=40
dfs.stream-buffer-size=4096
tfile.fs.output.buffer.size=262144
fs.permissions.umask-mode=022
dfs.client.datanode-restart.timeout=30
io.bytes.per.checksum=512
ha.failover-controller.graceful-fence.connection.retries=1
dfs.datanode.drop.cache.behind.writes=false
yarn.app.mapreduce.am.resource.cpu-vcores=1
hadoop.common.configuration.version=0.23.0
mapreduce.job.ubertask.enable=false
dfs.namenode.replication.work.multiplier.per.iteration=2
mapreduce.job.acl-modify-job=
io.seqfile.local.dir=${hadoop.tmp.dir}/io/local
fs.s3.sleepTimeSeconds=10
mapreduce.client.output.filter=FAILED
<snip>
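
As ToolRunner parses the standard Hadoop command line options, you can also inject a property from the command line and watch it appear in the output. For example (color is just a made up property for illustration):

$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.ConfigurationPrinter \
    -D color=yellow | grep color
color=yellow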

A Full Map Reduce Program

Printing the Hadoop configuration might be useful for debugging configuration problems, but it's not super useful beyond that. A full map reduce program is probably more interesting. To show how to create one, I came across a variation of the word count example - read a CSV file where each row is a comma separated list of words, and provide the total count of each word as output.

This program will make use of 3 classes:

  • WordCount - this is the driver program
  • WordCountMapper - This will receive the data a line at a time, split it into words (the key) and counts (the value, always 1) to pass on to the reducer
  • WordCountReducer - This will receive the list of words created by the mapper and sum up the count of each word, before writing it to the output directory - there is a small worked example of this flow just after this list
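
To make the flow concrete, take a single (made up) input line:

apple,banana,apple

The mapper splits this on commas and emits one (word, 1) pair per entry:

(apple, 1)
(banana, 1)
(apple, 1)

The framework then groups the pairs by key, and the reducer sums the counts in each group, producing:

apple   2
banana  1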

WordCount.java:

package com.sodonnel.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Use the configuration ToolRunner has already loaded and
        // populated with any generic command line options
        Configuration conf = getConf();

        // Create job
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(getClass());

        // Setup MapReduce
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }


}

WordCountMapper.java:

package com.sodonnel.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    // Called once per line of input - with TextInputFormat the key is
    // the byte offset of the line in the file, and the value is the
    // line itself
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split the line on commas and emit a (word, 1) pair for each
        // entry, reusing the same Text object rather than allocating a
        // new one per word
        String[] csv = value.toString().split(",");
        for (String str : csv) {
            word.set(str);
            context.write(word, ONE);
        }
    }
}
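
Before running the mapper on a cluster it can be sanity checked in isolation. As a sketch, assuming the Apache MRUnit library is available on the test classpath, a simple JUnit test for it might look like this:

package com.sodonnel.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountMapperTest {

    @Test
    public void mapperEmitsOnePairPerCsvEntry() throws Exception {
        // MapDriver runs the mapper against a single input record and
        // verifies the (key, value) pairs it writes, in order
        MapDriver<LongWritable, Text, Text, IntWritable> driver =
            MapDriver.newMapDriver(new WordCountMapper());

        driver.withInput(new LongWritable(0), new Text("apple,banana,apple"))
              .withOutput(new Text("apple"), new IntWritable(1))
              .withOutput(new Text("banana"), new IntWritable(1))
              .withOutput(new Text("apple"), new IntWritable(1))
              .runTest();
    }
}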

WordCountReducer.java:

package com.sodonnel.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    // Called once per unique word, with an iterable over all of the
    // counts the mappers emitted for that word
    @Override
    public void reduce(Text text, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(text, new IntWritable(sum));
    }
}
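
The reducer can be tested in the same way, again assuming MRUnit is on the test classpath:

package com.sodonnel.MapReduce;

import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class WordCountReducerTest {

    @Test
    public void reducerSumsTheCountsForAWord() throws Exception {
        // ReduceDriver feeds the reducer one key with a list of values
        // and verifies the single summed pair it writes out
        ReduceDriver<Text, IntWritable, Text, IntWritable> driver =
            ReduceDriver.newReduceDriver(new WordCountReducer());

        driver.withInput(new Text("apple"),
                         Arrays.asList(new IntWritable(1), new IntWritable(1)))
              .withOutput(new Text("apple"), new IntWritable(2))
              .runTest();
    }
}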

Compile the JAR

$ mvn clean install -Dmaven.test.skip=true

Add Input Data

On HDFS, create the input directory in your home directory, and put some CSV data into it:

$ hadoop fs -mkdir input
$ hadoop fs -put csvdata.csv input/
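
The contents of the file don't matter much, so long as each line is a comma separated list of words - some made up sample data like this is fine:

$ cat csvdata.csv
apple,banana,apple
pear,apple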

Run the Map Reduce Program

$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.WordCount input output

If all goes well, the output directory will be created, and the resulting word counts will be in a file in that directory.
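
As the job runs with a single reducer, all the counts land in one file, which can be viewed with hadoop fs -cat. With the sample data above, the output would look something like this:

$ hadoop fs -cat output/part-r-00000
apple   3
banana  1
pear    1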
