27 April 2015

Map Reduce Multiple Outputs

Recently I wrote a couple of articles about writing and testing simple map reduce programs.

That got me thinking about some of the more advanced things you might want to do, so in this article, and hopefully a few more in the future, I will look at further features of the map reduce framework, starting with multiple outputs.

Multiple Outputs

Most of the early map reduce examples I came across only worked with a single input and a single output directory. It didn't take me long to come across a few scenarios where it would be useful to output to several directories. For instance, I might want to fan out the records in a large file into several smaller files, based on the contents of each record. Or, if I am processing a batch of records, instead of failing the entire job thanks to a single badly formatted record, I could write the bad record out to an error file and continue on processing as normal.

Hadoop allows writing to multiple output files very easily, and there are (at least) two ways of doing it.

Named Outputs

In the driver class, specify any number of named output classes.

MultipleOutputs.addNamedOutput(job, "goodrecords", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "badrecord",   TextOutputFormat.class, NullWritable.class, Text.class);

Note that each output can have a different format and key / value type.
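For example (purely illustrative, and assuming the corresponding SequenceFileOutputFormat and IntWritable imports in the driver), a third named output could use a completely different format and key / value types:

MultipleOutputs.addNamedOutput(job, "stats", SequenceFileOutputFormat.class, Text.class, IntWritable.class);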

In the mapper (or reducer) you need to do a couple of things to make multiple outputs work correctly.

First, declare a MultipleOutputs instance variable in the mapper and instantiate it in the setup method:

private MultipleOutputs<NullWritable, Text> mos;

@Override
public void setup(Context context) {
  mos = new MultipleOutputs<NullWritable, Text>(context);
}

A very important point is that you also need to close the MultipleOutputs object, otherwise the underlying writers are never flushed and the output files will be incomplete or missing. You do this in the cleanup method:

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
  mos.close();
}

Then in the map (or reduce) method, you can write to either of the outputs created earlier, referencing them by name:

mos.write("goodRecords", NullWritable.get(), value);
mos.write("badrecords",  NullWritable.get(), value);

You can still write to the context as usual too, so this is quite flexible.
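For example, the same record can go to a named output and to the job's standard output within a single call to map (using the names from the snippets above):

mos.write("goodRecords", NullWritable.get(), value);
context.write(NullWritable.get(), value); // still ends up in the usual part-m-nnnnn files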

An important point when writing to multiple outputs in this way is that all the files end up in the same directory, which is the default output directory for the job, each with a different prefix. You don't have the option to specify a different path for each named output.
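For example, after a map-only run with the two named outputs above, the job output directory would contain something like the following (the exact part numbers depend on the number of map tasks):

_SUCCESS
badRecords-m-00000
goodRecords-m-00000
part-m-00000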

Unnamed Outputs

If you want to output to a different file based on the content of the input record, then you can use unnamed multiple outputs. In this case, you don't have to set up anything in the driver class except the usual output directory.

In the mapper, add a setup and cleanup method exactly the same as above, and then use an alternative version of the write method:

String keyChar = value.toString().substring(0,1).toLowerCase();
mos.write(NullWritable.get(), value, keyChar);

In this example, the output does not have a name, and the record is written to an output file whose name is determined by the first character of the record (keyChar in the example above).

Unlike with the named outputs, you can have the data written into sub-directories by passing a path, rather than just a filename, as keyChar. I think the sub-directories must be within the job's normal output directory.
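As a rough sketch, the following would write the record into a dated sub-directory of the output directory, producing files such as 2015/04/27/data-m-00000 (the date-based path here is just a hypothetical value you might derive from the record):

String subDir = "2015/04/27/data";
mos.write(NullWritable.get(), value, subDir);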

Another difference from the named outputs is that each named output can have a different key, value and file format, while the unnamed version appears to inherit these attributes from the job's default output settings.
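In practice that means the unnamed writes use whatever the driver sets up as the job defaults, for example:

job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);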

Lazy Output Format

There is one more minor annoyance when working with multiple outputs. Even if you never write any records to the context (the standard output), the map reduce framework will create a zero byte file in the output directory. There is a way to avoid this: replace the usual setOutputFormatClass call in the driver with the following:

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

The Complete Code

//
// DRIVER
//

package com.sodonnel.Hadoop.Fan;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Fan extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job
        Job job = Job.getInstance(conf, "Fan");
        job.setJarByClass(getClass());

        // Setup MapReduce
        job.setMapperClass(FanOutMapper.class);
        job.setNumReduceTasks(0);

        // Specify key / value
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        //
        // Commenting out the next line and uncommenting the LazyOutputFormat
        // line below it prevents a zero byte file from being created when you
        // only write via multiple outputs
        //
        job.setOutputFormatClass(TextOutputFormat.class);
        // LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        //
        // If you want to have named outputs, then define them upfront here
        //
        //  MultipleOutputs.addNamedOutput(job, "badRecords", TextOutputFormat.class,
        //          NullWritable.class, Text.class);
        //  MultipleOutputs.addNamedOutput(job, "goodRecords", TextOutputFormat.class,
        //          NullWritable.class, Text.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Fan(), args);
        System.exit(exitCode);
    }
}

//
// MAPPER
//

package com.sodonnel.Hadoop.Fan;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class FanOutMapper extends
    Mapper<LongWritable, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<NullWritable, Text>(context);
    }

    // 
    // You must override the cleanup method and close the multi-output object
    // or things do not work correctly.
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Throw away totally blank lines
        if (value.equals(new Text(""))) {
            return;
        }

        // Fan the records out into a file that has the first character of the 
        // string as the filename.
        // You can also use named outputs (defined in the job runner class)
        // instead of deriving the filename based on the input lines.
        // If you pass a path with / characters in it, the data will go into subdirs
        // eg 20150304/data etc
        String keyChar = value.toString().substring(0,1).toLowerCase();

        // In this example, the keyChar string indicates the filename the data is written
        // into. You can write the same data to many files, and the filename can 
        // contain slashes to make it into a path. The path is relative to the output dir
        // setup in the job config.
        mos.write(NullWritable.get(), value, keyChar);
        // mos.write("goodRecords", NullWritable.get(),value);
        // context.write(NullWritable.get(), value);
    }
}