diff --git a/pom.xml b/pom.xml
index 986901ba..1771d0e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
org.lz4
lz4-java
1.8.0
- test
+
diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java
new file mode 100644
index 00000000..85a2ddbb
--- /dev/null
+++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameCompressor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.lz4;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import static io.airlift.compress.lz4.Lz4RawCompressor.MAX_TABLE_SIZE;
+import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
+
+/**
+ * This class is not thread-safe
+ */
+public class Lz4FrameCompressor
+ implements Compressor
+{
+ private final int[] table = new int[MAX_TABLE_SIZE];
+
+ @Override
+ public int maxCompressedLength(int uncompressedSize)
+ {
+ return Lz4FrameRawCompressor.maxCompressedLength(uncompressedSize);
+ }
+
+ @Override
+ public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+ {
+ verifyRange(input, inputOffset, inputLength);
+ verifyRange(output, outputOffset, maxOutputLength);
+
+ long inputAddress = ARRAY_BYTE_BASE_OFFSET + inputOffset;
+ long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset;
+
+ return Lz4FrameRawCompressor.compress(
+ input,
+ inputAddress,
+ inputLength,
+ output,
+ outputAddress,
+ maxOutputLength,
+ table);
+ }
+
+ @Override
+ public void compress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
+ {
+ if (true) {
+ // TODO support byte buffers, see disabled tests
+ throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
+ }
+
+ // Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
+ // the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
+ // methods through the interface class works around the problem
+ // Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
+ Buffer input = inputBuffer;
+ Buffer output = outputBuffer;
+
+ Object inputBase;
+ long inputAddress;
+ int inputLimit;
+ if (input.isDirect()) {
+ inputBase = null;
+ long address = getAddress(input);
+ inputAddress = address + input.position();
+ inputLimit = input.limit();
+ }
+ else if (input.hasArray()) {
+ inputBase = input.array();
+ inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
+ inputLimit = input.limit();
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
+ }
+
+ Object outputBase;
+ long outputAddress;
+ int outputLimit;
+ if (output.isDirect()) {
+ outputBase = null;
+ long address = getAddress(output);
+ outputAddress = address + output.position();
+ outputLimit = output.limit();
+ }
+ else if (output.hasArray()) {
+ outputBase = output.array();
+ outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
+ outputLimit = output.limit();
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
+ }
+
+ // HACK: Assure JVM does not collect Slice wrappers while compressing, since the
+ // collection may trigger freeing of the underlying memory resulting in a segfault
+ // There is no other known way to signal to the JVM that an object should not be
+ // collected in a block, and technically, the JVM is allowed to eliminate these locks.
+ synchronized (input) {
+ synchronized (output) {
+ int written = Lz4FrameRawCompressor.compress(
+ inputBase,
+ inputAddress,
+ inputLimit,
+ outputBase,
+ outputAddress,
+ outputLimit,
+ table);
+ output.position(output.position() + written);
+ }
+ }
+ }
+
+ private static void verifyRange(byte[] data, int offset, int length)
+ {
+ requireNonNull(data, "data is null");
+ if (offset < 0 || length < 0 || offset + length > data.length) {
+ throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
+ }
+ }
+}
diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java
new file mode 100644
index 00000000..d3b3b3a9
--- /dev/null
+++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameDecompressor.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.lz4;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import static io.airlift.compress.lz4.UnsafeUtil.getAddress;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
+
+public class Lz4FrameDecompressor
+ implements Decompressor
+{
+ @Override
+ public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+ throws MalformedInputException
+ {
+ verifyRange(input, inputOffset, inputLength);
+ verifyRange(output, outputOffset, maxOutputLength);
+
+ return Lz4FrameRawDecompressor.decompress(
+ input,
+ ARRAY_BYTE_BASE_OFFSET + inputOffset,
+ inputLength,
+ output,
+ ARRAY_BYTE_BASE_OFFSET + outputOffset,
+ maxOutputLength);
+ }
+
+ @Override
+ public void decompress(ByteBuffer inputBuffer, ByteBuffer outputBuffer)
+ throws MalformedInputException
+ {
+ if (true) {
+ // TODO support byte buffers, see disabled tests
+ throw new UnsupportedOperationException("This is disabled, does not work with direct buffers yet");
+ }
+
+ // Java 9+ added an overload of various methods in ByteBuffer. When compiling with Java 11+ and targeting Java 8 bytecode
+ // the resulting signatures are invalid for JDK 8, so accesses below result in NoSuchMethodError. Accessing the
+ // methods through the interface class works around the problem
+ // Sidenote: we can't target "javac --release 8" because Unsafe is not available in the signature data for that profile
+ Buffer input = inputBuffer;
+ Buffer output = outputBuffer;
+
+ Object inputBase;
+ long inputAddress;
+ int inputLimit;
+ if (input.isDirect()) {
+ inputBase = null;
+ long address = getAddress(input);
+ inputAddress = address + input.position();
+ inputLimit = input.limit();
+ }
+ else if (input.hasArray()) {
+ inputBase = input.array();
+ inputAddress = ARRAY_BYTE_BASE_OFFSET + input.arrayOffset() + input.position();
+ inputLimit = input.limit();
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported input ByteBuffer implementation " + input.getClass().getName());
+ }
+
+ Object outputBase;
+ long outputAddress;
+ int outputLimit;
+ if (output.isDirect()) {
+ outputBase = null;
+ long address = getAddress(output);
+ outputAddress = address + output.position();
+ outputLimit = output.limit();
+ }
+ else if (output.hasArray()) {
+ outputBase = output.array();
+ outputAddress = ARRAY_BYTE_BASE_OFFSET + output.arrayOffset() + output.position();
+ outputLimit = output.limit();
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported output ByteBuffer implementation " + output.getClass().getName());
+ }
+
+ // HACK: Assure JVM does not collect Slice wrappers while decompressing, since the
+ // collection may trigger freeing of the underlying memory resulting in a segfault
+ // There is no other known way to signal to the JVM that an object should not be
+ // collected in a block, and technically, the JVM is allowed to eliminate these locks.
+ synchronized (input) {
+ synchronized (output) {
+ int written = Lz4FrameRawDecompressor.decompress(inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit);
+ output.position(output.position() + written);
+ }
+ }
+ }
+
+ public static long getDecompressedSize(byte[] input, int offset, int length)
+ {
+ int baseAddress = ARRAY_BYTE_BASE_OFFSET + offset;
+ return Lz4FrameRawDecompressor.getDecompressedSize(input, baseAddress, length);
+ }
+
+ private static void verifyRange(byte[] data, int offset, int length)
+ {
+ requireNonNull(data, "data is null");
+ if (offset < 0 || length < 0 || offset + length > data.length) {
+ throw new IllegalArgumentException(format("Invalid offset or length (%s, %s) in array of length %s", offset, length, data.length));
+ }
+ }
+}
diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java
new file mode 100644
index 00000000..48e71961
--- /dev/null
+++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawCompressor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.lz4;
+
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE;
+import static java.lang.Math.toIntExact;
+import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
+
+/**
+ * Implementation of LZ4 Frame format.
+ */
+final class Lz4FrameRawCompressor
+{
+ private Lz4FrameRawCompressor() {}
+
+ private static final byte[] MAGIC = {0x04, 0x22, 0x4D, 0x18};
+
+ private static final int FRAME_DESCRIPTOR_SIZE =
+ 2 + // FLG byte, BD byte
+ 8 + // content size
+ 1; // HC (Header Checksum)
+
+ private static final int FRAME_START_SIZE = 4 /* magic */ + FRAME_DESCRIPTOR_SIZE;
+ private static final int FRAME_END_SIZE = 4 /* EndMark */;
+
+ private static final int BLOCK_MAX_SIZE = 4 * 1024 * 1024;
+ private static final int BLOCK_MAX_SIZE_MARKER = 7; // per "Block Maximum Size" spec, 7 means 4 MB
+
+ public static int maxCompressedLength(int uncompressedSize)
+ {
+ return FRAME_START_SIZE + Lz4RawCompressor.maxCompressedLength(uncompressedSize) + FRAME_END_SIZE +
+ // block sizes
+ 4 * (uncompressedSize / BLOCK_MAX_SIZE + 2);
+ }
+
+ public static int compress(
+ Object inputBase,
+ long inputAddress,
+ int inputLength,
+ Object outputBase,
+ long outputAddress,
+ int maxOutputLength,
+ int[] table)
+ {
+ long originalOutputAddress = outputAddress;
+
+ if (maxOutputLength < maxCompressedLength(inputLength)) {
+ throw new IllegalArgumentException("Max output length must be larger than " + maxCompressedLength(inputLength));
+ }
+
+ UNSAFE.copyMemory(MAGIC, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, MAGIC.length);
+ outputAddress += MAGIC.length;
+ maxOutputLength -= MAGIC.length;
+
+ byte[] frameDescriptor = new byte[FRAME_DESCRIPTOR_SIZE];
+
+ // FLG byte
+ frameDescriptor[0] =
+ 0b01 << 6 | // Version: 1
+ 1 << 5 | // B.Indep: blocks are independent
+ 0 << 4 | // B.Checksum: no checksum
+ 1 << 3 | // C.Size: content size present
+ 0 << 2 | // C.Checksum: no checksum
+ 0 << 1 | // Reserved
+ 0 << 0; // DictID: no dictionary
+
+ // BD byte
+ frameDescriptor[1] = (BLOCK_MAX_SIZE_MARKER << 4);
+
+ // content size
+ UNSAFE.putLong(frameDescriptor, ARRAY_BYTE_BASE_OFFSET + 2L, inputLength);
+
+ // HC (Header Checksum)
+ XXHash32 xxHash32 = XXHashFactory.fastestInstance().hash32();
+ byte hc = (byte) ((xxHash32.hash(frameDescriptor, 0, frameDescriptor.length - 1, 0) >> 8) & 0xFF);
+ frameDescriptor[frameDescriptor.length - 1] = hc;
+ UNSAFE.copyMemory(frameDescriptor, ARRAY_BYTE_BASE_OFFSET, outputBase, outputAddress, frameDescriptor.length);
+ outputAddress += frameDescriptor.length;
+ maxOutputLength -= frameDescriptor.length;
+
+ while (inputLength > 0) {
+ int blockSize = Math.min(inputLength, BLOCK_MAX_SIZE);
+ int blockHeaderSize = 4;
+ int compressedSize = Lz4RawCompressor.compress(
+ inputBase,
+ inputAddress,
+ blockSize,
+ outputBase,
+ outputAddress + blockHeaderSize,
+ maxOutputLength - blockHeaderSize,
+ table);
+ int uncompressed;
+ if (compressedSize >= blockSize) {
+ // incompressible data
+ uncompressed = 1;
+ compressedSize = blockSize;
+ UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress + blockHeaderSize, blockSize);
+ UNSAFE.putInt(outputBase, outputAddress, (1 << 31) | blockSize);
+ }
+ else {
+ // compressed data, already written to the output
+ uncompressed = 0;
+ }
+
+ UNSAFE.putInt(outputBase, outputAddress, (uncompressed << 31) | compressedSize);
+ outputAddress += blockHeaderSize + compressedSize;
+ maxOutputLength -= blockHeaderSize + compressedSize;
+
+ inputAddress += blockSize;
+ inputLength -= blockSize;
+ }
+
+ // EndMark
+ UNSAFE.putInt(outputBase, outputAddress, 0);
+ outputAddress += 4;
+ maxOutputLength -= 4;
+
+ return toIntExact(outputAddress - originalOutputAddress);
+ }
+}
diff --git a/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java
new file mode 100644
index 00000000..22ea1b6f
--- /dev/null
+++ b/src/main/java/io/airlift/compress/lz4/Lz4FrameRawDecompressor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.lz4;
+
+import static io.airlift.compress.lz4.UnsafeUtil.UNSAFE;
+import static java.lang.Math.toIntExact;
+
+final class Lz4FrameRawDecompressor
+{
+ private Lz4FrameRawDecompressor() {}
+
+ private static final int SUPPORTED_FLAGS = 0b01101000;
+ private static final int SUPPORTED_BD = 0b01110000;
+
+ static int decompress(
+ Object inputBase,
+ long inputAddress,
+ int inputLimit,
+ Object outputBase,
+ long outputAddress,
+ int outputLimit)
+ {
+ long originalOutputAddress = outputAddress;
+
+ checkArgument(inputLimit >= 6, "Not enough input bytes");
+ checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number");
+ inputAddress += 4;
+ inputLimit -= 4;
+
+ byte flags = UNSAFE.getByte(inputBase, inputAddress);
+ byte bd = UNSAFE.getByte(inputBase, inputAddress + 1);
+ inputAddress += 2;
+ inputLimit -= 2;
+
+ checkArgument(getVersion(flags) == 1, "Unsupported version");
+ long contentSize = -1;
+ if (hasContentSize(flags)) {
+ contentSize = UNSAFE.getLong(inputBase, inputAddress);
+ inputAddress += 8;
+ inputLimit -= 8;
+ checkArgument(contentSize <= outputLimit, "Output buffer too small");
+ }
+
+ checkArgument(inputLimit >= 1, "Not enough input bytes");
+ byte hc = UNSAFE.getByte(inputBase, inputAddress); // header checksum
+ inputAddress += 1;
+ inputLimit -= 1;
+
+ checkArgument((flags & ~SUPPORTED_FLAGS) == 0 && areBlocksIndependent(flags), "Unsupported flags");
+ checkArgument((bd & ~SUPPORTED_BD) == 0, "Invalid BD byte");
+
+ while (true) {
+ checkArgument(inputLimit >= 4, "Not enough input bytes");
+ int blockSize = UNSAFE.getInt(inputBase, inputAddress);
+ inputAddress += 4;
+ inputLimit -= 4;
+
+ if (blockSize == 0) {
+ // EndMark
+ break;
+ }
+
+ if ((blockSize & (1 << 31)) != 0) {
+ // uncompressed
+ blockSize = blockSize & Integer.MAX_VALUE;
+ checkArgument(inputLimit >= blockSize, "Not enough input bytes");
+ checkArgument(outputLimit >= blockSize, "Output buffer too small");
+ UNSAFE.copyMemory(inputBase, inputAddress, outputBase, outputAddress, blockSize);
+ inputAddress += blockSize;
+ inputLimit -= blockSize;
+ outputAddress += blockSize;
+ outputLimit -= blockSize;
+ }
+ else {
+ // compressed
+ checkArgument(inputLimit >= blockSize, "Not enough input bytes");
+ int decompressed = Lz4RawDecompressor.decompress(
+ inputBase,
+ inputAddress,
+ inputAddress + blockSize,
+ outputBase,
+ outputAddress,
+ outputAddress + outputLimit);
+ inputAddress += blockSize;
+ inputLimit -= blockSize;
+ outputAddress += decompressed;
+ outputLimit -= decompressed;
+ }
+ }
+
+ checkArgument(inputLimit == 0, "Some input not consumed");
+ int decompressed = toIntExact(outputAddress - originalOutputAddress);
+ checkArgument(contentSize == -1 || decompressed == contentSize, "Decompressed wrong number of bytes");
+ return decompressed;
+ }
+
+ public static long getDecompressedSize(Object inputBase, long inputAddress, int inputLimit)
+ {
+ checkArgument(inputLimit >= 6, "Not enough input bytes");
+ checkArgument(UNSAFE.getInt(inputBase, inputAddress) == 0x184D2204, "Invalid magic number");
+ inputAddress += 4;
+ inputLimit -= 4;
+
+ byte flags = UNSAFE.getByte(inputBase, inputAddress);
+ // BD byte not read
+ inputAddress += 2;
+ inputLimit -= 2;
+ checkArgument(hasContentSize(flags), "Content size (C.Size) not present");
+
+ checkArgument(inputLimit >= 8, "Not enough input bytes");
+ return UNSAFE.getLong(inputBase, inputAddress);
+ }
+
+ private static int getVersion(byte flags)
+ {
+ return flags >> 6;
+ }
+
+ private static boolean hasContentSize(byte flags)
+ {
+ return (flags & (1 << 3)) != 0;
+ }
+
+ private static boolean areBlocksIndependent(byte flags)
+ {
+ return (flags & (1 << 5)) != 0;
+ }
+
+ private static void checkArgument(boolean condition, String message)
+ {
+ if (!condition) {
+ throw new IllegalArgumentException(message);
+ }
+ }
+}
diff --git a/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java
index f17163c0..9205dd2b 100644
--- a/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java
+++ b/src/main/java/io/airlift/compress/lz4/Lz4RawCompressor.java
@@ -40,7 +40,7 @@ public final class Lz4RawCompressor
private static final int RUN_BITS = 8 - ML_BITS;
private static final int RUN_MASK = (1 << RUN_BITS) - 1;
- private static final int MAX_DISTANCE = ((1 << 16) - 1);
+ static final int MAX_DISTANCE = ((1 << 16) - 1);
private static final int SKIP_TRIGGER = 6; /* Increase this value ==> compression run slower on incompressible data */
diff --git a/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java b/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java
new file mode 100644
index 00000000..50c4a871
--- /dev/null
+++ b/src/test/java/io/airlift/compress/lz4/TestLz4Frame.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.lz4;
+
+import io.airlift.compress.AbstractTestCompression;
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.benchmark.DataSet;
+import io.airlift.compress.thirdparty.JPountzLz4FrameCompressor;
+import io.airlift.compress.thirdparty.JPountzLz4FrameDecompressor;
+import net.jpountz.lz4.LZ4Factory;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+import static org.testng.Assert.assertEquals;
+
+public class TestLz4Frame
+ extends AbstractTestCompression
+{
+ @Override
+ protected Compressor getCompressor()
+ {
+ return new Lz4FrameCompressor();
+ }
+
+ @Override
+ protected Decompressor getDecompressor()
+ {
+ return new Lz4FrameDecompressor();
+ }
+
+ @Override
+ protected boolean isByteBufferSupported()
+ {
+ // TODO support byte buffer
+ return false;
+ }
+
+ @Override
+ protected Compressor getVerifyCompressor()
+ {
+ return new JPountzLz4FrameCompressor(LZ4Factory.fastestInstance());
+ }
+
+ @Override
+ protected Decompressor getVerifyDecompressor()
+ {
+ return new JPountzLz4FrameDecompressor(LZ4Factory.fastestInstance());
+ }
+
+ // test over data sets, should the result depend on input size or its compressibility
+ @Test(dataProvider = "data")
+ public void testGetDecompressedSize(DataSet dataSet)
+ {
+ Compressor compressor = getCompressor();
+ byte[] originalUncompressed = dataSet.getUncompressed();
+ byte[] compressed = new byte[compressor.maxCompressedLength(originalUncompressed.length)];
+
+ int compressedLength = compressor.compress(originalUncompressed, 0, originalUncompressed.length, compressed, 0, compressed.length);
+
+ assertEquals(Lz4FrameDecompressor.getDecompressedSize(compressed, 0, compressedLength), originalUncompressed.length);
+
+ int padding = 10;
+ byte[] compressedWithPadding = new byte[compressedLength + padding];
+ Arrays.fill(compressedWithPadding, (byte) 42);
+ System.arraycopy(compressed, 0, compressedWithPadding, padding, compressedLength);
+ assertEquals(Lz4FrameDecompressor.getDecompressedSize(compressedWithPadding, padding, compressedLength), originalUncompressed.length);
+ }
+}
diff --git a/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java
new file mode 100644
index 00000000..ecc25724
--- /dev/null
+++ b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameCompressor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.thirdparty;
+
+import io.airlift.compress.Compressor;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+import static java.lang.String.format;
+
+public class JPountzLz4FrameCompressor
+ implements Compressor
+{
+ private final LZ4Compressor compressor;
+
+ public JPountzLz4FrameCompressor(LZ4Factory factory)
+ {
+ compressor = factory.fastCompressor();
+ }
+
+ @Override
+ public int maxCompressedLength(int uncompressedSize)
+ {
+ int maxHeaderLength;
+ try {
+ Field maxHeaderLengthField = LZ4FrameOutputStream.class.getDeclaredField("LZ4_MAX_HEADER_LENGTH");
+ maxHeaderLengthField.setAccessible(true);
+ maxHeaderLength = maxHeaderLengthField.getInt(null);
+ }
+ catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ return maxHeaderLength + compressor.maxCompressedLength(uncompressedSize);
+ }
+
+ @Override
+ public int compress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+ {
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ try (LZ4FrameOutputStream compressingOutputStream = new LZ4FrameOutputStream(
+ outputStream,
+ LZ4FrameOutputStream.BLOCKSIZE.SIZE_64KB,
+ inputLength,
+ compressor,
+ XXHashFactory.fastestInstance().hash32(),
+ LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE,
+ LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE)) {
+ compressingOutputStream.write(input, inputOffset, inputLength);
+ }
+ byte[] compressed = outputStream.toByteArray();
+ if (compressed.length > maxOutputLength) {
+ throw new IllegalArgumentException(format("Output buffer too small, provided capacity %s, compressed data size %s", maxOutputLength, compressed.length));
+ }
+ System.arraycopy(compressed, 0, output, outputOffset, compressed.length);
+ return compressed.length;
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void compress(ByteBuffer input, ByteBuffer output)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java
new file mode 100644
index 00000000..c123fa76
--- /dev/null
+++ b/src/test/java/io/airlift/compress/thirdparty/JPountzLz4FrameDecompressor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.airlift.compress.thirdparty;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+import static com.google.common.io.ByteStreams.read;
+
+public class JPountzLz4FrameDecompressor
+ implements Decompressor
+{
+ private final LZ4SafeDecompressor decompressor;
+
+ public JPountzLz4FrameDecompressor(LZ4Factory factory)
+ {
+ decompressor = factory.safeDecompressor();
+ }
+
+ @Override
+ public int decompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset, int maxOutputLength)
+ throws MalformedInputException
+ {
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(input, inputOffset, inputLength);
+ LZ4FrameInputStream decompressingInputStream = new LZ4FrameInputStream(inputStream, decompressor, XXHashFactory.fastestInstance().hash32())) {
+ return read(decompressingInputStream, output, outputOffset, maxOutputLength);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void decompress(ByteBuffer input, ByteBuffer output)
+ throws MalformedInputException
+ {
+ throw new UnsupportedOperationException();
+ }
+}