diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc index d2f2b20d16535..ce15b9baaa501 100644 --- a/cpp/src/arrow/ipc/metadata_internal.cc +++ b/cpp/src/arrow/ipc/metadata_internal.cc @@ -936,6 +936,8 @@ static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options, codec = flatbuf::CompressionType::LZ4_FRAME; } else if (options.codec->compression_type() == Compression::ZSTD) { codec = flatbuf::CompressionType::ZSTD; + } else if (options.codec->compression_type() == Compression::FASTPFOR) { + codec = flatbuf::CompressionType::FASTPFOR; } else { return Status::Invalid("Unsupported IPC compression codec: ", options.codec->name()); diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 3ae718bea4230..774f44dcbdf08 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -687,6 +687,8 @@ Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out) *out = Compression::LZ4_FRAME; } else if (compression->codec() == flatbuf::CompressionType::ZSTD) { *out = Compression::ZSTD; + } else if (compression->codec() == flatbuf::CompressionType::FASTPFOR) { + *out = Compression::FASTPFOR; } else { return Status::Invalid("Unsupported codec in RecordBatch::compression metadata"); } diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc index 9b33fb2a8ff40..c720154432655 100644 --- a/cpp/src/arrow/util/compression.cc +++ b/cpp/src/arrow/util/compression.cc @@ -100,7 +100,7 @@ Result Codec::GetCompressionType(const std::string& name) { return Compression::ZSTD; } else if (name == "bz2") { return Compression::BZ2; - } else if (name == "FASTPFOR") { + } else if (name == "fastpfor") { return Compression::FASTPFOR; } else { return Status::Invalid("Unrecognized compression type: ", name); diff --git a/cpp/src/generated/Message_generated.h b/cpp/src/generated/Message_generated.h index 1c51c6eafb3b7..6735365b9ff33 100644 --- a/cpp/src/generated/Message_generated.h +++ b/cpp/src/generated/Message_generated.h @@ -32,22 +32,25 @@ struct MessageBuilder; enum class CompressionType : int8_t { LZ4_FRAME = 0, ZSTD = 1, + FASTPFOR = 2, MIN = LZ4_FRAME, MAX = ZSTD }; -inline const CompressionType (&EnumValuesCompressionType())[2] { +inline const CompressionType (&EnumValuesCompressionType())[3] { static const CompressionType values[] = { CompressionType::LZ4_FRAME, - CompressionType::ZSTD + CompressionType::ZSTD, + CompressionType::FASTPFOR }; return values; } inline const char * const *EnumNamesCompressionType() { - static const char * const names[3] = { + static const char * const names[4] = { "LZ4_FRAME", "ZSTD", + "FASTPFOR", nullptr }; return names; diff --git a/format/Message.fbs b/format/Message.fbs index 170ea8fbced89..e350357334ad1 100644 --- a/format/Message.fbs +++ b/format/Message.fbs @@ -49,7 +49,8 @@ enum CompressionType:byte { LZ4_FRAME, // Zstandard - ZSTD + ZSTD, + FASTPFOR } /// Provided for forward compatibility in case we need to support different diff --git a/java/vector/pom.xml b/java/vector/pom.xml index da6b5c3f9b52b..33e72d02c00e4 100644 --- a/java/vector/pom.xml +++ b/java/vector/pom.xml @@ -74,6 +74,11 @@ org.slf4j slf4j-api + + org.apache.commons + commons-compress + 1.20 + diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java index 4ae4ae1fc418e..8e5c8ba7a3e89 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java @@ -76,12 +76,9 @@ public VectorLoader(VectorSchemaRoot root, CompressionCodec.Factory factory) { public void load(ArrowRecordBatch recordBatch) { Iterator buffers = recordBatch.getBuffers().iterator(); Iterator nodes = recordBatch.getNodes().iterator(); - CompressionUtil.CodecType codecType = - CompressionUtil.CodecType.fromCompressionType(recordBatch.getBodyCompression().getCodec()); - decompressionNeeded = codecType != CompressionUtil.CodecType.NO_COMPRESSION; - CompressionCodec codec = decompressionNeeded ? factory.createCodec(codecType) : NoCompressionCodec.INSTANCE; + for (FieldVector fieldVector : root.getFieldVectors()) { - loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes, codec); + loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes); } root.setRowCount(recordBatch.getLength()); if (nodes.hasNext() || buffers.hasNext()) { @@ -94,8 +91,7 @@ protected void loadBuffers( FieldVector vector, Field field, Iterator buffers, - Iterator nodes, - CompressionCodec codec) { + Iterator nodes) { checkArgument(nodes.hasNext(), "no more field nodes for for field %s and vector %s", field, vector); ArrowFieldNode fieldNode = nodes.next(); int bufferLayoutCount = TypeLayout.getTypeBufferCount(field.getType()); @@ -103,8 +99,7 @@ protected void loadBuffers( for (int j = 0; j < bufferLayoutCount; j++) { ArrowBuf nextBuf = buffers.next(); // for vectors without nulls, the buffer is empty, so there is no need to decompress it. - ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf; - ownBuffers.add(bufferToAdd); + ownBuffers.add(nextBuf); if (decompressionNeeded) { // decompression performed nextBuf.getReferenceManager().retain(); @@ -130,7 +125,7 @@ protected void loadBuffers( for (int i = 0; i < childrenFromFields.size(); i++) { Field child = children.get(i); FieldVector fieldVector = childrenFromFields.get(i); - loadBuffers(fieldVector, child, buffers, nodes, codec); + loadBuffers(fieldVector, child, buffers, nodes); } } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java index 1deb38c84da05..a3aa04013f7ce 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java @@ -100,4 +100,33 @@ public static ArrowBuf extractUncompressedBuffer(ArrowBuf inputBuffer) { return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH); } + + public static CompressionCodec createCodec(byte compressionType) { + switch (compressionType) { + case NoCompressionCodec.COMPRESSION_TYPE: + return NoCompressionCodec.INSTANCE; + case CompressionType.LZ4_FRAME: + return new Lz4CompressionCodec(); + default: + throw new IllegalArgumentException("Compression type not supported: " + compressionType); + } + } + /** + * Process compression by compressing the buffer as is. + */ + public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) { + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH); + compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex()); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex()); + return compressedBuffer; + } + + /** + * Process decompression by decompressing the buffer as is. + */ + public static ArrowBuf decompressRawBuffer(ArrowBuf inputBuffer) { + return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, + inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH); + } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java new file mode 100644 index 0000000000000..6449704e2d242 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN; +import static org.apache.arrow.vector.compression.CompressionUtil.NO_COMPRESSION_LENGTH; +import static org.apache.arrow.vector.compression.CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.arrow.flatbuf.CompressionType; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream; +import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream; +import org.apache.commons.compress.utils.IOUtils; + +import io.netty.util.internal.PlatformDependent; + +/** + * Compression codec for the LZ4 algorithm. + */ +public class Lz4CompressionCodec implements CompressionCodec { + + @Override + public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) { + Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The uncompressed buffer size exceeds the integer limit"); + + if (uncompressedBuffer.writerIndex() == 0L) { + // shortcut for empty buffer + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH); + compressedBuffer.setLong(0, 0); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH); + uncompressedBuffer.close(); + return compressedBuffer; + } + + try { + ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer); + long compressedLength = compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH; + if (compressedLength > uncompressedBuffer.writerIndex()) { + // compressed buffer is larger, send the raw buffer + compressedBuffer.close(); + compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer); + } + + uncompressedBuffer.close(); + return compressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException { + byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()]; + PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (InputStream in = new ByteArrayInputStream(inBytes); + OutputStream out = new FramedLZ4CompressorOutputStream(baos)) { + IOUtils.copy(in, out); + } + + byte[] outBytes = baos.toByteArray(); + + ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + + long uncompressedLength = uncompressedBuffer.writerIndex(); + if (!LITTLE_ENDIAN) { + uncompressedLength = Long.reverseBytes(uncompressedLength); + } + // first 8 bytes reserved for uncompressed length, to be consistent with the + // C++ implementation. + compressedBuffer.setLong(0, uncompressedLength); + + PlatformDependent.copyMemory( + outBytes, 0, compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length); + compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length); + return compressedBuffer; + } + + @Override + public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) { + Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE, + "The compressed buffer size exceeds the integer limit"); + + Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_UNCOMPRESSED_LENGTH, + "Not enough data to decompress."); + + long decompressedLength = compressedBuffer.getLong(0); + if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + if (decompressedLength == 0L) { + // shortcut for empty buffer + compressedBuffer.close(); + return allocator.getEmpty(); + } + + if (decompressedLength == NO_COMPRESSION_LENGTH) { + // no compression + return CompressionUtil.decompressRawBuffer(compressedBuffer); + } + + try { + ArrowBuf decompressedBuffer = doDecompress(allocator, compressedBuffer); + compressedBuffer.close(); + return decompressedBuffer; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ArrowBuf doDecompress(BufferAllocator allocator, ArrowBuf compressedBuffer) throws IOException { + long decompressedLength = compressedBuffer.getLong(0); + if (!LITTLE_ENDIAN) { + decompressedLength = Long.reverseBytes(decompressedLength); + } + + byte[] inBytes = new byte[(int) (compressedBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH)]; + PlatformDependent.copyMemory( + compressedBuffer.memoryAddress() + SIZE_OF_UNCOMPRESSED_LENGTH, inBytes, 0, inBytes.length); + ByteArrayOutputStream out = new ByteArrayOutputStream((int) decompressedLength); + try (InputStream in = new FramedLZ4CompressorInputStream(new ByteArrayInputStream(inBytes))) { + IOUtils.copy(in, out); + } + + byte[] outBytes = out.toByteArray(); + ArrowBuf decompressedBuffer = allocator.buffer(outBytes.length); + PlatformDependent.copyMemory(outBytes, 0, decompressedBuffer.memoryAddress(), outBytes.length); + decompressedBuffer.writerIndex(decompressedLength); + return decompressedBuffer; + } + + @Override + public CompressionUtil.CodecType getCodecType() { + return CompressionUtil.CodecType.LZ4_FRAME; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java index a0096aaf3ee56..3425af98f44e9 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java @@ -43,9 +43,9 @@ */ public class ArrowStreamReader extends ArrowReader { - private MessageChannelReader messageReader; + protected MessageChannelReader messageReader; - private int loadedDictionaryCount; + protected int loadedDictionaryCount; /** * Constructs a streaming reader using a MessageChannelReader. Non-blocking. @@ -176,7 +176,7 @@ public boolean loadNextBatch() throws IOException { /** * When read a record batch, check whether its dictionaries are available. */ - private void checkDictionaries() throws IOException { + protected void checkDictionaries() throws IOException { // if all dictionaries are loaded, return. if (loadedDictionaryCount == dictionaries.size()) { return; @@ -215,7 +215,7 @@ protected Schema readSchema() throws IOException { } - private ArrowDictionaryBatch readDictionary(MessageResult result) throws IOException { + protected ArrowDictionaryBatch readDictionary(MessageResult result) throws IOException { ArrowBuf bodyBuffer = result.getBodyBuffer(); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java new file mode 100644 index 0000000000000..1f0217bd0cebf --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/compression/TestCompressionCodec.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.compression; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Test cases for {@link CompressionCodec}s. + */ +@RunWith(Parameterized.class) +public class TestCompressionCodec { + + private final CompressionCodec codec; + + private BufferAllocator allocator; + + private final int vectorLength; + + @Before + public void init() { + allocator = new RootAllocator(Integer.MAX_VALUE); + } + + @After + public void terminate() { + allocator.close(); + } + + public TestCompressionCodec(String name, int vectorLength, CompressionCodec codec) { + this.codec = codec; + this.vectorLength = vectorLength; + } + + @Parameterized.Parameters(name = "codec = {0}, length = {1}") + public static Collection getCodecs() { + List params = new ArrayList<>(); + + int[] lengths = new int[] {10, 100, 1000}; + for (int len : lengths) { + CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE; + params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec}); + + CompressionCodec lz4Codec = new Lz4CompressionCodec(); + params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec}); + } + return params; + } + + private List compressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.compress(allocator, buf)); + } + return outputBuffers; + } + + private List deCompressBuffers(List inputBuffers) { + List outputBuffers = new ArrayList<>(inputBuffers.size()); + for (ArrowBuf buf : inputBuffers) { + outputBuffers.add(codec.decompress(allocator, buf)); + } + return outputBuffers; + } + + @Test + public void testCompressFixedWidthBuffers() throws Exception { + // prepare vector to compress + IntVector origVec = new IntVector("vec", allocator); + origVec.allocateNew(vectorLength); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.set(i, i); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(2, decompressedBuffers.size()); + + // orchestrate new vector + IntVector newVec = new IntVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertEquals(i, newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testCompressVariableWidthBuffers() throws Exception { + // prepare vector to compress + VarCharVector origVec = new VarCharVector("vec", allocator); + origVec.allocateNew(); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + origVec.setNull(i); + } else { + origVec.setSafe(i, String.valueOf(i).getBytes()); + } + } + origVec.setValueCount(vectorLength); + int nullCount = origVec.getNullCount(); + + // compress & decompress + List origBuffers = origVec.getFieldBuffers(); + List compressedBuffers = compressBuffers(origBuffers); + List decompressedBuffers = deCompressBuffers(compressedBuffers); + + assertEquals(3, decompressedBuffers.size()); + + // orchestrate new vector + VarCharVector newVec = new VarCharVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, nullCount), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + if (i % 10 == 0) { + assertTrue(newVec.isNull(i)); + } else { + assertArrayEquals(String.valueOf(i).getBytes(), newVec.get(i)); + } + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } + + @Test + public void testEmptyBuffer() throws Exception { + final VarBinaryVector origVec = new VarBinaryVector("vec", allocator); + + origVec.allocateNew(vectorLength); + + // Do not set any values (all missing) + origVec.setValueCount(vectorLength); + + final List origBuffers = origVec.getFieldBuffers(); + final List compressedBuffers = compressBuffers(origBuffers); + final List decompressedBuffers = deCompressBuffers(compressedBuffers); + + // orchestrate new vector + VarBinaryVector newVec = new VarBinaryVector("new vec", allocator); + newVec.loadFieldBuffers(new ArrowFieldNode(vectorLength, vectorLength), decompressedBuffers); + + // verify new vector + assertEquals(vectorLength, newVec.getValueCount()); + for (int i = 0; i < vectorLength; i++) { + assertTrue(newVec.isNull(i)); + } + + newVec.close(); + AutoCloseables.close(decompressedBuffers); + } +}