diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java index 37d0957af5a..00910ba3ed4 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinition.java @@ -27,6 +27,7 @@ import com.palantir.atlasdb.table.description.render.Renderers; import com.palantir.atlasdb.table.description.render.StreamStoreRenderer; import com.palantir.common.base.Throwables; +import com.palantir.common.compression.StreamCompression; public class StreamStoreDefinition { // from ArrayList.MAX_ARRAY_SIZE on 64-bit systems @@ -36,7 +37,7 @@ public class StreamStoreDefinition { private final String shortName; private final String longName; private final ValueType idType; - private final boolean compressStream; + private final StreamCompression streamCompression; private final int numberOfRowComponentsHashed; private int inMemoryThreshold; @@ -47,14 +48,14 @@ public class StreamStoreDefinition { String longName, ValueType idType, int inMemoryThreshold, - boolean compressStream, + StreamCompression streamCompression, int numberOfRowComponentsHashed) { this.streamStoreTables = streamStoreTables; this.shortName = shortName; this.longName = longName; this.idType = idType; this.inMemoryThreshold = inMemoryThreshold; - this.compressStream = compressStream; + this.streamCompression = streamCompression; this.numberOfRowComponentsHashed = numberOfRowComponentsHashed; } @@ -72,7 +73,8 @@ public int getNumberOfRowComponentsHashed() { public StreamStoreRenderer getRenderer(String packageName, String name) { String renderedLongName = Renderers.CamelCase(longName); - return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, compressStream); + return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, + streamCompression); } public Multimap> getCleanupTasks( diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java index 636eeec1374..1fdd2cc2cd3 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/schema/stream/StreamStoreDefinitionBuilder.java @@ -24,6 +24,7 @@ import com.palantir.atlasdb.protos.generated.TableMetadataPersistence; import com.palantir.atlasdb.table.description.TableDefinition; import com.palantir.atlasdb.table.description.ValueType; +import com.palantir.common.compression.StreamCompression; public class StreamStoreDefinitionBuilder { private final ValueType valueType; @@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder { private Map streamTables = Maps.newHashMapWithExpectedSize(StreamTableType.values().length); private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD; - private boolean compressStream; + private StreamCompression compressStreamType; private int numberOfRowComponentsHashed = 0; /** @@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType this.valueType = valueType; this.shortName = shortName; this.longName = longName; - this.compressStream = false; + this.compressStreamType = StreamCompression.NONE; } /** @@ -102,7 +103,11 
@@ public StreamStoreDefinitionBuilder compressBlocksInDb() { } public StreamStoreDefinitionBuilder compressStreamInClient() { - compressStream = true; + return compressStreamInClient(StreamCompression.LZ4); + } + + public StreamStoreDefinitionBuilder compressStreamInClient(StreamCompression compressionType) { + compressStreamType = compressionType; return this; } @@ -125,7 +130,7 @@ public StreamStoreDefinition build() { longName, valueType, inMemoryThreshold, - compressStream, + compressStreamType, numberOfRowComponentsHashed); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractGenericStreamStore.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractGenericStreamStore.java index 29e5e5eb53b..a56737739fe 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractGenericStreamStore.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractGenericStreamStore.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.io.ByteStreams; import com.google.common.primitives.Ints; import com.google.protobuf.ByteString; import com.palantir.atlasdb.protos.generated.StreamPersistence.Status; @@ -42,6 +43,7 @@ import com.palantir.atlasdb.transaction.api.Transaction; import com.palantir.atlasdb.transaction.api.TransactionManager; import com.palantir.common.base.Throwables; +import com.palantir.common.compression.StreamCompression; import com.palantir.logsafe.exceptions.SafeIllegalArgumentException; import com.palantir.util.ByteArrayIOStream; @@ -49,9 +51,13 @@ public abstract class AbstractGenericStreamStore implements GenericStreamStor protected static final Logger log = LoggerFactory.getLogger(AbstractGenericStreamStore.class); @CheckForNull protected final TransactionManager txnMgr; + private final StreamCompression compression; - protected AbstractGenericStreamStore(TransactionManager txManager) { + protected AbstractGenericStreamStore( + TransactionManager txManager, + StreamCompression compression) { this.txnMgr = txManager; + this.compression = compression; } private long getNumberOfBlocksFromMetadata(StreamMetadata metadata) { @@ -96,7 +102,7 @@ public Map loadStreams(Transaction transaction, Set ids) { private InputStream getStream(Transaction transaction, T id, StreamMetadata metadata) { try { - return tryGetStream(transaction, id, metadata); + return compression.decompress(tryGetStream(transaction, id, metadata)); } catch (FileNotFoundException e) { log.error("Error opening temp file for stream {}", id, e); throw Throwables.rewrapAndThrowUncheckedException("Could not open temp file to create stream.", e); @@ -209,12 +215,10 @@ private void loadNBlocksToOutputStream( } } - // This method is overridden in generated code. Changes to this method may have unintended consequences. 
- protected void tryWriteStreamToFile(Transaction transaction, T id, StreamMetadata metadata, FileOutputStream fos) + private void tryWriteStreamToFile(Transaction transaction, T id, StreamMetadata metadata, FileOutputStream fos) throws IOException { - long numBlocks = getNumberOfBlocksFromMetadata(metadata); - for (long i = 0; i < numBlocks; i++) { - loadSingleBlockToOutputStream(transaction, id, i, fos); + try (InputStream in = loadStream(transaction, id)) { + ByteStreams.copy(in, fos); } fos.close(); } diff --git a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java index 233434a8fe7..309da695b38 100644 --- a/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java +++ b/atlasdb-client/src/main/java/com/palantir/atlasdb/stream/AbstractPersistentStreamStore.java @@ -41,20 +41,25 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; +import com.palantir.common.compression.StreamCompression; import com.palantir.util.Pair; import com.palantir.util.crypto.Sha256Hash; public abstract class AbstractPersistentStreamStore extends AbstractGenericStreamStore implements PersistentStreamStore { private final StreamStoreBackoffStrategy backoffStrategy; + private final StreamCompression compression; - protected AbstractPersistentStreamStore(TransactionManager txManager) { - this(txManager, () -> StreamStorePersistenceConfiguration.DEFAULT_CONFIG); + protected AbstractPersistentStreamStore(TransactionManager txManager, + StreamCompression compression) { + this(txManager, compression, () -> StreamStorePersistenceConfiguration.DEFAULT_CONFIG); } protected AbstractPersistentStreamStore(TransactionManager txManager, + StreamCompression compression, Supplier persistenceConfiguration) { - super(txManager); + super(txManager, compression); + this.compression = compression; this.backoffStrategy = StandardPeriodicBackoffStrategy.create(persistenceConfiguration); } @@ -145,8 +150,9 @@ public Map storeStreams(final Transaction tx, final Map persistenceConfiguration) {"); { - line("super(txManager, persistenceConfiguration);"); + line("super(txManager, ", + streamCompression.getClass().getSimpleName() + "." 
+ streamCompression, + ", persistenceConfiguration);"); line("this.tables = tables;"); } line("}"); line(); @@ -556,77 +542,6 @@ private void unmarkStreamsAsUsed() { line("index.delete(toDelete);"); } line("}"); } - - private void storeBlocksAndGetFinalMetadata() { - line("@Override"); - line("protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) {"); { - line("//Hash the data before compressing it"); - line("MessageDigest digest = Sha256Hash.getMessageDigest();"); - line("try (InputStream hashingStream = new DigestInputStream(stream, digest);"); - line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); { - line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);"); - line("return StreamMetadata.newBuilder(metadata)"); - line(" .setHash(ByteString.copyFrom(digest.digest()))"); - line(" .build();"); - } line("} catch (IOException e) {"); { - line("throw new RuntimeException(e);"); - } line("}"); - } line("}"); - } - - private void loadStreamWithCompression() { - line("@Override"); - line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); { - line("return new LZ4BlockInputStream(super.loadStream(t, id));"); - } line("}"); - } - - private void loadSingleStreamWithCompression() { - line("@Override"); - line("public Optional loadSingleStream(Transaction t, final ", StreamId, " id) {"); { - line("Optional inputStream = super.loadSingleStream(t, id);"); - line("return inputStream.map(LZ4BlockInputStream::new);"); - } line("}"); - } - - private void loadStreamsWithCompression() { - line("@Override"); - line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); { - line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);"); - line("return Maps.transformValues(compressedStreams, stream -> {"); { - line("return new LZ4BlockInputStream(stream);"); - } line("});"); - } line("}"); - } - - private void tryWriteStreamToFile() { - line("@Override"); - line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); { - line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);"); - line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);"); - line(" OutputStream fileStream = fos;) {"); { - line("ByteStreams.copy(decompressingStream, fileStream);"); - } line("}"); - } line("}"); - } - - private void makeStreamUsingTransaction() { - line("private InputStream makeStreamUsingTransaction(Transaction parent, ", StreamId, " id, StreamMetadata metadata) {"); { - line("BiConsumer singleBlockLoader = (index, destination) ->"); - line(" loadSingleBlockToOutputStream(parent, id, index, destination);"); - line(); - line("BlockGetter pageRefresher = new BlockLoader(singleBlockLoader, BLOCK_SIZE_IN_BYTES);"); - line("long totalBlocks = getNumberOfBlocksFromMetadata(metadata);"); - line("int blocksInMemory = getNumberOfBlocksThatFitInMemory();"); - line(); - line("try {"); { - line("return BlockConsumingInputStream.create(pageRefresher, totalBlocks, blocksInMemory);"); - } line("} catch(IOException e) {"); { - line("throw Throwables.throwUncheckedException(e);"); - } line("}"); - } line("}"); - } - }.render(); } @@ -869,8 +784,7 @@ private void cellsCleanedUp() { List.class, CheckForNull.class, Generated.class, - LZ4CompressingInputStream.class, - 
LZ4BlockInputStream.class, + StreamCompression.class, Pair.class, Functions.class, ByteStreams.class, diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java new file mode 100644 index 00000000000..5376a67df77 --- /dev/null +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/GzipCompressingInputStream.java @@ -0,0 +1,79 @@ +/* + * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.palantir.common.compression; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.Deflater; +import java.util.zip.DeflaterInputStream; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.io.CountingInputStream; + + +public final class GzipCompressingInputStream { + private static final int GZIP_MAGIC = 0x8b1f; + private static final byte[] GZIP_HEADER = new byte[] { + (byte) GZIP_MAGIC, // Magic number (short) + (byte) (GZIP_MAGIC >> 8), // Magic number (short) + Deflater.DEFLATED, // Compression method (CM) + 0, // Flags (FLG) + 0, // Modification time MTIME (int) + 0, // Modification time MTIME (int) + 0, // Modification time MTIME (int) + 0, // Modification time MTIME (int) + 0, // Extra flags (XFLG) + 0 // Operating system (OS) + }; + + public static byte[] getMagicPrefix() { + return Arrays.copyOf(GZIP_HEADER, 2); + } + + public static InputStream compress(InputStream uncompressed) { + InputStream header = createHeaderStream(); + CountingInputStream counting = new CountingInputStream(uncompressed); + CRC32 crc = new CRC32(); + CheckedInputStream checked = new CheckedInputStream(counting, crc); + InputStream content = new DeflaterInputStream(checked, new Deflater(Deflater.DEFAULT_COMPRESSION, true)); + List> allStreams = ImmutableList.of( + () -> header, () -> content, () -> trailerStream(counting.getCount(), crc)); + return new SequenceInputStream(Collections.enumeration(Lists.transform(allStreams, Supplier::get))); + } + + private static InputStream trailerStream(long count, CRC32 crc) { + long checksum = crc.getValue(); + byte[] trailer = new byte[Integer.BYTES * 2]; + ByteBuffer buffer = ByteBuffer.wrap(trailer).order(ByteOrder.LITTLE_ENDIAN); + buffer.putInt((int)(checksum & 0xffffffffL)); + buffer.putInt((int) count); + return new ByteArrayInputStream(trailer); + } + + private static InputStream createHeaderStream() { + return new ByteArrayInputStream(GZIP_HEADER); + } +} diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java 
b/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java index 2a525d438bb..3c2bd0a88ff 100644 --- a/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/LZ4CompressingInputStream.java @@ -51,11 +51,11 @@ public final class LZ4CompressingInputStream extends BufferedDelegateInputStream // Flag to indicate whether this stream has been exhausted. private boolean finished; - public LZ4CompressingInputStream(InputStream delegate) throws IOException { + public LZ4CompressingInputStream(InputStream delegate) { this(delegate, DEFAULT_BLOCK_SIZE); } - public LZ4CompressingInputStream(InputStream delegate, int blockSize) throws IOException { + public LZ4CompressingInputStream(InputStream delegate, int blockSize) { super(delegate, LZ4_HEADER_SIZE + COMPRESSOR.maxCompressedLength(blockSize)); this.blockSize = blockSize; this.uncompressedBuffer = new byte[blockSize]; diff --git a/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java new file mode 100644 index 00000000000..feb113cecf7 --- /dev/null +++ b/atlasdb-commons/src/main/java/com/palantir/common/compression/StreamCompression.java @@ -0,0 +1,97 @@ +/* + * (c) Copyright 2019 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.palantir.common.compression; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.zip.GZIPInputStream; + +import com.google.common.io.Closeables; + +import net.jpountz.lz4.LZ4BlockInputStream; + +public enum StreamCompression { + GZIP, LZ4, NONE; + + private static final byte[] gzipMagic = GzipCompressingInputStream.getMagicPrefix(); + private static final byte[] lz4Magic = "LZ4Block".getBytes(StandardCharsets.UTF_8); + + public InputStream compress(InputStream stream) { + switch (this) { + case GZIP: return GzipCompressingInputStream.compress(stream); + case LZ4: return new LZ4CompressingInputStream(stream); + case NONE: return stream; + } + throw new AssertionError("Unreachable code"); + } + + public InputStream decompress(InputStream stream) { + switch (this) { + case NONE: return stream; + case GZIP: + case LZ4: + return decompressWithHeader(stream); + } + throw new AssertionError("Unreachable code"); + } + + private static boolean startsWith(InputStream stream, byte[] data) throws IOException { + stream.mark(data.length); + try { + for (int i = 0; i < data.length; i++) { + if (stream.read() != Byte.toUnsignedInt(data[i])) { + return false; + } + } + return true; + } finally { + stream.reset(); + } + } + + private static InputStream decompressWithHeader(InputStream unbuffered) { + try { + BufferedInputStream stream = new BufferedInputStream(unbuffered); + if (startsWith(stream, gzipMagic)) { + return new GZIPInputStream(stream); + } else if (startsWith(stream, lz4Magic)) { + return new LZ4BlockInputStream(stream); + } else { + return new ThrowingInputStream(new UnsupportedOperationException("Unknown compression scheme")); + } + } catch (IOException e) { + Closeables.closeQuietly(unbuffered); + // This avoids awkward cases of us having to close many returned InputStreams in wrapping code. + return new ThrowingInputStream(e); + } + } + + private static final class ThrowingInputStream extends InputStream { + private final Throwable thrown; + + private ThrowingInputStream(Throwable thrown) { + this.thrown = thrown; + } + + @Override + public int read() throws IOException { + throw new IOException("Could not construct decompressed stream", thrown); + } + } +} diff --git a/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java b/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java similarity index 70% rename from atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java rename to atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java index 76f10c7808d..09e79977074 100644 --- a/atlasdb-commons/src/test/java/com/palantir/common/compression/LZ4CompressionTests.java +++ b/atlasdb-commons/src/test/java/com/palantir/common/compression/StreamCompressionTests.java @@ -1,5 +1,5 @@ /* - * (c) Copyright 2018 Palantir Technologies Inc. All rights reserved. + * (c) Copyright 2019 Palantir Technologies Inc. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -15,6 +15,7 @@ */ package com.palantir.common.compression; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -26,25 +27,58 @@ import org.junit.After; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import com.google.common.io.ByteStreams; -import net.jpountz.lz4.LZ4BlockInputStream; - -public class LZ4CompressionTests { +@RunWith(Parameterized.class) +public class StreamCompressionTests { + private static final StreamCompression GZIP = StreamCompression.GZIP; + private static final StreamCompression LZ4 = StreamCompression.LZ4; + private static final StreamCompression NONE = StreamCompression.NONE; private static final byte SINGLE_VALUE = 42; private static final int BLOCK_SIZE = 1 << 16; // 64 KB private ByteArrayInputStream uncompressedStream; - private LZ4CompressingInputStream compressingStream; - private LZ4BlockInputStream decompressingStream; + private InputStream compressingStream; + private InputStream decompressingStream; + + private final StreamCompression compression; + + public StreamCompressionTests(StreamCompression compression) { + this.compression = compression; + } + + @Parameterized.Parameters + public static Object[] parameters() { + return StreamCompression.values(); + } @After - public void after() throws IOException { - uncompressedStream.close(); - compressingStream.close(); - decompressingStream.close(); + public void close() throws IOException { + if (decompressingStream != null) { + decompressingStream.close(); + } else if (compressingStream != null) { + compressingStream.close(); + } + } + + @Test + public void testUncompressed_doesNotDecompressEvenIfDataCompressed() throws IOException { + byte[] data = new byte[1_000_000]; + fillWithIncompressibleData(data); + assertThat(ByteStreams.toByteArray(GZIP.decompress(NONE.decompress(GZIP.compress( + new ByteArrayInputStream(data)))))).isEqualTo(data); + } + + @Test + public void testCanDecompressGzipAsLz4() throws IOException { + byte[] data = new byte[1_000_000]; + fillWithIncompressibleData(data); + assertThat(ByteStreams.toByteArray(LZ4.decompress((GZIP.compress( + new ByteArrayInputStream(data)))))).isEqualTo(data); } @Test @@ -127,10 +161,10 @@ private void testStream_incompressible(int streamSize) throws IOException { verifyStreamContents(uncompressedData); } - private void initializeStreams(byte[] uncompressedData) throws IOException { + private void initializeStreams(byte[] uncompressedData) { uncompressedStream = new ByteArrayInputStream(uncompressedData); - compressingStream = new LZ4CompressingInputStream(uncompressedStream); - decompressingStream = new LZ4BlockInputStream(compressingStream); + compressingStream = compression.compress(uncompressedStream); + decompressingStream = compression.decompress(compressingStream); } private void fillWithCompressibleData(byte[] data) { diff --git a/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsStreamStore.java b/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsStreamStore.java index 8dffea7a7b7..9e9d05f21c1 100644 --- a/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsStreamStore.java +++ b/atlasdb-ete-test-utils/src/main/java/com/palantir/atlasdb/todo/generated/SnapshotsStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import 
com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class SnapshotsStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private SnapshotsStreamStore(TransactionManager txManager, TodoSchemaTableFactor } private SnapshotsStreamStore(TransactionManager txManager, TodoSchemaTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataStreamStore.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataStreamStore.java index 2d029621387..ab04995c3c7 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataStreamStore.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/DataStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class DataStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private DataStreamStore(TransactionManager txManager, BlobSchemaTableFactory tab } private DataStreamStore(TransactionManager txManager, BlobSchemaTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void 
touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataStreamStore.java b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataStreamStore.java index e758956c158..b0b521afc18 100644 --- a/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataStreamStore.java +++ b/atlasdb-ete-tests/src/main/java/com/palantir/atlasdb/blob/generated/HotspottyDataStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class HotspottyDataStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private HotspottyDataStreamStore(TransactionManager txManager, BlobSchemaTableFa } private HotspottyDataStreamStore(TransactionManager txManager, BlobSchemaTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/schema/generated/ValueStreamStore.java b/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/schema/generated/ValueStreamStore.java index 615b4c143fa..a8b3786234e 100644 --- a/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/schema/generated/ValueStreamStore.java +++ b/atlasdb-perf/src/main/java/com/palantir/atlasdb/performance/schema/generated/ValueStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - 
@Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class ValueStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private ValueStreamStore(TransactionManager txManager, StreamTestTableFactory ta } private ValueStreamStore(TransactionManager txManager, StreamTestTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestMaxMemStreamStore.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestMaxMemStreamStore.java index a2a9f698e50..aba613eaf0f 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestMaxMemStreamStore.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestMaxMemStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class StreamTestMaxMemStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private StreamTestMaxMemStreamStore(TransactionManager txManager, StreamTestTabl } private StreamTestMaxMemStreamStore(TransactionManager txManager, StreamTestTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestStreamStore.java 
b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestStreamStore.java index 54c277e3d88..11451ace4a1 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestStreamStore.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class StreamTestStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private StreamTestStreamStore(TransactionManager txManager, StreamTestTableFacto } private StreamTestStreamStore(TransactionManager txManager, StreamTestTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestWithHashStreamStore.java b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestWithHashStreamStore.java index b6ce1ead414..844fca5f6a6 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestWithHashStreamStore.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/StreamTestWithHashStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class StreamTestWithHashStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private StreamTestWithHashStreamStore(TransactionManager txManager, StreamTestTa } private 
StreamTestWithHashStreamStore(TransactionManager txManager, StreamTestTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.LZ4, persistenceConfiguration); this.tables = tables; } @@ -199,64 +197,6 @@ private byte[] getBlock(Transaction t, StreamTestWithHashStreamValueTable.Stream return valueTable.getValues(ImmutableSet.of(row)).get(row); } - @Override - protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) { - //Hash the data before compressing it - MessageDigest digest = Sha256Hash.getMessageDigest(); - try (InputStream hashingStream = new DigestInputStream(stream, digest); - InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) { - StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream); - return StreamMetadata.newBuilder(metadata) - .setHash(ByteString.copyFrom(digest.digest())) - .build(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public InputStream loadStream(Transaction t, final Long id) { - return new LZ4BlockInputStream(super.loadStream(t, id)); - } - - @Override - public Optional loadSingleStream(Transaction t, final Long id) { - Optional inputStream = super.loadSingleStream(t, id); - return inputStream.map(LZ4BlockInputStream::new); - } - - @Override - public Map loadStreams(Transaction t, Set ids) { - Map compressedStreams = super.loadStreams(t, ids); - return Maps.transformValues(compressedStreams, stream -> { - return new LZ4BlockInputStream(stream); - }); - } - - @Override - protected void tryWriteStreamToFile(Transaction transaction, Long id, StreamMetadata metadata, FileOutputStream fos) throws IOException { - try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata); - InputStream decompressingStream = new LZ4BlockInputStream(blockStream); - OutputStream fileStream = fos;) { - ByteStreams.copy(decompressingStream, fileStream); - } - } - - private InputStream makeStreamUsingTransaction(Transaction parent, Long id, StreamMetadata metadata) { - BiConsumer singleBlockLoader = (index, destination) -> - loadSingleBlockToOutputStream(parent, id, index, destination); - - BlockGetter pageRefresher = new BlockLoader(singleBlockLoader, BLOCK_SIZE_IN_BYTES); - long totalBlocks = getNumberOfBlocksFromMetadata(metadata); - int blocksInMemory = getNumberOfBlocksThatFitInMemory(); - - try { - return BlockConsumingInputStream.create(pageRefresher, totalBlocks, blocksInMemory); - } catch(IOException e) { - throw Throwables.throwUncheckedException(e); - } - } - @Override protected Map getMetadata(Transaction t, Set streamIds) { if (streamIds.isEmpty()) { @@ -469,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -491,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/TestHashComponentsStreamStore.java 
b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/TestHashComponentsStreamStore.java index 8f8b2202d8b..1418c17cdba 100644 --- a/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/TestHashComponentsStreamStore.java +++ b/atlasdb-tests-shared/src/test/java/com/palantir/atlasdb/schema/stream/generated/TestHashComponentsStreamStore.java @@ -60,7 +60,7 @@ import com.palantir.atlasdb.transaction.api.TransactionTask; import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; -import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -69,8 +69,6 @@ import com.palantir.util.file.DeleteOnCloseFileInputStream; import com.palantir.util.file.TempFileUtils; -import net.jpountz.lz4.LZ4BlockInputStream; - @Generated("com.palantir.atlasdb.table.description.render.StreamStoreRenderer") @SuppressWarnings("all") public final class TestHashComponentsStreamStore extends AbstractPersistentStreamStore { @@ -88,7 +86,7 @@ private TestHashComponentsStreamStore(TransactionManager txManager, StreamTestTa } private TestHashComponentsStreamStore(TransactionManager txManager, StreamTestTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; } @@ -411,8 +409,6 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link ImmutableSet} * {@link InputStream} * {@link Ints} - * {@link LZ4BlockInputStream} - * {@link LZ4CompressingInputStream} * {@link List} * {@link Lists} * {@link Logger} @@ -433,6 +429,7 @@ protected void touchMetadataWhileMarkingUsedForConflicts(Transaction t, Iterable * {@link Sha256Hash} * {@link Status} * {@link StreamCleanedException} + * {@link StreamCompression} * {@link StreamMetadata} * {@link StreamStorePersistenceConfiguration} * {@link Supplier} diff --git a/changelog/@unreleased/pr-4220.v2.yml b/changelog/@unreleased/pr-4220.v2.yml new file mode 100644 index 00000000000..1beb9595357 --- /dev/null +++ b/changelog/@unreleased/pr-4220.v2.yml @@ -0,0 +1,6 @@ +type: improvement +improvement: + description: | + Add support for GZIP client-side compression of stream stores.
+ links: + - https://github.com/palantir/atlasdb/pull/4220 diff --git a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java index 4850866f8a6..8561b6b58aa 100644 --- a/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java +++ b/examples/profile-client/src/main/java/com/palantir/example/profile/schema/generated/UserPhotosStreamStore.java @@ -61,6 +61,7 @@ import com.palantir.atlasdb.transaction.impl.TxTask; import com.palantir.common.base.Throwables; import com.palantir.common.compression.LZ4CompressingInputStream; +import com.palantir.common.compression.StreamCompression; import com.palantir.common.io.ConcatenatedInputStream; import com.palantir.util.AssertUtils; import com.palantir.util.ByteArrayIOStream; @@ -88,7 +89,7 @@ private UserPhotosStreamStore(TransactionManager txManager, ProfileTableFactory } private UserPhotosStreamStore(TransactionManager txManager, ProfileTableFactory tables, Supplier persistenceConfiguration) { - super(txManager, persistenceConfiguration); + super(txManager, StreamCompression.NONE, persistenceConfiguration); this.tables = tables; }
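
For reviewers who want to try the new API: a minimal sketch of how a schema could opt into GZIP client-side compression via the `compressStreamInClient(StreamCompression)` overload this PR adds to `StreamStoreDefinitionBuilder`. The store name, `ValueType.VAR_LONG` id type, and the surrounding class are illustrative assumptions; only the builder calls themselves come from this diff, and how the resulting definition is registered on a `Schema` is left out.

```java
import com.palantir.atlasdb.schema.stream.StreamStoreDefinition;
import com.palantir.atlasdb.schema.stream.StreamStoreDefinitionBuilder;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.common.compression.StreamCompression;

public final class ExampleStreamStoreSchemas {
    private ExampleStreamStoreSchemas() {}

    // Hypothetical definition: the "blob"/"Blob" naming and VAR_LONG ids are for illustration only.
    public static StreamStoreDefinition gzipCompressedBlobStore() {
        return new StreamStoreDefinitionBuilder("blob", "Blob", ValueType.VAR_LONG)
                // New overload from this PR; the existing no-arg compressStreamInClient()
                // now delegates to compressStreamInClient(StreamCompression.LZ4).
                .compressStreamInClient(StreamCompression.GZIP)
                .build();
    }
}
```

The no-arg `compressStreamInClient()` keeps its old behaviour by defaulting to LZ4, so existing schemas continue to generate LZ4-compressing stores without changes.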
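
One design point worth calling out: `StreamCompression.decompress` sniffs the GZIP and LZ4Block magic bytes instead of trusting the enum member it is called on, which is what makes `testCanDecompressGzipAsLz4` pass and keeps previously written LZ4 streams readable if a store later switches compression. A small round-trip sketch of that behaviour, assuming only the `compress`/`decompress` methods shown in this diff (the payload string is arbitrary):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import com.google.common.io.ByteStreams;
import com.palantir.common.compression.StreamCompression;

public final class StreamCompressionRoundTrip {
    public static void main(String[] args) throws IOException {
        byte[] original = "hello, stream stores".getBytes(StandardCharsets.UTF_8);

        // Compress with GZIP...
        InputStream compressed = StreamCompression.GZIP.compress(new ByteArrayInputStream(original));

        // ...but decompress via the LZ4 member: decompressWithHeader inspects the magic
        // prefix of the stream, so either compressed enum value recovers the data.
        byte[] roundTripped = ByteStreams.toByteArray(StreamCompression.LZ4.decompress(compressed));

        System.out.println(Arrays.equals(original, roundTripped)); // expected: true
    }
}
```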