Using in MapReduce

This page describes how to read and write ORC files from Hadoop’s newer org.apache.hadoop.mapreduce MapReduce APIs. If you want to use the older org.apache.hadoop.mapred API, please look at the previous page.

Reading ORC files

Add ORC and your desired version of Hadoop to your pom.xml:

<dependencies>
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

Set the minimal properties in your JobConf:

ORC files contain a series of values of the same type and that type schema is encoded in the file. Because the ORC files are self-describing, the reader always knows how to correctly interpret the data. All of the ORC files written by Hive and most of the others have a struct as the value type.

Your Mapper class will receive org.apache.hadoop.io.NullWritable as the key and a value based on the table below expanded recursively.

ORC Type Writable Type
array org.apache.orc.mapred.OrcList
binary org.apache.hadoop.io.BytesWritable
bigint org.apache.hadoop.io.LongWritable
boolean org.apache.hadoop.io.BooleanWritable
char org.apache.hadoop.io.Text
date org.apache.hadoop.hive.serde2.io.DateWritable
decimal org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
double org.apache.hadoop.io.DoubleWritable
float org.apache.hadoop.io.FloatWritable
int org.apache.hadoop.io.IntWritable
map org.apache.orc.mapred.OrcMap
smallint org.apache.hadoop.io.ShortWritable
string org.apache.hadoop.io.Text
struct org.apache.orc.mapred.OrcStruct
timestamp org.apache.orc.mapred.OrcTimestamp
tinyint org.apache.hadoop.io.ByteWritable
uniontype org.apache.orc.mapred.OrcUnion
varchar org.apache.hadoop.io.Text

Let’s assume that your input directory contains ORC files with the schema struct<s:string,i:int> and you want to use the string field as the key to the MapReduce shuffle and the integer as the value. The mapper code would look like:

public static class MyMapper
    extends Mapper<NullWritable,OrcStruct,Text,IntWritable> {

  // Assume the ORC file has type: struct<s:string,i:int>
  public void map(NullWritable key, OrcStruct value,
                  Context output) throws IOException, InterruptedException {
    // take the first field as the key and the second field as the value
    output.write((Text) value.getFieldValue(0),
                 (IntWritable) value.getFieldValue(1));
  }
}

Writing ORC files

To write ORC files from your MapReduce job, you’ll need to set

  • mapreduce.job.outputformat.class = org.apache.orc.mapreduce.OrcOutputFormat
  • mapreduce.output.fileoutputformat.outputdir = your output directory
  • orc.mapred.output.schema = the schema to write to the ORC file

The reducer needs to create the Writable value to be put into the ORC file and typically uses the OrcStruct.createValue(TypeDescription) function. For our example, let’s assume that the shuffle types are (Text, IntWritable) from the previous section and the reduce should gather the integer for each key together and write them as a list. The output schema would be struct<key:string,ints:array<int>>. As always with MapReduce, if your method stores the values, you need to copy their value before getting the next.

public static class MyReducer
  extends Reducer<Text,IntWritable,NullWritable,OrcStruct> {

  private TypeDescription schema =
    TypeDescription.fromString("struct<key:string,ints:array<int>>");
  // createValue creates the correct value type for the schema
  private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
  // get a handle to the list of ints
  private OrcList<IntWritable> valueList =
    (OrcList<IntWritable>) pair.getFieldValue(1);
  private final NullWritable nada = NullWritable.get();

  public void reduce(Text key, Iterable<IntWritable> values,
                     Context output
                     ) throws IOException, InterruptedException {
    pair.setFieldValue(0, key);
    valueList.clear();
    for(IntWritable val: values) {
      valueList.add(new IntWritable(val.get()));
    }
    output.write(nada, pair);
  }
}

Sending OrcStruct, OrcList, OrcMap, or OrcUnion through the Shuffle

In the previous examples, only the Hadoop types were sent through the MapReduce shuffle. The complex ORC types, since they are generic types, need to have their full type information provided to create the object. To enable MapReduce to properly instantiate the OrcStruct and other ORC types, we need to wrap it in either an OrcKey for the shuffle key or OrcValue for the shuffle value.

To send two OrcStructs through the shuffle, define the following properties in the JobConf:

  • mapreduce.map.output.key.class = org.apache.orc.mapred.OrcKey
  • orc.mapred.map.output.key.schema = the shuffle key’s schema
  • mapreduce.map.output.value.class = org.apache.orc.mapred.OrcValue
  • orc.mapred.map.output.value.schema = the shuffle value’s schema

The mapper just adds an OrcKey and OrcWrapper around the key and value respectively. These objects should be created once and reused as the mapper runs.

public static class MyMapperShuffle
    extends Mapper<NullWritable,OrcStruct,OrcKey,OrcValue> {
  private OrcKey keyWrapper = new OrcKey();
  private OrcValue valueWrapper = new OrcValue();
  private OrcStruct outStruct = (OrcStruct) OrcStruct.createValue
    (TypeDescription.fromString("struct<i:int,j:int>"));
  private IntWritable i = (IntWritable) outStruct.getFieldValue("i");
  private IntWritable j = (IntWritable) outStruct.getFieldValue("j");

  // Assume the input has type: struct<s:string,i:int>
  public void map(NullWritable key, OrcStruct value,
                  Context output) throws IOException, InterruptedException {
    keyWrapper.key = value;
    valueWrapper.value = outStruct;
    int val = ((IntWritable) value.getFieldValue("i")).get();
    i.set(val * 2);
    j.set(val * val);
    output.write(keyWrapper, valueWrapper);
  }
}

The reducer code accesses the underlying OrcStructs by using the OrcKey.key and OrcValue.value fields.