From 7e79aa5a48a59d6e16ce5c535b5019123ae22743 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 11 Jan 2022 11:22:09 -0500 Subject: [PATCH 1/4] Configurable GzipCompressingInputStream compression buffer size Allow configuring the GZIP compression buffer size. Increase default compression buffer size to 64 KiB. --- .../compression/GzipCompressingInputStream.java | 7 ++++++- .../common/compression/StreamCompression.java | 9 +++++---- .../compression/StreamCompressionTests.java | 15 +++++++-------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java index 0859df3b938..8f4fcb7621a 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java @@ -52,11 +52,16 @@ public static byte[] getMagicPrefix() { } public static InputStream compress(InputStream uncompressed) { + return compress(uncompressed, StreamCompression.DEFAULT_BLOCK_SIZE); + } + + public static InputStream compress(InputStream uncompressed, int bufferSize) { InputStream header = createHeaderStream(); CountingInputStream counting = new CountingInputStream(uncompressed); CRC32 crc = new CRC32(); CheckedInputStream checked = new CheckedInputStream(counting, crc); - InputStream content = new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true)); + InputStream content = + new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true), bufferSize); List<Supplier<InputStream>> allStreams = ImmutableList.of(() -> header, () -> content, () -> trailerStream(counting.getCount(), crc)); return new SequenceInputStream(Collections.enumeration(Lists.transform(allStreams, Supplier::get))); diff --git 
a/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java index a05aae3bddb..280a2caa5ec 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java @@ -33,11 +33,12 @@ public enum StreamCompression { private static final byte[] gzipMagic = GzipCompressingInputStream.getMagicPrefix(); private static final byte[] lz4Magic = "LZ4Block".getBytes(StandardCharsets.UTF_8); + static final int DEFAULT_BLOCK_SIZE = 1 << 16; // 64 KiB public InputStream compress(InputStream stream) { switch (this) { case GZIP: - return GzipCompressingInputStream.compress(stream); + return GzipCompressingInputStream.compress(stream, DEFAULT_BLOCK_SIZE); case LZ4: return new LZ4CompressingInputStream(stream); case NONE: @@ -60,8 +61,8 @@ public InputStream decompress(InputStream stream) { private static boolean startsWith(InputStream stream, byte[] data) throws IOException { stream.mark(data.length); try { - for (int i = 0; i < data.length; i++) { - if (stream.read() != Byte.toUnsignedInt(data[i])) { + for (byte datum : data) { + if (stream.read() != Byte.toUnsignedInt(datum)) { return false; } } @@ -73,7 +74,7 @@ private static boolean startsWith(InputStream stream, byte[] data) throws IOExce private static InputStream decompressWithHeader(InputStream unbuffered) { try { - BufferedInputStream stream = new BufferedInputStream(unbuffered); + BufferedInputStream stream = new BufferedInputStream(unbuffered, DEFAULT_BLOCK_SIZE); if (startsWith(stream, gzipMagic)) { return new GZIPInputStream(stream); } else if (startsWith(stream, lz4Magic)) { diff --git a/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java b/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java index 
d9f6490a32c..0d132c13c08 100644 --- a/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java +++ b/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java @@ -37,7 +37,6 @@ public class StreamCompressionTests { private static final byte SINGLE_VALUE = 42; private static final int BLOCK_SIZE = 1 << 16; // 64 KB - private ByteArrayInputStream uncompressedStream; private InputStream compressingStream; private InputStream decompressingStream; @@ -47,7 +46,7 @@ public StreamCompressionTests(StreamCompression compression) { this.compression = compression; } - @Parameterized.Parameters + @Parameterized.Parameters(name = "{index} {0} compression") public static Object[] parameters() { return StreamCompression.values(); } @@ -125,9 +124,9 @@ public void testMultiBlock_singleByteReads() throws IOException { fillWithIncompressibleData(uncompressedData); initializeStreams(uncompressedData); - for (int i = 0; i < uncompressedData.length; ++i) { + for (byte uncompressedDatum : uncompressedData) { int value = decompressingStream.read(); - assertThat(value).isEqualTo(uncompressedData[i] & 0xFF); + assertThat(value).isEqualTo(uncompressedDatum & 0xFF); } assertStreamIsEmpty(decompressingStream); } @@ -159,16 +158,16 @@ private void testStream_incompressible(int streamSize) throws IOException { } private void initializeStreams(byte[] uncompressedData) { - uncompressedStream = new ByteArrayInputStream(uncompressedData); + ByteArrayInputStream uncompressedStream = new ByteArrayInputStream(uncompressedData); compressingStream = compression.compress(uncompressedStream); decompressingStream = compression.decompress(compressingStream); } - private void fillWithCompressibleData(byte[] data) { + private static void fillWithCompressibleData(byte[] data) { Arrays.fill(data, SINGLE_VALUE); } - private void fillWithIncompressibleData(byte[] data) { + private static void fillWithIncompressibleData(byte[] data) { new 
Random(0).nextBytes(data); } @@ -179,7 +178,7 @@ private void verifyStreamContents(byte[] uncompressedData) throws IOException { assertStreamIsEmpty(decompressingStream); } - private void assertStreamIsEmpty(InputStream stream) throws IOException { + private static void assertStreamIsEmpty(InputStream stream) throws IOException { assertThat(stream.read()).isEqualTo(-1); } } From 5cffc33065441088104e9c7bf9322b8365b49353 Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Tue, 11 Jan 2022 16:28:54 +0000 Subject: [PATCH 2/4] Add generated changelog entries --- changelog/@unreleased/pr-5848.v2.yml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 changelog/@unreleased/pr-5848.v2.yml diff --git a/changelog/@unreleased/pr-5848.v2.yml b/changelog/@unreleased/pr-5848.v2.yml new file mode 100644 index 00000000000..337fb31367f --- /dev/null +++ b/changelog/@unreleased/pr-5848.v2.yml @@ -0,0 +1,27 @@ +type: improvement +improvement: + description: |- + Configurable GzipCompressingInputStream compression buffer size + + **Goals (and why)**: + `GzipCompressingInputStream` did not allow configuring compression buffer size, leading to suboptimal compression with default `DeflaterInputStream` buffer size of 512 bytes. + + **Implementation Description (bullets)**: + Allow configuring the GZIP compression buffer size. Increase default compression buffer size to 64 KiB. + + **Testing (What was existing testing like? What have you done to improve it?)**: + StreamCompressionTests + + **Concerns (what feedback would you like?)**: Is 64 KiB too large default? My sense is no, and that's the current default for LZ4 block size. 
+ + **Where should we start reviewing?**: GzipCompressingInputStream + + **Priority (whenever / two weeks / yesterday)**: this week + + + links: + - https://github.com/palantir/atlasdb/pull/5848 From e98385d65042aa92f87b8214be24ab3724ff74b9 Mon Sep 17 00:00:00 2001 From: Glenn Sheasby Date: Tue, 11 Jan 2022 17:02:57 +0000 Subject: [PATCH 3/4] fixup changelog --- changelog/@unreleased/pr-5848.v2.yml | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/changelog/@unreleased/pr-5848.v2.yml b/changelog/@unreleased/pr-5848.v2.yml index 337fb31367f..8bbcc0e83f6 100644 --- a/changelog/@unreleased/pr-5848.v2.yml +++ b/changelog/@unreleased/pr-5848.v2.yml @@ -1,27 +1,6 @@ type: improvement improvement: description: |- - Configurable GzipCompressingInputStream compression buffer size - - **Goals (and why)**: - `GzipCompressingInputStream` did not allow configuring compression buffer size, leading to suboptimal compression with default `DeflaterInputStream` buffer size of 512 bytes. - - **Implementation Description (bullets)**: - Allow configuring the GZIP compression buffer size. Increase default compression buffer size to 64 KiB. - - **Testing (What was existing testing like? What have you done to improve it?)**: - StreamCompressionTests - - **Concerns (what feedback would you like?)**: Is 64 KiB too large default? My sense is no, and that's the current default for LZ4 block size. - - **Where should we start reviewing?**: GzipCompressingInputStream - - **Priority (whenever / two weeks / yesterday)**: this week - - + Added a configurable GzipCompressingInputStream compression buffer size. Increased the default compression buffer size to 64 KiB. 
links: - https://github.com/palantir/atlasdb/pull/5848 From 3e0d262c98b6c6140e3ceeb2932e6cc85d0ca3c2 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 11 Jan 2022 12:17:51 -0500 Subject: [PATCH 4/4] Configurable GzipCompressingInputStream compression level --- .../GzipCompressingInputStream.java | 23 +++++++++++++++---- .../common/compression/StreamCompression.java | 2 +- changelog/@unreleased/pr-5848.v2.yml | 2 +- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java index 8f4fcb7621a..e353aa394a0 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.io.CountingInputStream; +import com.google.common.primitives.Shorts; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.SequenceInputStream; @@ -51,17 +52,31 @@ public static byte[] getMagicPrefix() { return Arrays.copyOf(GZIP_HEADER, 2); } + /** + * Wraps a stream with GZIP compression using default compression level and reasonable default buffer size. + * + * @param uncompressed uncompressed stream to wrap + * @return GZIP compressed stream + */ public static InputStream compress(InputStream uncompressed) { - return compress(uncompressed, StreamCompression.DEFAULT_BLOCK_SIZE); + return compress( + uncompressed, Shorts.checkedCast(Deflater.DEFAULT_COMPRESSION), StreamCompression.DEFAULT_BLOCK_SIZE); } - public static InputStream compress(InputStream uncompressed, int bufferSize) { + /** + * Wraps a stream with GZIP compression. 
+ * + * @param uncompressed uncompressed stream to wrap + * @param compressionLevel compression level (0 to 9, or -1 for the default), see {@link Deflater} + * @param bufferSize deflation buffer size + * @return GZIP compressed stream + */ + public static InputStream compress(InputStream uncompressed, short compressionLevel, int bufferSize) { InputStream header = createHeaderStream(); CountingInputStream counting = new CountingInputStream(uncompressed); CRC32 crc = new CRC32(); CheckedInputStream checked = new CheckedInputStream(counting, crc); - InputStream content = - new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true), bufferSize); + InputStream content = new DeflaterInputStream(checked, new Deflater(compressionLevel, true), bufferSize); List<Supplier<InputStream>> allStreams = ImmutableList.of(() -> header, () -> content, () -> trailerStream(counting.getCount(), crc)); return new SequenceInputStream(Collections.enumeration(Lists.transform(allStreams, Supplier::get))); diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java index 280a2caa5ec..218a52b584f 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java @@ -38,7 +38,7 @@ public enum StreamCompression { public InputStream compress(InputStream stream) { switch (this) { case GZIP: - return GzipCompressingInputStream.compress(stream, DEFAULT_BLOCK_SIZE); + return GzipCompressingInputStream.compress(stream); case LZ4: return new LZ4CompressingInputStream(stream); case NONE: diff --git a/changelog/@unreleased/pr-5848.v2.yml b/changelog/@unreleased/pr-5848.v2.yml index 8bbcc0e83f6..80c54908c89 100644 --- a/changelog/@unreleased/pr-5848.v2.yml +++ b/changelog/@unreleased/pr-5848.v2.yml @@ -1,6 +1,6 @@ type: improvement improvement: description: |- - Added a configurable 
GzipCompressingInputStream compression buffer size. Increased the default compression buffer size to 64 KiB. + Added a configurable GzipCompressingInputStream compression level and buffer size. Increased the default compression buffer size to 64 KiB. links: - https://github.com/palantir/atlasdb/pull/5848