diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java index 37d0957af5a..115655eb7a3 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java @@ -27,6 +27,7 @@ import com.palantir.atlasdb.table.description.render.Renderers; import com.palantir.atlasdb.table.description.render.StreamStoreRenderer; import com.palantir.common.base.Throwables; +import com.palantir.common.compression.ClientCompressor; public class StreamStoreDefinition { // from ArrayList.MAX_ARRAY_SIZE on 64-bit systems @@ -36,7 +37,7 @@ public class StreamStoreDefinition { private final String shortName; private final String longName; private final ValueType idType; - private final boolean compressStream; + private final ClientCompressor streamCompressType; private final int numberOfRowComponentsHashed; private int inMemoryThreshold; @@ -47,14 +48,14 @@ public class StreamStoreDefinition { String longName, ValueType idType, int inMemoryThreshold, - boolean compressStream, + ClientCompressor streamCompressType, int numberOfRowComponentsHashed) { this.streamStoreTables = streamStoreTables; this.shortName = shortName; this.longName = longName; this.idType = idType; this.inMemoryThreshold = inMemoryThreshold; - this.compressStream = compressStream; + this.streamCompressType = streamCompressType; this.numberOfRowComponentsHashed = numberOfRowComponentsHashed; } @@ -72,7 +73,7 @@ public int getNumberOfRowComponentsHashed() { public StreamStoreRenderer getRenderer(String packageName, String name) { String renderedLongName = Renderers.CamelCase(longName); - return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, compressStream); + return new StreamStoreRenderer(renderedLongName, idType, 
packageName, name, inMemoryThreshold, streamCompressType); } public Multimap> getCleanupTasks( diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java index 636eeec1374..367a69a6098 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java @@ -24,6 +24,7 @@ import com.palantir.atlasdb.protos.generated.TableMetadataPersistence; import com.palantir.atlasdb.table.description.TableDefinition; import com.palantir.atlasdb.table.description.ValueType; +import com.palantir.common.compression.ClientCompressor; public class StreamStoreDefinitionBuilder { private final ValueType valueType; @@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder { private Map streamTables = Maps.newHashMapWithExpectedSize(StreamTableType.values().length); private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD; - private boolean compressStream; + private ClientCompressor compressStreamType; private int numberOfRowComponentsHashed = 0; /** @@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType this.valueType = valueType; this.shortName = shortName; this.longName = longName; - this.compressStream = false; + this.compressStreamType = ClientCompressor.NONE; } /** @@ -102,7 +103,11 @@ public StreamStoreDefinitionBuilder compressBlocksInDb() { } public StreamStoreDefinitionBuilder compressStreamInClient() { - compressStream = true; + return compressStreamInClient(ClientCompressor.LZ4); + } + + public StreamStoreDefinitionBuilder compressStreamInClient(ClientCompressor compressionType) { + compressStreamType = compressionType; return this; } @@ -125,7 +130,7 @@ public StreamStoreDefinition build() { longName, valueType, 
inMemoryThreshold, - compressStream, + compressStreamType, numberOfRowComponentsHashed); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java index fe14f81075d..f94afb4f36c 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/table/description/render/StreamStoreRenderer.java @@ -27,6 +27,7 @@ import java.security.MessageDigest; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -75,7 +76,8 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.ClientCompressor; +import com.palantir.common.compression.CompressorForwardingInputStream; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -84,8 +86,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @SuppressWarnings("checkstyle:all") // too many warnings to fix public class StreamStoreRenderer { private final String name; @@ -93,15 +93,15 @@ public class StreamStoreRenderer { private final String packageName; private final String schemaName; private final int inMemoryThreshold; - private final boolean clientSideCompression; + private final ClientCompressor clientSideCompressor; - public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, boolean clientSideCompression) { + 
public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, ClientCompressor clientSideCompressor) { this.name = name; this.streamIdType = streamIdType; this.packageName = packageName; this.schemaName = schemaName; this.inMemoryThreshold = inMemoryThreshold; - this.clientSideCompression = clientSideCompression; + this.clientSideCompressor = clientSideCompressor; } public String getPackageName() { @@ -173,7 +173,7 @@ protected void run() { line(); getBlock(); line(); - if (clientSideCompression) { + if (clientSideCompressor != ClientCompressor.NONE) { storeBlocksAndGetFinalMetadata(); line(); loadStreamWithCompression(); @@ -219,6 +219,8 @@ private void fields() { line("private static final Logger log = LoggerFactory.getLogger(", StreamStore, ".class);"); line(); line("private final ", TableFactory, " tables;"); + line("private final ClientCompressor clientSideCompressor = ClientCompressor." + clientSideCompressor.name(), ";"); + } private void constructors() { @@ -563,7 +565,7 @@ private void storeBlocksAndGetFinalMetadata() { line("//Hash the data before compressing it"); line("MessageDigest digest = Sha256Hash.getMessageDigest();"); line("try (InputStream hashingStream = new DigestInputStream(stream, digest);"); - line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); { + line(" InputStream compressingStream = clientSideCompressor.getCompressor(hashingStream)) {"); { line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);"); line("return StreamMetadata.newBuilder(metadata)"); line(" .setHash(ByteString.copyFrom(digest.digest()))"); @@ -577,7 +579,7 @@ private void storeBlocksAndGetFinalMetadata() { private void loadStreamWithCompression() { line("@Override"); line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); { - line("return new LZ4BlockInputStream(super.loadStream(t, id));"); + line("return new 
CompressorForwardingInputStream(super.loadStream(t, id));"); } line("}"); } @@ -585,7 +587,7 @@ private void loadSingleStreamWithCompression() { line("@Override"); line("public Optional loadSingleStream(Transaction t, final ", StreamId, " id) {"); { line("Optional inputStream = super.loadSingleStream(t, id);"); - line("return inputStream.map(LZ4BlockInputStream::new);"); + line("return inputStream.map(CompressorForwardingInputStream::new);"); } line("}"); } @@ -594,7 +596,7 @@ private void loadStreamsWithCompression() { line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); { line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);"); line("return Maps.transformValues(compressedStreams, stream -> {"); { - line("return new LZ4BlockInputStream(stream);"); + line("return new CompressorForwardingInputStream(stream);"); } line("});"); } line("}"); } @@ -603,7 +605,7 @@ private void tryWriteStreamToFile() { line("@Override"); line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); { line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);"); - line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);"); + line(" InputStream decompressingStream = new CompressorForwardingInputStream(blockStream);"); line(" OutputStream fileStream = fos;) {"); { line("ByteStreams.copy(decompressingStream, fileStream);"); } line("}"); @@ -847,8 +849,8 @@ private void cellsCleanedUp() { List.class, CheckForNull.class, Generated.class, - LZ4CompressingInputStream.class, - LZ4BlockInputStream.class, + ClientCompressor.class, + CompressorForwardingInputStream.class, Pair.class, Functions.class, ByteStreams.class, diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/ClientCompressor.java 
package com.palantir.common.compression;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

import com.google.common.collect.MoreCollectors;
import com.google.common.io.ByteStreams;

import net.jpountz.lz4.LZ4BlockInputStream;

/**
 * Client-side compression codecs for stream stores. Each codec pairs a compressing stream
 * wrapper with a decompressing stream wrapper, and carries the magic-byte prefix used to
 * recognize its output when a stored stream is read back.
 */
public enum ClientCompressor {
    GZIP(in -> {
        try {
            return new GzipCompressingInputStream(in);
        } catch (Exception exc) {
            throw new RuntimeException(exc);
        }
    }, in -> {
        try {
            return new GZIPInputStream(in);
        } catch (Exception exc) {
            throw new RuntimeException(exc);
        }
    }, GzipCompressingInputStream.GZIP_HEADER),
    LZ4(LZ4CompressingInputStream::new, LZ4BlockInputStream::new,
            new byte[] {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'}),
    // NONE has an empty magic, which matchMagic never accepts; uncompressed streams are
    // therefore rejected by getDecompressorStream rather than passed through silently.
    NONE(null, UnaryOperator.identity(), new byte[] {});

    private final UnaryOperator<InputStream> compressorCreator;
    private final UnaryOperator<InputStream> decompressorCreator;
    public final byte[] magic;

    ClientCompressor(UnaryOperator<InputStream> compressorCreator,
            UnaryOperator<InputStream> decompressorCreator,
            byte[] magic) {
        this.compressorCreator = compressorCreator;
        this.decompressorCreator = decompressorCreator;
        this.magic = magic;
    }

    /** Wraps {@code stream} so that reading from the result yields this codec's compressed form. */
    public InputStream getCompressor(InputStream stream) {
        return compressorCreator.apply(stream);
    }

    /**
     * Returns true iff the first {@code bufferLen} bytes of {@code buffer} start with this codec's
     * non-empty magic. NOTE(review): rewritten from a short-circuit loop whose condition
     * {@code magic[i] == buffer[i++]} advanced the index even on a failed comparison, so a
     * mismatch in the last magic byte was incorrectly accepted as a match.
     */
    private boolean matchMagic(byte[] buffer, int bufferLen) {
        if (magic.length == 0 || bufferLen < magic.length) {
            return false;
        }
        for (int i = 0; i < magic.length; i++) {
            if (magic[i] != buffer[i]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Takes a compressed stream, sniffs its leading magic bytes, and returns the matching
     * decompressing stream. Throws {@code IllegalArgumentException} if more than one decompressor
     * is detected and {@code NoSuchElementException} if no compressor is detected.
     */
    static InputStream getDecompressorStream(InputStream stream) throws IOException {
        BufferedInputStream buffered = new BufferedInputStream(stream);
        // Longest magic first, so the first entry's length bounds every candidate's prefix.
        List<ClientCompressor> byMagicLengthDesc = Arrays.stream(ClientCompressor.values())
                .sorted(Comparator.comparingInt((ClientCompressor c) -> c.magic.length).reversed())
                .collect(Collectors.toList());
        int maxMagicLength = byMagicLengthDesc.get(0).magic.length;
        buffered.mark(maxMagicLength);
        byte[] header = new byte[maxMagicLength];
        int headerLength = ByteStreams.read(buffered, header, 0, maxMagicLength);
        buffered.reset();
        ClientCompressor codec = byMagicLengthDesc.stream()
                .filter(c -> c.matchMagic(header, headerLength))
                .collect(MoreCollectors.onlyElement());
        return codec.decompressorCreator.apply(buffered);
    }
}
package com.palantir.common.compression;

import java.io.IOException;
import java.io.InputStream;

/**
 * InputStream that sniffs the wrapped stream's leading magic bytes on first read and forwards
 * all reads to the matching decompressing stream (via ClientCompressor.getDecompressorStream).
 * Detection is deferred until the first read, so construction never touches the wrapped stream.
 */
public class CompressorForwardingInputStream extends InputStream {
    private final InputStream compressedStream;
    // Lazily initialized by the first read; null until then.
    private InputStream delegate;

    public CompressorForwardingInputStream(InputStream stream) {
        this.compressedStream = stream;
    }

    @Override
    public int read() throws IOException {
        return delegate().read();
    }

    @Override
    public int read(byte[] buffer, int off, int len) throws IOException {
        return delegate().read(buffer, off, len);
    }

    @Override
    public void close() throws IOException {
        if (delegate != null) {
            delegate.close();
        } else {
            // Detection never ran; still release the underlying stream instead of leaking it.
            compressedStream.close();
        }
    }

    /** Detects the compression format on first use and returns the decompressing delegate. */
    private InputStream delegate() throws IOException {
        if (delegate == null) {
            delegate = ClientCompressor.getDecompressorStream(compressedStream);
        }
        return delegate;
    }
}
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Deflater;
import java.util.zip.DeflaterInputStream;

/**
 * Wraps an uncompressed InputStream and exposes its bytes as a single gzip (RFC 1952) member:
 * a fixed 10-byte header, a raw-deflate body, then an 8-byte trailer (CRC-32 + ISIZE).
 * Compression happens lazily as the caller reads; nothing is buffered up front, so the input
 * may be arbitrarily large.
 */
public class GzipCompressingInputStream extends SequenceInputStream {
    /** gzip magic number (RFC 1952), stored little-endian in the header. */
    public static final int GZIP_MAGIC = 0x8b1f;

    /** Fixed gzip member header: magic, CM=deflate, no flags, zeroed MTIME/XFL/OS. */
    public static final byte[] GZIP_HEADER = new byte[] {
            (byte) GZIP_MAGIC,        // magic number, low byte (0x1f)
            (byte) (GZIP_MAGIC >> 8), // magic number, high byte (0x8b)
            Deflater.DEFLATED,        // compression method (CM)
            0,                        // flags (FLG)
            0,                        // modification time MTIME (int)
            0,                        // modification time MTIME (int)
            0,                        // modification time MTIME (int)
            0,                        // modification time MTIME (int)
            0,                        // extra flags (XFLG)
            0                         // operating system (OS)
    };

    public GzipCompressingInputStream(InputStream in) throws IOException {
        this(in, GzipStreamEnumeration.DEFAULT_DEFLATER_BUFFER_SIZE);
    }

    public GzipCompressingInputStream(InputStream in, int bufferSize) throws IOException {
        super(new GzipStreamEnumeration(in, bufferSize));
    }

    /**
     * Yields the header, content, and trailer streams in gzip order. The trailer supplier reads
     * state (CRC, uncompressed byte count) accumulated while the content stream drains, so it
     * must only be invoked after the content stream is exhausted — SequenceInputStream
     * guarantees this by fully consuming each element before requesting the next.
     */
    protected static class GzipStreamEnumeration implements Enumeration<InputStream> {

        private static final int DEFAULT_DEFLATER_BUFFER_SIZE = 512;

        private final InputStream in;
        private final int bufferSize;
        private final Iterator<Supplier<InputStream>> streamSupplierIt;
        private CheckedInputStream checkedInputStream;
        private ByteCountingInputStream counting;

        public GzipStreamEnumeration(InputStream in) {
            this(in, DEFAULT_DEFLATER_BUFFER_SIZE);
        }

        public GzipStreamEnumeration(InputStream in, int bufferSize) {
            this.in = in;
            this.bufferSize = bufferSize;
            this.streamSupplierIt = Arrays.<Supplier<InputStream>>asList(
                    this::createHeaderStream,
                    this::createContentStream,
                    this::createTrailerStream).iterator();
        }

        @Override
        public boolean hasMoreElements() {
            return streamSupplierIt.hasNext();
        }

        @Override
        public InputStream nextElement() {
            if (!hasMoreElements()) {
                // Enumeration contract: signal exhaustion instead of returning null.
                throw new NoSuchElementException();
            }
            return streamSupplierIt.next().get();
        }

        private InputStream createHeaderStream() {
            return new ByteArrayInputStream(GZIP_HEADER);
        }

        private InputStream createContentStream() {
            counting = new ByteCountingInputStream(in);
            checkedInputStream = new CheckedInputStream(counting, new CRC32());
            // nowrap=true: raw deflate, because we emit the gzip header/trailer ourselves.
            return new DeflaterInputStream(checkedInputStream,
                    new Deflater(Deflater.DEFAULT_COMPRESSION, true), bufferSize);
        }

        private InputStream createTrailerStream() {
            long checksum = checkedInputStream.getChecksum().getValue();
            long count = counting.getCount();
            byte[] trailer = new byte[Integer.BYTES * 2];
            ByteBuffer.wrap(trailer)
                    .order(ByteOrder.LITTLE_ENDIAN)
                    .putInt((int) (checksum & 0xffffffffL)) // CRC-32 of the uncompressed data
                    .putInt((int) count);                   // ISIZE = uncompressed size mod 2^32
            return new ByteArrayInputStream(trailer);
        }
    }

    /** Counts uncompressed bytes consumed; replaces the Guava CountingInputStream dependency. */
    private static final class ByteCountingInputStream extends FilterInputStream {
        private long count;

        ByteCountingInputStream(InputStream delegate) {
            super(delegate);
        }

        @Override
        public int read() throws IOException {
            int result = super.read();
            if (result != -1) {
                count++;
            }
            return result;
        }

        @Override
        public int read(byte[] buffer, int off, int len) throws IOException {
            int result = super.read(buffer, off, len);
            if (result > 0) {
                count += result;
            }
            return result;
        }

        long getCount() {
            return count;
        }
    }
}
a/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java index 2a525d438bb..3c2bd0a88ff 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java @@ -51,11 +51,11 @@ public final class LZ4CompressingInputStream extends BufferedDelegateInputStream // Flag to indicate whether this stream has been exhausted. private boolean finished; - public LZ4CompressingInputStream(InputStream delegate) throws IOException { + public LZ4CompressingInputStream(InputStream delegate) { this(delegate, DEFAULT_BLOCK_SIZE); } - public LZ4CompressingInputStream(InputStream delegate, int blockSize) throws IOException { + public LZ4CompressingInputStream(InputStream delegate, int blockSize) { super(delegate, LZ4_HEADER_SIZE + COMPRESSOR.maxCompressedLength(blockSize)); this.blockSize = blockSize; this.uncompressedBuffer = new byte[blockSize]; diff --git a/atlasdb-commons/src/test/java/com/palantir/common/compression/AbstractCompressionTests.java b/atlasdb-commons/src/test/java/com/palantir/common/compression/AbstractCompressionTests.java new file mode 100644 index 00000000000..34639ea5e95 --- /dev/null +++ b/atlasdb-commons/src/test/java/com/palantir/common/compression/AbstractCompressionTests.java @@ -0,0 +1,156 @@ +/* + * (c) Copyright 2019 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
package com.palantir.common.compression;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Random;

import org.junit.After;
import org.junit.Test;

import com.google.common.io.ByteStreams;

/**
 * Round-trip tests shared by every ClientCompressor codec. Subclasses wire up
 * {@link #compressingStream} and {@link #decompressingStream} for one codec in
 * {@link #initializeCompressStreams()}.
 */
public abstract class AbstractCompressionTests {

    private static final byte SINGLE_VALUE = 42;
    private static final int BLOCK_SIZE = 1 << 16; // 64 KB

    protected ByteArrayInputStream uncompressedStream;
    protected InputStream compressingStream;
    protected InputStream decompressingStream;

    @After
    public void close() throws IOException {
        // Guard: if a test fails before initializeStreams runs, decompressingStream is null;
        // an unguarded close() would raise an NPE that masks the original failure.
        if (decompressingStream != null) {
            decompressingStream.close();
        }
    }

    @Test
    public void testEmptyStream() throws Exception {
        initializeStreams(new byte[0]);
        assertStreamIsEmpty(decompressingStream);
    }

    @Test
    public void testSingleCharacterStream() throws Exception {
        testStream_incompressible(1); // 1 byte input will always be incompressible
    }

    @Test
    public void testSingleCharacterStream_singleByteRead() throws Exception {
        byte[] uncompressedData = new byte[] { SINGLE_VALUE };
        initializeStreams(uncompressedData);
        int value = decompressingStream.read();

        assertEquals(uncompressedData[0] & 0xFF, value);
        assertStreamIsEmpty(decompressingStream);
    }

    @Test
    public void testSingleBlock_compressible() throws Exception {
        testStream_compressible(BLOCK_SIZE);
    }

    @Test
    public void testSingleBlock_incompressible() throws Exception {
        testStream_incompressible(BLOCK_SIZE);
    }

    @Test
    public void testMultiBlock_compressible() throws Exception {
        testStream_compressible(16 * BLOCK_SIZE);
    }

    @Test
    public void testMultiBlock_incompressible() throws Exception {
        testStream_incompressible(16 * BLOCK_SIZE);
    }

    @Test
    public void testMultiBlock_singleByteReads() throws Exception {
        byte[] uncompressedData = new byte[16 * BLOCK_SIZE];
        fillWithIncompressibleData(uncompressedData);
        initializeStreams(uncompressedData);

        for (int i = 0; i < uncompressedData.length; ++i) {
            int value = decompressingStream.read();
            assertEquals(uncompressedData[i] & 0xFF, value);
        }
        assertStreamIsEmpty(decompressingStream);
    }

    @Test
    public void testMultiBlock_readPastEnd() throws Exception {
        byte[] uncompressedData = new byte[16 * BLOCK_SIZE];
        fillWithCompressibleData(uncompressedData);
        initializeStreams(uncompressedData);

        // Ask for one block more than exists; the read must stop at the true length.
        byte[] decompressedData = new byte[17 * BLOCK_SIZE];
        int bytesRead = ByteStreams.read(decompressingStream, decompressedData, 0, decompressedData.length);
        assertEquals(uncompressedData.length, bytesRead);
        assertArrayEquals(uncompressedData, Arrays.copyOf(decompressedData, bytesRead));
    }

    private void testStream_compressible(int streamSize) throws Exception {
        byte[] uncompressedData = new byte[streamSize];
        fillWithCompressibleData(uncompressedData);
        initializeStreams(uncompressedData);
        verifyStreamContents(uncompressedData);
    }

    private void testStream_incompressible(int streamSize) throws Exception {
        byte[] uncompressedData = new byte[streamSize];
        fillWithIncompressibleData(uncompressedData);
        initializeStreams(uncompressedData);
        verifyStreamContents(uncompressedData);
    }

    /** Subclass hook: wire compressingStream/decompressingStream around uncompressedStream. */
    protected abstract void initializeCompressStreams() throws Exception;

    private void initializeStreams(byte[] uncompressedData) throws Exception {
        uncompressedStream = new ByteArrayInputStream(uncompressedData);
        initializeCompressStreams();
    }

    private void fillWithCompressibleData(byte[] data) {
        Arrays.fill(data, SINGLE_VALUE);
    }

    private void fillWithIncompressibleData(byte[] data) {
        // Fixed seed keeps the "incompressible" fixture deterministic across runs.
        new Random(0).nextBytes(data);
    }

    private void verifyStreamContents(byte[] uncompressedData) throws IOException {
        byte[] decompressedData = new byte[uncompressedData.length];
        ByteStreams.read(decompressingStream, decompressedData, 0, decompressedData.length);
        assertArrayEquals(uncompressedData, decompressedData);
        assertStreamIsEmpty(decompressingStream);
    }

    private void assertStreamIsEmpty(InputStream stream) throws IOException {
        assertEquals(-1, stream.read());
    }
}
+ */ +package com.palantir.common.compression; + +public class GzipCompressionTests extends AbstractCompressionTests { + + @Override + protected void initializeCompressStreams() throws Exception { + compressingStream = ClientCompressor.GZIP.getCompressor(uncompressedStream); + decompressingStream = new CompressorForwardingInputStream(compressingStream); + } +} diff --git a/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java b/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java index 76f10c7808d..69004dbe3e8 100644 --- a/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java +++ b/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java @@ -15,140 +15,22 @@ */ package com.palantir.common.compression; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Random; import org.junit.After; -import org.junit.Test; - -import com.google.common.io.ByteStreams; - -import net.jpountz.lz4.LZ4BlockInputStream; -public class LZ4CompressionTests { - private static final byte SINGLE_VALUE = 42; - private static final int BLOCK_SIZE = 1 << 16; // 64 KB +public class LZ4CompressionTests extends AbstractCompressionTests { - private ByteArrayInputStream uncompressedStream; - private LZ4CompressingInputStream compressingStream; - private LZ4BlockInputStream decompressingStream; + @Override + protected void initializeCompressStreams() throws Exception { + compressingStream = ClientCompressor.LZ4.getCompressor(uncompressedStream); + decompressingStream = new CompressorForwardingInputStream(compressingStream); + } @After public void after() throws IOException { uncompressedStream.close(); - compressingStream.close(); decompressingStream.close(); } - - @Test - public void 
testEmptyStream() throws IOException { - initializeStreams(new byte[0]); - assertStreamIsEmpty(decompressingStream); - } - - @Test - public void testSingleCharacterStream() throws IOException { - testStream_incompressible(1); // 1 byte input will always be incompressible - } - - @Test - public void testSingleCharacterStream_singleByteRead() throws IOException { - byte[] uncompressedData = new byte[] { SINGLE_VALUE }; - initializeStreams(uncompressedData); - int value = decompressingStream.read(); - - assertEquals(uncompressedData[0] & 0xFF, value); - assertStreamIsEmpty(decompressingStream); - } - - @Test - public void testSingleBlock_compressible() throws IOException { - testStream_compressible(BLOCK_SIZE); - } - - @Test - public void testSingleBlock_incompressible() throws IOException { - testStream_incompressible(BLOCK_SIZE); - } - - @Test - public void testMultiBlock_compressible() throws IOException { - testStream_compressible(16 * BLOCK_SIZE); - } - - @Test - public void testMultiBlock_incompressible() throws IOException { - testStream_incompressible(16 * BLOCK_SIZE); - } - - @Test - public void testMultiBlock_singleByteReads() throws IOException { - byte[] uncompressedData = new byte[16 * BLOCK_SIZE]; - fillWithIncompressibleData(uncompressedData); - initializeStreams(uncompressedData); - - for (int i = 0; i < uncompressedData.length; ++i) { - int value = decompressingStream.read(); - assertEquals(uncompressedData[i] & 0xFF, value); - } - assertStreamIsEmpty(decompressingStream); - } - - @Test - public void testMultiBlock_readPastEnd() throws IOException { - byte[] uncompressedData = new byte[16 * BLOCK_SIZE]; - fillWithCompressibleData(uncompressedData); - initializeStreams(uncompressedData); - - byte[] decompressedData = new byte[17 * BLOCK_SIZE]; - int bytesRead = ByteStreams.read(decompressingStream, decompressedData, 0, decompressedData.length); - assertEquals(uncompressedData.length, bytesRead); - assertArrayEquals(uncompressedData, 
Arrays.copyOf(decompressedData, bytesRead)); - } - - private void testStream_compressible(int streamSize) throws IOException { - byte[] uncompressedData = new byte[streamSize]; - fillWithCompressibleData(uncompressedData); - initializeStreams(uncompressedData); - verifyStreamContents(uncompressedData); - } - - private void testStream_incompressible(int streamSize) throws IOException { - byte[] uncompressedData = new byte[streamSize]; - fillWithIncompressibleData(uncompressedData); - initializeStreams(uncompressedData); - verifyStreamContents(uncompressedData); - } - - private void initializeStreams(byte[] uncompressedData) throws IOException { - uncompressedStream = new ByteArrayInputStream(uncompressedData); - compressingStream = new LZ4CompressingInputStream(uncompressedStream); - decompressingStream = new LZ4BlockInputStream(compressingStream); - } - - private void fillWithCompressibleData(byte[] data) { - Arrays.fill(data, SINGLE_VALUE); - } - - private void fillWithIncompressibleData(byte[] data) { - new Random(0).nextBytes(data); - } - - private void verifyStreamContents(byte[] uncompressedData) throws IOException { - byte[] decompressedData = new byte[uncompressedData.length]; - ByteStreams.read(decompressingStream, decompressedData, 0, decompressedData.length); - assertArrayEquals(uncompressedData, decompressedData); - assertStreamIsEmpty(decompressingStream); - } - - private void assertStreamIsEmpty(InputStream stream) throws IOException { - assertEquals(-1, stream.read()); - } } diff --git a/atlasdb-commons/src/test/java/com/palantir/common/compression/NotCompressedStreamTests.java b/atlasdb-commons/src/test/java/com/palantir/common/compression/NotCompressedStreamTests.java new file mode 100644 index 00000000000..a5dcbc5d17a --- /dev/null +++ b/atlasdb-commons/src/test/java/com/palantir/common/compression/NotCompressedStreamTests.java @@ -0,0 +1,34 @@ +/* + * (c) Copyright 2019 Palantir Technologies Inc. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.palantir.common.compression; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.NoSuchElementException; + +import org.junit.Test; + +public class NotCompressedStreamTests { + + @Test + public void testNonCompressedStreamRead() { + ByteArrayInputStream compressingStream = new ByteArrayInputStream(new byte[10]); + InputStream decompressingStream = new CompressorForwardingInputStream(compressingStream); + assertThatExceptionOfType(NoSuchElementException.class).isThrownBy(() -> decompressingStream.read()); + } +} diff --git a/changelog/@unreleased/pr-4220.v2.yml b/changelog/@unreleased/pr-4220.v2.yml new file mode 100644 index 00000000000..1beb9595357 --- /dev/null +++ b/changelog/@unreleased/pr-4220.v2.yml @@ -0,0 +1,6 @@ +type: improvement +improvement: + description: | + Adding gzip compression support to the store streams. + links: + - https://github.com/palantir/atlasdb/pull/4220