Using Core Java
The Core ORC API reads and writes ORC files to and from Hive’s storage-api vectorized classes. Both Hive and MapReduce use the Core API to actually read and write the data.
Vectorized Row Batch
Data is passed to ORC as instances of VectorizedRowBatch, which contain the data for up to 1024 rows. The focus is on speed and on accessing the data fields directly. cols is an array of ColumnVector and size is the number of rows.
package org.apache.hadoop.hive.ql.exec.vector;
public class VectorizedRowBatch {
public ColumnVector[] cols;
public int size;
...
}
ColumnVector is the parent type of the different kinds of columns and has some fields that are shared across all of the column types. In particular, the noNulls flag indicates that there are no nulls in this column for this batch, and the isRepeating flag marks columns where the entire batch is the same value. For columns where noNulls == false, the isNull array is true for each value that is null.
public abstract class ColumnVector {
// If the whole column vector has no nulls, this is true, otherwise false.
public boolean noNulls;
// If noNulls is false, then this array contains true if the value is
// null, otherwise false.
public boolean[] isNull;
/*
* True if same value repeats for whole column vector.
* If so, vector[0] holds the repeating value.
*/
public boolean isRepeating;
...
}
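For example, a consumer of a batch typically checks these flags before touching the values. The helper below is a minimal sketch (the method name readLongColumn and its arguments are just for illustration) of how a reader honors isRepeating, noNulls, and isNull for a LongColumnVector:
// Illustrative helper: print every value in one LongColumnVector,
// honoring the isRepeating, noNulls, and isNull flags.
static void readLongColumn(LongColumnVector col, int batchSize) {
  for (int row = 0; row < batchSize; ++row) {
    // For a repeating column, every row uses the value (and null flag) at index 0.
    int valueRow = col.isRepeating ? 0 : row;
    if (col.noNulls || !col.isNull[valueRow]) {
      System.out.println(col.vector[valueRow]);
    } else {
      System.out.println("null");
    }
  }
}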
The subtypes of ColumnVector are:
ORC Type | ColumnVector |
---|---|
array | ListColumnVector |
binary | BytesColumnVector |
bigint | LongColumnVector |
boolean | LongColumnVector |
char | BytesColumnVector |
date | LongColumnVector |
decimal | DecimalColumnVector |
double | DoubleColumnVector |
float | DoubleColumnVector |
int | LongColumnVector |
map | MapColumnVector |
smallint | LongColumnVector |
string | BytesColumnVector |
struct | StructColumnVector |
timestamp | TimestampColumnVector |
tinyint | LongColumnVector |
uniontype | UnionColumnVector |
varchar | BytesColumnVector |
LongColumnVector handles all of the integer types (boolean, bigint, date, int, smallint, and tinyint). The data is represented as an array of longs where each value is sign-extended as necessary.
public class LongColumnVector extends ColumnVector {
public long[] vector;
...
}
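For example, boolean values are stored as 0 or 1 and date values as the number of days since the epoch. A minimal sketch, assuming batch was created from the hypothetical schema struct<flag:boolean,d:date>:
LongColumnVector flag = (LongColumnVector) batch.cols[0];
LongColumnVector d = (LongColumnVector) batch.cols[1];
int row = batch.size++;
flag.vector[row] = 1;                                   // true is stored as 1
d.vector[row] = LocalDate.of(2024, 1, 1).toEpochDay();  // days since 1970-01-01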
TimestampColumnVector handles timestamp values. The data is represented as an array of longs and an array of ints.
public class TimestampColumnVector extends ColumnVector {
// the number of milliseconds since 1 Jan 1970 00:00 GMT
public long[] time;
// the number of nanoseconds within the second
public int[] nanos;
...
}
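A minimal sketch of filling one row, assuming ts is the TimestampColumnVector for a timestamp column and row is the current row index; the two arrays are set from a java.sql.Timestamp:
java.sql.Timestamp value = java.sql.Timestamp.valueOf("2024-01-01 12:30:00.123456789");
ts.time[row] = value.getTime();    // milliseconds since epoch
ts.nanos[row] = value.getNanos();  // nanoseconds within the second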
DoubleColumnVector handles all of the floating point types (double and float). The data is represented as an array of doubles.
public class DoubleColumnVector extends ColumnVector {
public double[] vector;
...
}
DecimalColumnVector handles decimal columns. The data is represented as an array of HiveDecimalWritable. Note that this implementation is not performant and will likely be replaced.
public class DecimalColumnVector extends ColumnVector {
public HiveDecimalWritable[] vector;
...
}
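A minimal sketch of setting one value, assuming dec is the DecimalColumnVector for a decimal column, row is the current row index, and the batch has pre-allocated a HiveDecimalWritable for each slot:
// Reuse the pre-allocated writable rather than allocating a new one per row.
dec.vector[row].set(HiveDecimal.create("123.45"));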
BytesColumnVector handles all of the binary types (binary, char, string, and varchar). The data is represented as a byte array, offset, and length. The byte arrays may or may not be shared between values.
public class BytesColumnVector extends ColumnVector {
public byte[][] vector;
public int[] start;
public int[] length;
...
}
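A minimal sketch of writing one string value, assuming batch was created from the hypothetical schema struct<s:string>; setVal copies the bytes into a buffer managed by the vector (setRef can be used instead to point at a caller-owned buffer without copying):
BytesColumnVector s = (BytesColumnVector) batch.cols[0];
int row = batch.size++;
// Copy the UTF-8 bytes of the value into the column vector's buffer.
s.setVal(row, "hello".getBytes(StandardCharsets.UTF_8));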
StructColumnVector handles the struct columns and represents the data as an array of ColumnVector, one per field. The value for row 5 consists of the fifth value from each of the field vectors.
public class StructColumnVector extends ColumnVector {
public ColumnVector[] fields;
...
}
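A minimal sketch, assuming batch was created from the hypothetical schema struct<rec:struct<a:int,b:string>>; each field of the struct is written through its own child vector at the same row index:
StructColumnVector rec = (StructColumnVector) batch.cols[0];
LongColumnVector a = (LongColumnVector) rec.fields[0];
BytesColumnVector b = (BytesColumnVector) rec.fields[1];
int row = batch.size++;
a.vector[row] = 42;
b.setVal(row, "meaning".getBytes(StandardCharsets.UTF_8));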
UnionColumnVector handles the union columns and represents the data as an array of integers that selects the subtype, together with a fields array that holds one ColumnVector per subtype. Only the entry in fields that corresponds to tags[row] is set.
public class UnionColumnVector extends ColumnVector {
public int[] tags;
public ColumnVector[] fields;
...
}
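A minimal sketch, assuming batch was created from the hypothetical schema struct<u:uniontype<int,string>>; the tag selects which child vector holds the value for each row:
UnionColumnVector u = (UnionColumnVector) batch.cols[0];
LongColumnVector asInt = (LongColumnVector) u.fields[0];
BytesColumnVector asString = (BytesColumnVector) u.fields[1];
int row = batch.size++;
u.tags[row] = 0;          // this row uses the first alternative (int)
asInt.vector[row] = 7;    // only fields[tags[row]] needs a value for this row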
ListColumnVector handles the array columns and represents the data as two arrays of longs for the offsets and lengths, plus a ColumnVector for the children's values.
public class ListColumnVector extends ColumnVector {
// for each row, the first offset of the child
public long[] offsets;
// for each row, the number of elements in the array
public long[] lengths;
// the offset in the child that should be used for new values
public int childCount;
// the values of the children
public ColumnVector child;
...
}
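A minimal sketch of writing one three-element list, assuming batch was created from the hypothetical schema struct<l:array<int>>; each row reserves a slice of the shared child vector via offsets, lengths, and childCount:
ListColumnVector list = (ListColumnVector) batch.cols[0];
LongColumnVector elems = (LongColumnVector) list.child;
int row = batch.size++;
list.offsets[row] = list.childCount;       // this row's elements start here in the child
list.lengths[row] = 3;
list.childCount += 3;
elems.ensureSize(list.childCount, true);   // grow the child vector if needed, keeping data
for (int i = 0; i < 3; ++i) {
  elems.vector[(int) list.offsets[row] + i] = i * 10;
}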
MapColumnVector handles the map columns and represents the data as two arrays of longs for the offsets and lengths, plus two ColumnVectors for the keys and values.
public class MapColumnVector extends ColumnVector {
// for each row, the first offset of the child
public long[] offsets;
// for each row, the number of elements in the array
public long[] lengths;
// the offset in the child that should be used for new values
public int childCount;
// the values of the keys and values
public ColumnVector keys;
public ColumnVector values;
...
}
Writing ORC Files
Simple Example
To write an ORC file, you need to define the schema and use the OrcFile class to create a Writer with the desired filename. This example sets the required schema parameter, but there are many other options to control the ORC writer.
Configuration conf = new Configuration();
TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
OrcFile.writerOptions(conf)
.setSchema(schema));
Now you need to create a row batch, set the data, and write it to the file as the batch fills up. When the file is done, close the Writer.
VectorizedRowBatch batch = schema.createRowBatch();
LongColumnVector x = (LongColumnVector) batch.cols[0];
LongColumnVector y = (LongColumnVector) batch.cols[1];
for(int r=0; r < 10000; ++r) {
int row = batch.size++;
x.vector[row] = r;
y.vector[row] = r * 3;
// If the batch is full, write it out and start over.
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
if (batch.size != 0) {
writer.addRowBatch(batch);
batch.reset();
}
writer.close();
Advanced Example
The following example writes an ORC file with two integer columns and a map column. Each row’s map has 5 elements, with keys ranging from “row <r>.0” to “row <r>.4”.
Path testFilePath = new Path("advanced-example.orc");
Configuration conf = new Configuration();
TypeDescription schema =
TypeDescription.fromString("struct<first:int," +
"second:int,third:map<string,int>>");
Writer writer =
OrcFile.createWriter(testFilePath,
OrcFile.writerOptions(conf).setSchema(schema));
VectorizedRowBatch batch = schema.createRowBatch();
LongColumnVector first = (LongColumnVector) batch.cols[0];
LongColumnVector second = (LongColumnVector) batch.cols[1];
// Define the map. You also need to cast the key and value vectors
MapColumnVector map = (MapColumnVector) batch.cols[2];
BytesColumnVector mapKey = (BytesColumnVector) map.keys;
LongColumnVector mapValue = (LongColumnVector) map.values;
// Each map has 5 elements
final int MAP_SIZE = 5;
final int BATCH_SIZE = batch.getMaxSize();
// Ensure the map is big enough
mapKey.ensureSize(BATCH_SIZE * MAP_SIZE, false);
mapValue.ensureSize(BATCH_SIZE * MAP_SIZE, false);
// add 1500 rows to file
for(int r=0; r < 1500; ++r) {
int row = batch.size++;
first.vector[row] = r;
second.vector[row] = r * 3;
map.offsets[row] = map.childCount;
map.lengths[row] = MAP_SIZE;
map.childCount += MAP_SIZE;
for (int mapElem = (int) map.offsets[row];
mapElem < map.offsets[row] + MAP_SIZE; ++mapElem) {
String key = "row " + r + "." + (mapElem - map.offsets[row]);
mapKey.setVal(mapElem, key.getBytes(StandardCharsets.UTF_8));
mapValue.vector[mapElem] = mapElem;
}
if (row == BATCH_SIZE - 1) {
writer.addRowBatch(batch);
batch.reset();
}
}
if (batch.size != 0) {
writer.addRowBatch(batch);
batch.reset();
}
writer.close();
Reading ORC Files
To read ORC files, use the OrcFile class to create a Reader that contains the metadata about the file. There are a few options to the ORC reader, but far fewer than the writer and none of them are required. The reader has methods for getting the number of rows, schema, compression, etc. from the file.
Reader reader = OrcFile.createReader(new Path("my-file.orc"),
OrcFile.readerOptions(conf));
To get the data, create a RecordReader object. By default, the RecordReader reads all rows and all columns, but there are options to control the data that is read.
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
With a RecordReader the user can ask for the next batch until there are no more left. The reader will stop the batch at certain boundaries, so the returned batch may not be full, but it will always contain some rows.
while (rows.nextBatch(batch)) {
for(int r=0; r < batch.size; ++r) {
// process row r from batch
}
}
rows.close();
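Putting it together, a minimal sketch that reads back the two integer columns written by the simple example above (it assumes my-file.orc has the struct<x:int,y:int> schema from that example and contains no nulls):
Configuration conf = new Configuration();
Reader reader = OrcFile.createReader(new Path("my-file.orc"),
    OrcFile.readerOptions(conf));
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
LongColumnVector x = (LongColumnVector) batch.cols[0];
LongColumnVector y = (LongColumnVector) batch.cols[1];
while (rows.nextBatch(batch)) {
  for (int r = 0; r < batch.size; ++r) {
    System.out.println(x.vector[r] + ", " + y.vector[r]);
  }
}
rows.close();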