Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Commit

Permalink
Configurable GzipCompressingInputStream compression buffer size (#5848)
Browse files Browse the repository at this point in the history
  • Loading branch information
schlosna authored Jan 11, 2022
1 parent c1130c9 commit a1a92e9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.CountingInputStream;
import com.google.common.primitives.Shorts;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
Expand Down Expand Up @@ -51,12 +52,31 @@ public static byte[] getMagicPrefix() {
return Arrays.copyOf(GZIP_HEADER, 2);
}

/**
 * Returns a GZIP-compressing view of the given stream. Delegates to
 * {@link #compress(InputStream, short, int)} using {@link Deflater#DEFAULT_COMPRESSION} as the compression
 * level and {@code StreamCompression.DEFAULT_BLOCK_SIZE} as the deflation buffer size.
 *
 * @param uncompressed uncompressed stream to wrap
 * @return GZIP compressed stream
 */
public static InputStream compress(InputStream uncompressed) {
    // Deflater exposes its levels as int; the three-argument overload takes a short.
    short defaultLevel = Shorts.checkedCast(Deflater.DEFAULT_COMPRESSION);
    return compress(uncompressed, defaultLevel, StreamCompression.DEFAULT_BLOCK_SIZE);
}

/**
* Wraps a stream with GZIP compression.
*
* @param uncompressed uncompressed stream to wrap
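* @param compressionLevel compression level: 0 (no compression) to 9 (best compression), or
*     {@link Deflater#DEFAULT_COMPRESSION} (-1) for the default level; see {@link Deflater}
* @param bufferSize deflation buffer size in bytes, must be positive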
* @return GZIP compressed stream
*/
public static InputStream compress(InputStream uncompressed, short compressionLevel, int bufferSize) {
InputStream header = createHeaderStream();
CountingInputStream counting = new CountingInputStream(uncompressed);
CRC32 crc = new CRC32();
CheckedInputStream checked = new CheckedInputStream(counting, crc);
InputStream content = new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true));
InputStream content = new DeflaterInputStream(checked, new Deflater(compressionLevel, true), bufferSize);
List<Supplier<InputStream>> allStreams =
ImmutableList.of(() -> header, () -> content, () -> trailerStream(counting.getCount(), crc));
return new SequenceInputStream(Collections.enumeration(Lists.transform(allStreams, Supplier::get)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum StreamCompression {

// Leading bytes used to recognise a GZIP stream, as supplied by GzipCompressingInputStream.getMagicPrefix().
private static final byte[] gzipMagic = GzipCompressingInputStream.getMagicPrefix();
// Leading bytes used to recognise an LZ4 block stream: the UTF-8 encoding of "LZ4Block".
private static final byte[] lz4Magic = "LZ4Block".getBytes(StandardCharsets.UTF_8);
// Buffer size in bytes, used as the default GZIP deflation buffer and for the buffered stream that sniffs headers.
static final int DEFAULT_BLOCK_SIZE = 1 << 16; // 64 KiB

public InputStream compress(InputStream stream) {
switch (this) {
Expand Down Expand Up @@ -60,8 +61,8 @@ public InputStream decompress(InputStream stream) {
private static boolean startsWith(InputStream stream, byte[] data) throws IOException {
stream.mark(data.length);
try {
for (int i = 0; i < data.length; i++) {
if (stream.read() != Byte.toUnsignedInt(data[i])) {
for (byte datum : data) {
if (stream.read() != Byte.toUnsignedInt(datum)) {
return false;
}
}
Expand All @@ -73,7 +74,7 @@ private static boolean startsWith(InputStream stream, byte[] data) throws IOExce

private static InputStream decompressWithHeader(InputStream unbuffered) {
try {
BufferedInputStream stream = new BufferedInputStream(unbuffered);
BufferedInputStream stream = new BufferedInputStream(unbuffered, DEFAULT_BLOCK_SIZE);
if (startsWith(stream, gzipMagic)) {
return new GZIPInputStream(stream);
} else if (startsWith(stream, lz4Magic)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class StreamCompressionTests {
// Byte value repeated to build compressible test data.
private static final byte SINGLE_VALUE = 42;
private static final int BLOCK_SIZE = 1 << 16; // 64 KiB

private ByteArrayInputStream uncompressedStream;
private InputStream compressingStream;
private InputStream decompressingStream;

Expand All @@ -47,7 +46,7 @@ public StreamCompressionTests(StreamCompression compression) {
this.compression = compression;
}

@Parameterized.Parameters
@Parameterized.Parameters(name = "{index} {0} compression")
public static Object[] parameters() {
return StreamCompression.values();
}
Expand Down Expand Up @@ -125,9 +124,9 @@ public void testMultiBlock_singleByteReads() throws IOException {
fillWithIncompressibleData(uncompressedData);
initializeStreams(uncompressedData);

for (int i = 0; i < uncompressedData.length; ++i) {
for (byte uncompressedDatum : uncompressedData) {
int value = decompressingStream.read();
assertThat(value).isEqualTo(uncompressedData[i] & 0xFF);
assertThat(value).isEqualTo(uncompressedDatum & 0xFF);
}
assertStreamIsEmpty(decompressingStream);
}
Expand Down Expand Up @@ -159,16 +158,16 @@ private void testStream_incompressible(int streamSize) throws IOException {
}

private void initializeStreams(byte[] uncompressedData) {
uncompressedStream = new ByteArrayInputStream(uncompressedData);
ByteArrayInputStream uncompressedStream = new ByteArrayInputStream(uncompressedData);
compressingStream = compression.compress(uncompressedStream);
decompressingStream = compression.decompress(compressingStream);
}

private void fillWithCompressibleData(byte[] data) {
private static void fillWithCompressibleData(byte[] data) {
Arrays.fill(data, SINGLE_VALUE);
}

private void fillWithIncompressibleData(byte[] data) {
private static void fillWithIncompressibleData(byte[] data) {
new Random(0).nextBytes(data);
}

Expand All @@ -179,7 +178,7 @@ private void verifyStreamContents(byte[] uncompressedData) throws IOException {
assertStreamIsEmpty(decompressingStream);
}

private void assertStreamIsEmpty(InputStream stream) throws IOException {
private static void assertStreamIsEmpty(InputStream stream) throws IOException {
assertThat(stream.read()).isEqualTo(-1);
}
}
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-5848.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type: improvement
improvement:
description: |-
Added a configurable GzipCompressingInputStream compression level and buffer size. Increased the default compression buffer size to 64 KiB.
links:
- https://github.com/palantir/atlasdb/pull/5848

0 comments on commit a1a92e9

Please sign in to comment.