Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Configurable GzipCompressingInputStream compression buffer size #5848

Merged
merged 4 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream;
import com.google.common.primitives.Shorts;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
Expand Down Expand Up @@ -51,12 +52,31 @@ public static byte[] getMagicPrefix() {
return Arrays.copyOf(GZIP_HEADER, 2);
}

/**
* Wraps a stream with GZIP compression using default compression level and reasonable default buffer size.
*
* @param uncompressed uncompressed stream to wrap
* @return GZIP compressed stream
*/
public static InputStream compress(InputStream uncompressed) {
return compress(
uncompressed, Shorts.checkedCast(Deflater.DEFAULT_COMPRESSION), StreamCompression.DEFAULT_BLOCK_SIZE);
}

/**
* Wraps a stream with GZIP compression.
*
* @param uncompressed uncompressed stream to wrap
* @param compressionLevel compression level (1 to 9), see {@link Deflater}
* @param bufferSize deflation buffer size
* @return GZIP compressed stream
*/
public static InputStream compress(InputStream uncompressed, short compressionLevel, int bufferSize) {
InputStream header = createHeaderStream();
CountingInputStream counting = new CountingInputStream(uncompressed);
CRC32 crc = new CRC32();
CheckedInputStream checked = new CheckedInputStream(counting, crc);
InputStream content = new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
InputStream content = new DeflaterInputStream(checked, new Deflater(compressionLevel, true), bufferSize);
List<Supplier<InputStream>> allStreams =
ImmutableList.of(() -> header, () -> content, () -> trailerStream(counting.getCount(), crc));
return new SequenceInputStream(Collections.enumeration(Lists.transform(allStreams, Supplier::get)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum StreamCompression {

private static final byte[] gzipMagic = GzipCompressingInputStream.getMagicPrefix();
private static final byte[] lz4Magic = "LZ4Block".getBytes(StandardCharsets.UTF_8);
static final int DEFAULT_BLOCK_SIZE = 1 << 16; // 64 KiB

public InputStream compress(InputStream stream) {
switch (this) {
Expand Down Expand Up @@ -60,8 +61,8 @@ public InputStream decompress(InputStream stream) {
private static boolean startsWith(InputStream stream, byte[] data) throws IOException {
stream.mark(data.length);
try {
for (int i = 0; i < data.length; i++) {
if (stream.read() != Byte.toUnsignedInt(data[i])) {
for (byte datum : data) {
if (stream.read() != Byte.toUnsignedInt(datum)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

return false;
}
}
Expand All @@ -73,7 +74,7 @@ private static boolean startsWith(InputStream stream, byte[] data) throws IOExce

private static InputStream decompressWithHeader(InputStream unbuffered) {
try {
BufferedInputStream stream = new BufferedInputStream(unbuffered);
BufferedInputStream stream = new BufferedInputStream(unbuffered, DEFAULT_BLOCK_SIZE);
if (startsWith(stream, gzipMagic)) {
return new GZIPInputStream(stream);
} else if (startsWith(stream, lz4Magic)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class StreamCompressionTests {
private static final byte SINGLE_VALUE = 42;
private static final int BLOCK_SIZE = 1 << 16; // 64 KB

private ByteArrayInputStream uncompressedStream;
private InputStream compressingStream;
private InputStream decompressingStream;

Expand All @@ -47,7 +46,7 @@ public StreamCompressionTests(StreamCompression compression) {
this.compression = compression;
}

@Parameterized.Parameters
@Parameterized.Parameters(name = "{index} {0} compression")
public static Object[] parameters() {
return StreamCompression.values();
}
Expand Down Expand Up @@ -125,9 +124,9 @@ public void testMultiBlock_singleByteReads() throws IOException {
fillWithIncompressibleData(uncompressedData);
initializeStreams(uncompressedData);

for (int i = 0; i < uncompressedData.length; ++i) {
for (byte uncompressedDatum : uncompressedData) {
int value = decompressingStream.read();
assertThat(value).isEqualTo(uncompressedData[i] & 0xFF);
assertThat(value).isEqualTo(uncompressedDatum & 0xFF);
}
assertStreamIsEmpty(decompressingStream);
}
Expand Down Expand Up @@ -159,16 +158,16 @@ private void testStream_incompressible(int streamSize) throws IOException {
}

private void initializeStreams(byte[] uncompressedData) {
uncompressedStream = new ByteArrayInputStream(uncompressedData);
ByteArrayInputStream uncompressedStream = new ByteArrayInputStream(uncompressedData);
compressingStream = compression.compress(uncompressedStream);
decompressingStream = compression.decompress(compressingStream);
}

private void fillWithCompressibleData(byte[] data) {
private static void fillWithCompressibleData(byte[] data) {
Arrays.fill(data, SINGLE_VALUE);
}

private void fillWithIncompressibleData(byte[] data) {
private static void fillWithIncompressibleData(byte[] data) {
new Random(0).nextBytes(data);
}

Expand All @@ -179,7 +178,7 @@ private void verifyStreamContents(byte[] uncompressedData) throws IOException {
assertStreamIsEmpty(decompressingStream);
}

private void assertStreamIsEmpty(InputStream stream) throws IOException {
private static void assertStreamIsEmpty(InputStream stream) throws IOException {
assertThat(stream.read()).isEqualTo(-1);
}
}
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5848.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: |-
Added a configurable GzipCompressingInputStream compression level and buffer size. Increased the default compression buffer size to 64 KiB.
links:
- https://github.com/palantir/atlasdb/pull/5848