24 September 2015

Map Reduce with XML Input and Multiple Avro Outputs

Continuing my learning of Map Reduce (after quite a long break), I decided to figure out how to take the Stack Exchange data dumps and convert them from XML to Avro.

The data dumps have one file per table, with the data for each row encoded as attributes within a row tag, e.g.:

<votes>
  ...
  <row Id="1" PostId="1" VoteTypeId="2" CreationDate="2009-04-30T00:00:00.000" />
  ...
</votes>

The first challenge is therefore how to read an XML file with a map reduce job.

StreamXmlRecordReader and XmlInputFormat

Reading large XML documents with Hadoop has the potential to be tricky, as the XML document can span multiple splits and an arbitrary split point can land in the middle of a record. Often a large XML document is actually a collection of relatively small records, which is the case here. The Hadoop Definitive Guide mentions StreamXmlRecordReader as a way to process this sort of document: you specify the start and end tags that delimit each record within the large document, and a mapper then gets handed one full record at a time. It also mentions an improved XML input format from the Mahout project called XmlInputFormat. Eventually I found a project that used XmlInputFormat; the entire class is only about 140 lines, and it is a pretty good example of how to create a custom input format. The source is on github. Just copying that class into your project is much easier than including the entire Mahout project!
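
At its heart, XmlInputFormat's record reader is a byte-matching loop that scans the stream for the configured start and end tag byte sequences, buffering everything in between as one record. The standalone sketch below illustrates the same technique against an in-memory stream; it is not the Mahout code itself, just the idea behind it:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class TagScanner {

  // Read bytes until the 'match' sequence has been seen. If 'out' is
  // non-null, every byte read is copied into it (used when we are inside
  // a record and want to keep its content).
  static boolean readUntilMatch(InputStream in, byte[] match,
                                ByteArrayOutputStream out) throws IOException {
    int i = 0;
    int b;
    while ((b = != -1) {
      if (out != null) {
        out.write(b);
      }
      if (b == match[i]) {
        if (++i == match.length) {
          return true;                      // the whole tag has been seen
        }
      } else {
        i = (b == match[0]) ? 1 : 0;        // mismatch: restart the match
      }
    }
    return false;                           // hit the end of the stream first
  }

  public static void main(String[] args) throws IOException {
    byte[] xml = "<votes><row Id=\"1\" /><row Id=\"2\" /></votes>".getBytes("utf-8");
    byte[] startTag = "<row".getBytes("utf-8");
    byte[] endTag = "/>".getBytes("utf-8");
    InputStream in = new ByteArrayInputStream(xml);
    while (readUntilMatch(in, startTag, null)) {
      ByteArrayOutputStream record = new ByteArrayOutputStream();
      record.write(startTag);
      if (readUntilMatch(in, endTag, record)) {
        System.out.println(record.toString("utf-8")); // one full <row ... /> record
      }
    }
  }
}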

To use XmlInputFormat, all you need is the following in your driver:

// Create configuration
Configuration conf = this.getConf();
conf.set("xmlinput.start", "<row");
conf.set("xmlinput.end", "/>");

// Create job
Job job = Job.getInstance(conf, "StackOverflow XML to Avro");
job.setJarByClass(getClass());
job.setInputFormatClass(XmlInputFormat.class);

Notice that I have not used full tags as my delimiters. Because each record is a self-closing row element, matching on "<row" and "/>" captures a complete record, which is exactly what this example requires.

Avro Outputs

An Avro file contains a list of Avro records, each conforming to the same schema. The schema is defined in JSON format and is stored in the Avro output file along with the records. For example, the schema for the Votes record is:

{
  "type":  "record",
  "name":  "StackOverflowVoteRecord",
  "doc":   "A Record",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "postid", "type": ["null", "string"] },
    { "name": "votetypeid", "type": ["null", "string"] },
    { "name": "userid", "type": ["null", "string"] },
    { "name": "creationdate", "type": ["null", "string"] },
    { "name": "bountyamount", "type": ["null", "string"] }
  ]
}

Each entry in an Avro file is a record, but there are output formats that simulate writing key/value pairs into the file by writing a record with two fields, 'key' and 'value', where the value can itself be another record.
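
For example, the AvroKeyValueOutputFormat writes each entry using a generated wrapper schema roughly like the following (the int key and string value here are just for illustration; in practice the value is often a nested record schema such as the Votes one above):

{
  "type": "record",
  "name": "KeyValuePair",
  "namespace": "org.apache.avro.mapreduce",
  "fields": [
    { "name": "key",   "type": "int" },
    { "name": "value", "type": "string" }
  ]
}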

To configure an Avro job, you use the AvroJob class, which lets you set the key and value schemas for the map output and the final job output, for example:

AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
AvroJob.setMapOutputValueSchema(job, SCHEMA);
AvroJob.setOutputKeySchema(job, SCHEMA);

job.setOutputFormatClass(AvroKeyOutputFormat.class);

In this job, I have several types of input file to process, and hence several types of output file, so I used AvroMultipleOutputs rather than AvroJob. This allows me to add a named output, each with a different schema, for each input file type:

AvroMultipleOutputs.addNamedOutput(job, "badges", AvroKeyOutputFormat.class, SCHEMAS.get("badges"), null);
AvroMultipleOutputs.addNamedOutput(job, "users", AvroKeyOutputFormat.class, SCHEMAS.get("users"), null);
AvroMultipleOutputs.addNamedOutput(job, "posts", AvroKeyOutputFormat.class, SCHEMAS.get("posts"), null);
AvroMultipleOutputs.addNamedOutput(job, "comments", AvroKeyOutputFormat.class, SCHEMAS.get("comments"), null);
AvroMultipleOutputs.addNamedOutput(job, "tags", AvroKeyOutputFormat.class, SCHEMAS.get("tags"), null);
AvroMultipleOutputs.addNamedOutput(job, "votes", AvroKeyOutputFormat.class, SCHEMAS.get("votes"), null);
AvroMultipleOutputs.addNamedOutput(job, "postlinks", AvroKeyOutputFormat.class, SCHEMAS.get("postlinks"), null);
AvroMultipleOutputs.setCountersEnabled(job, true);
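
The SCHEMAS map used above is a static lookup in the driver class from table name to parsed schema. A minimal sketch of how such a map can be built with Avro's Schema.Parser (the class name and schema strings here are abbreviated illustrations, not the real driver code):

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;

public class Schemas {

  // Map from table name (the lower-cased file name) to its Avro schema
  public static final Map<String, Schema> SCHEMAS = new HashMap<String, Schema>();

  // The JSON schema string for each table - this is the votes schema
  // shown earlier, abbreviated to two fields for space
  static final String VOTES_JSON =
      "{\"type\":\"record\",\"name\":\"StackOverflowVoteRecord\",\"fields\":["
    + "{\"name\":\"id\",\"type\":\"string\"},"
    + "{\"name\":\"postid\",\"type\":[\"null\",\"string\"]}]}";

  static {
    Schema.Parser parser = new Schema.Parser();
    SCHEMAS.put("votes", parser.parse(VOTES_JSON));
    // ... and likewise for badges, users, posts, comments, tags, postlinks
  }
}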

The complete driver code is on github.

The Mapper

The mapper is now pretty simple, aside from one complexity: it must identify which input file type it is processing (from its filename) and select the correct schema and named output to write the Avro records into. The filename can be obtained from the map reduce context, and the schema then looked up in a hashmap stored in the driver class. This is all taken care of by the mapper's setup method:

public void setup(Context context) {
  amos = new AvroMultipleOutputs(context);
  // Work out which table this mapper is processing from the input
  // file name, eg 'Votes.xml' -> 'votes'
  InputSplit split = context.getInputSplit();
  String fileName = ((FileSplit) split).getPath().getName().toLowerCase().split("\\.")[0];
  if (xmlToAvro.SCHEMAS.containsKey(fileName)) {
    // Create a reusable record with the schema for this table, and
    // remember which named output to write to
    record = new GenericData.Record(xmlToAvro.SCHEMAS.get(fileName));
    amos_name = fileName;
  }
}

The remainder of the mapper code handles parsing each XML record into an Avro record and writing it to the correct named output, as sketched below.
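
The real mapper is in the github repo alongside the driver; the attribute-to-field copying and error handling in this sketch are my own illustration of the approach:

// Illustrative map method - parse the "<row ... />" fragment handed to us
// by XmlInputFormat and copy its attributes into the reusable Avro record.
// Needs javax.xml.parsers.DocumentBuilderFactory, org.w3c.dom.* and the
// Avro/Hadoop classes already imported by the mapper.
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  if (amos_name == null) {
    return; // setup() did not recognise this input file
  }
  try {
    Element row = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new ByteArrayInputStream(value.toString().getBytes("utf-8")))
        .getDocumentElement();

    // The record instance is reused across calls, so clear every field first
    for (Schema.Field f : record.getSchema().getFields()) {
      record.put(, null);
    }

    // Copy each XML attribute into the Avro field of the same lower-cased name
    NamedNodeMap attrs = row.getAttributes();
    for (int i = 0; i < attrs.getLength(); i++) {
      String name = attrs.item(i).getNodeName().toLowerCase();
      if (record.getSchema().getField(name) != null) {
        record.put(name, attrs.item(i).getNodeValue());
      }
    }

    amos.write(amos_name, new AvroKey<GenericData.Record>(record), NullWritable.get());
  } catch (Exception e) {
    // Skip any fragment that fails to parse
  }
}

// AvroMultipleOutputs buffers its writers, so it must be closed when the
// mapper finishes or the output files will be incomplete
public void cleanup(Context context) throws IOException, InterruptedException {
  amos.close();
}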

Maven Dependencies

To use Avro in a map reduce job, you need to add the following to your pom.xml:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
</dependency>

Notice the 'hadoop2' classifier: this pulls in the avro-mapred jar built for MR2 (YARN).

Running The Job

Load Raw Data To Hadoop

The Stack Exchange dumps come compressed in 7z format (which I had never used before). The first step is to decompress them and load them into Hadoop. I put them into a folder called input in my HDFS home directory:

[vagrant@standalone serverfault.com]$ ls -ltrh
total 2.0G
-rw-r--r-- 1 vagrant vagrant 168M Sep 23 15:23 Comments.xml
-rw-r--r-- 1 vagrant vagrant  39M Sep 23 15:23 Badges.xml
-rw-r--r-- 1 vagrant vagrant 2.9M Sep 23 15:23 PostLinks.xml
-rw-r--r-- 1 vagrant vagrant 916M Sep 23 15:23 PostHistory.xml
-rw-r--r-- 1 vagrant vagrant 139M Sep 23 15:23 Votes.xml
-rw-r--r-- 1 vagrant vagrant  64M Sep 23 15:23 Users.xml
-rw-r--r-- 1 vagrant vagrant 224K Sep 23 15:23 Tags.xml
-rw-r--r-- 1 vagrant vagrant 625M Sep 23 15:23 Posts.xml

[vagrant@standalone serverfault.com]$ hadoop fs -put *.xml input/

[vagrant@standalone serverfault.com]$ hadoop fs -ls input
Found 8 items
-rw-r--r--   3 vagrant vagrant   40661120 2015-09-23 15:24 input/Badges.xml
-rw-r--r--   3 vagrant vagrant  176150364 2015-09-23 15:24 input/Comments.xml
-rw-r--r--   3 vagrant vagrant  959680230 2015-09-23 15:24 input/PostHistory.xml
-rw-r--r--   3 vagrant vagrant    2988058 2015-09-23 15:24 input/PostLinks.xml
-rw-r--r--   3 vagrant vagrant  654708460 2015-09-23 15:24 input/Posts.xml
-rw-r--r--   3 vagrant vagrant     228933 2015-09-23 15:24 input/Tags.xml
-rw-r--r--   3 vagrant vagrant   66168226 2015-09-23 15:24 input/Users.xml
-rw-r--r--   3 vagrant vagrant  144769493 2015-09-23 15:25 input/Votes.xml

Compile and Run Map Reduce

Clone the git repo.

Use Maven to build the jar:

$ mvn package -Dmaven.test.skip=true

The Avro libraries are not part of the core Hadoop install, so you need to download the correct jars and add them to both LIBJARS and HADOOP_CLASSPATH. Then the job can be started in the usual way:

export LIBJARS=avro-1.7.7.jar,avro-mapred-1.7.7-hadoop2.jar
export HADOOP_CLASSPATH=avro-1.7.7.jar:avro-mapred-1.7.7-hadoop2.jar
hadoop jar WordCount-1.0-SNAPSHOT.jar com.sodonnel.stackOverflow.xmlToAvro -libjars $LIBJARS input output

After running the job, you should end up with a few Avro files for each of the input XML files in the output directory:

$ hadoop fs -ls output
Found 13 items
-rw-r--r--   3 vagrant vagrant          0 2015-09-24 14:11 output/_SUCCESS
-rw-r--r--   3 vagrant vagrant   22477814 2015-09-24 14:11 output/badges-m-00016.avro
-rw-r--r--   3 vagrant vagrant  110111079 2015-09-24 14:03 output/comments-m-00001.avro
-rw-r--r--   3 vagrant vagrant   34758424 2015-09-24 14:11 output/comments-m-00015.avro
-rw-r--r--   3 vagrant vagrant    1330504 2015-09-24 14:11 output/postlinks-m-00018.avro
-rw-r--r--   3 vagrant vagrant  124593825 2015-09-24 14:07 output/posts-m-00009.avro
-rw-r--r--   3 vagrant vagrant  123105305 2015-09-24 14:08 output/posts-m-00010.avro
-rw-r--r--   3 vagrant vagrant  121213033 2015-09-24 14:09 output/posts-m-00011.avro
-rw-r--r--   3 vagrant vagrant  119791251 2015-09-24 14:09 output/posts-m-00012.avro
-rw-r--r--   3 vagrant vagrant  104216815 2015-09-24 14:10 output/posts-m-00013.avro
-rw-r--r--   3 vagrant vagrant     119501 2015-09-24 14:11 output/tags-m-00019.avro
-rw-r--r--   3 vagrant vagrant   69758697 2015-09-24 14:10 output/users-m-00014.avro
-rw-r--r--   3 vagrant vagrant   83896117 2015-09-24 14:03 output/votes-m-00000.avro
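
To sanity check the output, copy one of the files out of HDFS (hadoop fs -get output/votes-m-00000.avro) and dump its schema and first few records. A minimal reader using Avro's GenericDatumReader:

import;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Print the embedded schema and the first five records of a local Avro file
public class AvroPeek {
  public static void main(String[] args) throws Exception {
    DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
        new File(args[0]), new GenericDatumReader<GenericRecord>());
    System.out.println(reader.getSchema().toString(true));
    int count = 0;
    for (GenericRecord rec : reader) {
      System.out.println(rec);
      if (++count == 5) {
        break;
      }
    }
    reader.close();
  }
}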