diff --git a/java/core/pom.xml b/java/core/pom.xml
index e09ebda997..35ad77862e 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -41,6 +41,10 @@
       <groupId>commons-lang</groupId>
       <artifactId>commons-lang</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -49,10 +53,6 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-storage-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.iq80.snappy</groupId>
-      <artifactId>snappy</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
index f684bef817..3cffe57ee9 100644
--- a/java/core/src/java/org/apache/orc/CompressionKind.java
+++ b/java/core/src/java/org/apache/orc/CompressionKind.java
@@ -23,5 +23,5 @@
  * can be applied to ORC files.
  */
 public enum CompressionKind {
-  NONE, ZLIB, SNAPPY, LZO
+  NONE, ZLIB, SNAPPY, LZO, LZ4
 }
diff --git a/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
new file mode 100644
index 0000000000..a3047304bd
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/AircompressorCodec.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import org.apache.orc.CompressionCodec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+public class AircompressorCodec implements CompressionCodec {
+  private final Compressor compressor;
+  private final Decompressor decompressor;
+
+  AircompressorCodec(Compressor compressor, Decompressor decompressor) {
+    this.compressor = compressor;
+    this.decompressor = decompressor;
+  }
+
+  // Thread local buffer
+  private static final ThreadLocal<byte[]> threadBuffer =
+      new ThreadLocal<byte[]>() {
+        @Override
+        protected byte[] initialValue() {
+          return null;
+        }
+      };
+
+  protected static byte[] getBuffer(int size) {
+    byte[] result = threadBuffer.get();
+    if (result == null || result.length < size || result.length > size * 2) {
+      result = new byte[size];
+      threadBuffer.set(result);
+    }
+    return result;
+  }
+
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // I should work on a patch for Snappy to support an overflow buffer
+    // to prevent the extra buffer copy.
+    byte[] compressed = getBuffer(compressor.maxCompressedLength(inBytes));
+    int outBytes =
+        compressor.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0, compressed.length);
+    if (outBytes < inBytes) {
+      int remaining = out.remaining();
+      if (remaining >= outBytes) {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), outBytes);
+        out.position(out.position() + outBytes);
+      } else {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+        System.arraycopy(compressed, remaining, overflow.array(),
+            overflow.arrayOffset(), outBytes - remaining);
+        overflow.position(outBytes - remaining);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    int inOffset = in.position();
+    int uncompressLen =
+        decompressor.decompress(in.array(), in.arrayOffset() + inOffset,
+            in.limit() - inOffset, out.array(), out.arrayOffset() + out.position(),
+            out.remaining());
+    out.position(uncompressLen + out.position());
+    out.flip();
+  }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // snappy allows no modifications
+    return this;
+  }
+}
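
Reviewer note: the contract of compress() above is that it returns false when compression would not shrink the input (the caller then stores the raw bytes), and that any output that does not fit in `out` spills into `overflow`. A minimal sketch of driving that contract directly, under stated assumptions: the demo class name is hypothetical, it must live in org.apache.orc.impl because the constructor is package-private, and all buffers must be heap-backed since the codec calls array()/arrayOffset().

package org.apache.orc.impl;

import io.airlift.compress.lz4.Lz4Compressor;
import io.airlift.compress.lz4.Lz4Decompressor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class AircompressorCodecDemo {  // hypothetical demo class
  public static void main(String[] args) throws IOException {
    AircompressorCodec codec =
        new AircompressorCodec(new Lz4Compressor(), new Lz4Decompressor());

    // Highly repetitive input, so compress() should shrink it and
    // return true; incompressible input would return false instead.
    byte[] original = new byte[4096];
    Arrays.fill(original, (byte) 'a');
    ByteBuffer in = ByteBuffer.wrap(original);

    // Deliberately tiny output buffer so the compressed bytes are
    // likely to spill into overflow, mirroring the writer's chained
    // buffer usage. Heap buffers only: the codec uses array().
    ByteBuffer out = ByteBuffer.allocate(16);
    ByteBuffer overflow = ByteBuffer.allocate(4096);

    boolean smaller = codec.compress(in, out, overflow);
    System.out.println("compressed=" + smaller
        + " out=" + out.position() + " overflow=" + overflow.position());

    if (smaller) {
      // Stitch the two pieces back together and decompress.
      ByteBuffer packed =
          ByteBuffer.allocate(out.position() + overflow.position());
      packed.put(out.array(), out.arrayOffset(), out.position());
      packed.put(overflow.array(), overflow.arrayOffset(), overflow.position());
      packed.flip();

      ByteBuffer restored = ByteBuffer.allocate(original.length);
      codec.decompress(restored.hasArray() ? packed : packed, restored);
      System.out.println("roundTrip="
          + (restored.remaining() == original.length));
    }
  }
}

Note also the getBuffer() heuristic: the thread-local scratch array is reallocated not only when it is too small but also when it is more than twice the requested size, so one oversized request does not pin a large array for the thread's lifetime.
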
diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
index 34f743b863..baf13350ca 100644
--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
@@ -436,12 +436,10 @@ private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
     // Check compression codec.
     switch (ps.getCompression()) {
       case NONE:
-        break;
       case ZLIB:
-        break;
       case SNAPPY:
-        break;
       case LZO:
+      case LZ4:
         break;
       default:
         throw new IllegalArgumentException("Unknown compression");
diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
index dd4f30c18f..f4d828a5dd 100644
--- a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
+++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java
@@ -18,46 +18,20 @@
 
 package org.apache.orc.impl;
 
-import org.apache.orc.CompressionCodec;
-import org.iq80.snappy.Snappy;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 
-public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+public class SnappyCodec extends AircompressorCodec
+    implements DirectDecompressionCodec {
   private static final HadoopShims SHIMS = HadoopShims.Factory.get();
 
   Boolean direct = null;
 
-  @Override
-  public boolean compress(ByteBuffer in, ByteBuffer out,
-                          ByteBuffer overflow) throws IOException {
-    int inBytes = in.remaining();
-    // I should work on a patch for Snappy to support an overflow buffer
-    // to prevent the extra buffer copy.
-    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
-    int outBytes =
-        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
-            compressed, 0);
-    if (outBytes < inBytes) {
-      int remaining = out.remaining();
-      if (remaining >= outBytes) {
-        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
-            out.position(), outBytes);
-        out.position(out.position() + outBytes);
-      } else {
-        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
-            out.position(), remaining);
-        out.position(out.limit());
-        System.arraycopy(compressed, remaining, overflow.array(),
-            overflow.arrayOffset(), outBytes - remaining);
-        overflow.position(outBytes - remaining);
-      }
-      return true;
-    } else {
-      return false;
-    }
+  SnappyCodec() {
+    super(new SnappyCompressor(), new SnappyDecompressor());
   }
 
   @Override
@@ -66,12 +40,7 @@ public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
       directDecompress(in, out);
       return;
     }
-    int inOffset = in.position();
-    int uncompressLen =
-        Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
-            in.limit() - inOffset, out.array(), out.arrayOffset() + out.position());
-    out.position(uncompressLen + out.position());
-    out.flip();
+    super.decompress(in, out);
   }
 
   @Override
@@ -99,10 +68,4 @@ public void directDecompress(ByteBuffer in, ByteBuffer out)
     decompressShim.decompress(in, out);
     out.flip(); // flip for read
   }
-
-  @Override
-  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
-    // snappy allows no modifications
-    return this;
-  }
 }
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index fd7fee0388..3df1b76d92 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -30,6 +30,12 @@
 import java.util.TimeZone;
 import java.util.TreeMap;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.orc.BinaryColumnStatistics;
@@ -249,23 +255,11 @@ public static CompressionCodec createCodec(CompressionKind kind) {
     case SNAPPY:
       return new SnappyCodec();
     case LZO:
-      try {
-        ClassLoader loader = Thread.currentThread().getContextClassLoader();
-        if (loader == null) {
-          loader = WriterImpl.class.getClassLoader();
-        }
-        @SuppressWarnings("unchecked")
-        Class<? extends CompressionCodec> lzo =
-            (Class<? extends CompressionCodec>)
-            loader.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
-        return lzo.newInstance();
-      } catch (ClassNotFoundException e) {
-        throw new IllegalArgumentException("LZO is not available.", e);
-      } catch (InstantiationException e) {
-        throw new IllegalArgumentException("Problem initializing LZO", e);
-      } catch (IllegalAccessException e) {
-        throw new IllegalArgumentException("Insufficient access to LZO", e);
-      }
+      return new AircompressorCodec(new LzoCompressor(),
+          new LzoDecompressor());
+    case LZ4:
+      return new AircompressorCodec(new Lz4Compressor(),
+          new Lz4Decompressor());
     default:
       throw new IllegalArgumentException("Unknown compression codec: " +
           kind);
@@ -2648,6 +2642,7 @@ private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
     case ZLIB: return OrcProto.CompressionKind.ZLIB;
     case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
     case LZO: return OrcProto.CompressionKind.LZO;
+    case LZ4: return OrcProto.CompressionKind.LZ4;
     default:
       throw new IllegalArgumentException("Unknown compression " + kind);
     }
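
Reviewer note: before this hunk, requesting LZO reflectively loaded org.apache.hadoop.hive.ql.io.orc.LzoCodec and threw IllegalArgumentException when that optional hive class was absent. Both LZO and LZ4 now resolve to pure-Java aircompressor codecs with no classloader lookup. A minimal sketch of the factory after the change (the demo class name is hypothetical; createCodec is public static per the hunk header above):

import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.impl.WriterImpl;

public class CodecFactoryDemo {  // hypothetical demo class
  public static void main(String[] args) {
    // No context-classloader dance, no optional dependency:
    // both kinds are served in-process by aircompressor.
    CompressionCodec lzo = WriterImpl.createCodec(CompressionKind.LZO);
    CompressionCodec lz4 = WriterImpl.createCodec(CompressionKind.LZ4);
    System.out.println(lzo.getClass().getSimpleName()); // AircompressorCodec
    System.out.println(lz4.getClass().getSimpleName()); // AircompressorCodec
  }
}
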
diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
index 37e463b4c1..31ac1c4400 100644
--- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
+++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java
@@ -1691,6 +1691,104 @@ public void testSnappy() throws Exception {
     rows.close();
   }
 
+  /**
+   * Read and write a randomly generated lzo file.
+   * @throws Exception
+   */
+  @Test
+  public void testLzo() throws Exception {
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:bigint,y:double,z:bigint>");
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(1000)
+            .compress(CompressionKind.LZO)
+            .bufferSize(100));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(69);
+    batch.size = 1000;
+    for(int b=0; b < 10; ++b) {
+      for (int r=0; r < 1000; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((DoubleColumnVector) batch.cols[1]).vector[r] = rand.nextDouble();
+        ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.LZO, reader.getCompressionKind());
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch(1000);
+    rand = new Random(69);
+    for(int b=0; b < 10; ++b) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(rand.nextInt(),
+            ((LongColumnVector) batch.cols[0]).vector[r]);
+        assertEquals(rand.nextDouble(),
+            ((DoubleColumnVector) batch.cols[1]).vector[r], 0.00001);
+        assertEquals(rand.nextLong(),
+            ((LongColumnVector) batch.cols[2]).vector[r]);
+      }
+    }
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
+    rows.close();
+  }
+
+  /**
+   * Read and write a randomly generated lz4 file.
+   * @throws Exception
+   */
+  @Test
+  public void testLz4() throws Exception {
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:bigint,y:double,z:bigint>");
+    Writer writer = OrcFile.createWriter(testFilePath,
+        OrcFile.writerOptions(conf)
+            .setSchema(schema)
+            .stripeSize(1000)
+            .compress(CompressionKind.LZ4)
+            .bufferSize(100));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    Random rand = new Random(3);
+    batch.size = 1000;
+    for(int b=0; b < 10; ++b) {
+      for (int r=0; r < 1000; ++r) {
+        ((LongColumnVector) batch.cols[0]).vector[r] = rand.nextInt();
+        ((DoubleColumnVector) batch.cols[1]).vector[r] = rand.nextDouble();
+        ((LongColumnVector) batch.cols[2]).vector[r] = rand.nextLong();
+      }
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+    Reader reader = OrcFile.createReader(testFilePath,
+        OrcFile.readerOptions(conf).filesystem(fs));
+    assertEquals(CompressionKind.LZ4, reader.getCompressionKind());
+    RecordReader rows = reader.rows();
+    batch = reader.getSchema().createRowBatch(1000);
+    rand = new Random(3);
+    for(int b=0; b < 10; ++b) {
+      rows.nextBatch(batch);
+      assertEquals(1000, batch.size);
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(rand.nextInt(),
+            ((LongColumnVector) batch.cols[0]).vector[r]);
+        assertEquals(rand.nextDouble(),
+            ((DoubleColumnVector) batch.cols[1]).vector[r], 0.00001);
+        assertEquals(rand.nextLong(),
+            ((LongColumnVector) batch.cols[2]).vector[r]);
+      }
+    }
+    rows.nextBatch(batch);
+    assertEquals(0, batch.size);
+    rows.close();
+  }
+
   /**
    * Read and write a randomly generated snappy file.
    * @throws Exception
diff --git a/java/pom.xml b/java/pom.xml
index 8b42fe5d48..5f35323984 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -245,6 +245,11 @@
         <artifactId>commons-lang</artifactId>
         <version>2.6</version>
       </dependency>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>0.3</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
@@ -348,11 +353,6 @@
         <artifactId>jettison</artifactId>
         <version>1.1</version>
       </dependency>
-      <dependency>
-        <groupId>org.iq80.snappy</groupId>
-        <artifactId>snappy</artifactId>
-        <version>0.2</version>
-      </dependency>
       <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
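
Reviewer note: the end-to-end surface this patch adds is small. A condensed sketch of the round trip the two new tests exercise; the method name is hypothetical and it assumes the same testFilePath, fs, and conf fixtures as TestVectorOrcFile:

@Test
public void testLz4RoundTripSketch() throws Exception {
  // Write a single row with LZ4 compression...
  TypeDescription schema = TypeDescription.fromString("struct<x:bigint>");
  Writer writer = OrcFile.createWriter(testFilePath,
      OrcFile.writerOptions(conf)
          .setSchema(schema)
          .compress(CompressionKind.LZ4));
  VectorizedRowBatch batch = schema.createRowBatch();
  ((LongColumnVector) batch.cols[0]).vector[0] = 42;
  batch.size = 1;
  writer.addRowBatch(batch);
  writer.close();

  // ...and read it back. No codec hint is needed on the read path:
  // extractPostScript() recognizes LZ4 from the file's postscript.
  Reader reader = OrcFile.createReader(testFilePath,
      OrcFile.readerOptions(conf).filesystem(fs));
  assertEquals(CompressionKind.LZ4, reader.getCompressionKind());
}
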