15 December 2014
The Hadoop Definitive Guide has a pretty good tutorial on creating simple map reduce programs.
The first thing to learn is that the typical map reduce program is made up of at least 3 different classes: a driver, a mapper and a reducer.
Generally, a driver class should implement the Tool interface and extend the Configured class - this seems to be a fairly common pattern to bootstrap a map reduce program.
The Hadoop Definitive Guide provides a very simple example that prints out the configuration of the Hadoop cluster you are running on:
package com.sodonnel.MapReduce;

import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ConfigurationPrinter extends Configured implements Tool {

    static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        for (Entry<String, String> entry : conf) {
            System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ConfigurationPrinter(), args);
        System.exit(exitCode);
    }
}
Notice that the main method does not invoke its own run method directly; instead it hands off to ToolRunner, which parses the standard Hadoop command line options into the configuration and then calls run.
If you compile this code into a JAR and then run it against a cluster, it will print a rather long list of configuration variables:
$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.ConfigurationPrinter
mapreduce.shuffle.ssl.enabled=false
mapreduce.tasktracker.report.address=127.0.0.1:0
mapreduce.tasktracker.http.threads=40
dfs.stream-buffer-size=4096
tfile.fs.output.buffer.size=262144
fs.permissions.umask-mode=022
dfs.client.datanode-restart.timeout=30
io.bytes.per.checksum=512
ha.failover-controller.graceful-fence.connection.retries=1
dfs.datanode.drop.cache.behind.writes=false
yarn.app.mapreduce.am.resource.cpu-vcores=1
hadoop.common.configuration.version=0.23.0
mapreduce.job.ubertask.enable=false
dfs.namenode.replication.work.multiplier.per.iteration=2
mapreduce.job.acl-modify-job=
io.seqfile.local.dir=${hadoop.tmp.dir}/io/local
fs.s3.sleepTimeSeconds=10
mapreduce.client.output.filter=FAILED
<snip>
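Because ToolRunner runs the standard Hadoop option parsing before calling run, you can also set an arbitrary property on the command line with -D and see it appear in the output. For example, using a made-up property name:

$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.ConfigurationPrinter \
    -D color=yellow | grep color
color=yellow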
Printing the Hadoop configuration can be handy for debugging configuration problems, but it is not very exciting on its own. A full map reduce program is more interesting. To show how to create one, I came across a variation of the classic word count example - read a CSV file, where each row is a comma separated list of words, and output the number of times each word appears.
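For example, given an input file containing the following (made-up data):

apple,banana,apple
banana,grape

the job should produce:

apple	2
banana	2
grape	1

The key and value in each output row are separated by a tab, which is the default for TextOutputFormat.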
This program will make use of 3 classes:
WordCount.java:
package com.sodonnel.MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Use the configuration ToolRunner has already loaded, so any
        // options passed on the command line are picked up
        Configuration conf = getConf();

        // Create job
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(getClass());

        // Setup MapReduce
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete the output directory if it already exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir)) {
            hdfs.delete(outputDir, true);
        }

        // Execute job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new WordCount(), args);
        System.exit(exitCode);
    }
}
WordCountMapper.java:
package com.sodonnel.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input value is one line of the file - split it on commas
        // and emit a count of 1 for each word found
        String[] csv = value.toString().split(",");
        for (String str : csv) {
            word.set(str);
            context.write(word, ONE);
        }
    }
}
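As an aside, you can sanity check the mapper without a cluster using MRUnit, which the book also covers. The following is a minimal sketch, assuming the mrunit dependency (hadoop2 classifier) and JUnit are on the test classpath:

package com.sodonnel.MapReduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountMapperTest {

    @Test
    public void mapperEmitsOneCountPerCsvField() throws Exception {
        // A single input line should produce one (word, 1) pair per
        // CSV field, in the order the fields appear on the line
        new MapDriver<Object, Text, Text, IntWritable>()
            .withMapper(new WordCountMapper())
            .withInput(new LongWritable(0), new Text("apple,banana,apple"))
            .withOutput(new Text("apple"), new IntWritable(1))
            .withOutput(new Text("banana"), new IntWritable(1))
            .withOutput(new Text("apple"), new IntWritable(1))
            .runTest();
    }
}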
WordCountReducer.java:
package com.sodonnel.MapReduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text text, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts emitted by the mappers for each word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(text, new IntWritable(sum));
    }
}
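As the reduce function just sums integers, the operation is associative and commutative, so the same class could also be registered as a combiner in the driver to pre-aggregate counts on the map side before the shuffle. This is an optional extra, not part of the driver shown above:

// Optional, added to the driver's run method: pre-aggregate counts on the
// map side. Safe here because summing is associative and commutative.
job.setCombinerClass(WordCountReducer.class);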
With the three classes in place, compile and package them into a JAR with Maven:

$ mvn clean install -Dmaven.test.skip=true
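For the build to work, the project's pom.xml needs the Hadoop client libraries on the compile classpath - a minimal sketch of the dependency, with the version adjusted to match your cluster (2.5.0 here is just an example):

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.0</version>
  <scope>provided</scope>
</dependency>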
On HDFS, create the input directory in your home directory, and put some CSV data into it:
$ hadoop fs -mkdir input
$ hadoop fs -put csvdata.csv input/
Then run the job, passing the input and output directories as arguments:

$ hadoop jar MapReduce-0.0.1-SNAPSHOT.jar com.sodonnel.MapReduce.WordCount input output
If all goes well, the output directory will be created, and the resulting word counts will be in a file in that directory.
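As the job ran with a single reducer, all of the counts end up in one part file, which you can inspect directly:

$ hadoop fs -cat output/part-r-00000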