Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

Gzip compression #4311

Merged
merged 18 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.palantir.atlasdb.table.description.render.Renderers;
import com.palantir.atlasdb.table.description.render.StreamStoreRenderer;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.StreamCompression;

public class StreamStoreDefinition {
// from ArrayList.MAX_ARRAY_SIZE on 64-bit systems
Expand All @@ -36,7 +37,7 @@ public class StreamStoreDefinition {
private final String shortName;
private final String longName;
private final ValueType idType;
private final boolean compressStream;
private final StreamCompression streamCompression;
private final int numberOfRowComponentsHashed;

private int inMemoryThreshold;
Expand All @@ -47,14 +48,14 @@ public class StreamStoreDefinition {
String longName,
ValueType idType,
int inMemoryThreshold,
boolean compressStream,
StreamCompression streamCompression,
int numberOfRowComponentsHashed) {
this.streamStoreTables = streamStoreTables;
this.shortName = shortName;
this.longName = longName;
this.idType = idType;
this.inMemoryThreshold = inMemoryThreshold;
this.compressStream = compressStream;
this.streamCompression = streamCompression;
this.numberOfRowComponentsHashed = numberOfRowComponentsHashed;
}

Expand All @@ -72,7 +73,8 @@ public int getNumberOfRowComponentsHashed() {

public StreamStoreRenderer getRenderer(String packageName, String name) {
String renderedLongName = Renderers.CamelCase(longName);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, compressStream);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold,
streamCompression);
}

public Multimap<String, Supplier<OnCleanupTask>> getCleanupTasks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence;
import com.palantir.atlasdb.table.description.TableDefinition;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.common.compression.StreamCompression;

public class StreamStoreDefinitionBuilder {
private final ValueType valueType;
Expand All @@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder {
private Map<String, StreamTableDefinitionBuilder> streamTables =
Maps.newHashMapWithExpectedSize(StreamTableType.values().length);
private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD;
private boolean compressStream;
private StreamCompression compressStreamType;
private int numberOfRowComponentsHashed = 0;

/**
Expand All @@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType
this.valueType = valueType;
this.shortName = shortName;
this.longName = longName;
this.compressStream = false;
this.compressStreamType = StreamCompression.NONE;
}

/**
Expand Down Expand Up @@ -102,7 +103,11 @@ public StreamStoreDefinitionBuilder compressBlocksInDb() {
}

public StreamStoreDefinitionBuilder compressStreamInClient() {
compressStream = true;
return compressStreamInClient(StreamCompression.LZ4);
}

public StreamStoreDefinitionBuilder compressStreamInClient(StreamCompression compressionType) {
compressStreamType = compressionType;
return this;
}

Expand All @@ -125,7 +130,7 @@ public StreamStoreDefinition build() {
longName,
valueType,
inMemoryThreshold,
compressStream,
compressStreamType,
numberOfRowComponentsHashed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,29 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import com.palantir.atlasdb.protos.generated.StreamPersistence.Status;
import com.palantir.atlasdb.protos.generated.StreamPersistence.StreamMetadata;
import com.palantir.atlasdb.transaction.api.Transaction;
import com.palantir.atlasdb.transaction.api.TransactionManager;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.StreamCompression;
import com.palantir.logsafe.exceptions.SafeIllegalArgumentException;
import com.palantir.util.ByteArrayIOStream;

public abstract class AbstractGenericStreamStore<T> implements GenericStreamStore<T> {
protected static final Logger log = LoggerFactory.getLogger(AbstractGenericStreamStore.class);

@CheckForNull protected final TransactionManager txnMgr;
private final StreamCompression compression;

protected AbstractGenericStreamStore(TransactionManager txManager) {
protected AbstractGenericStreamStore(
TransactionManager txManager,
StreamCompression compression) {
this.txnMgr = txManager;
this.compression = compression;
}

private long getNumberOfBlocksFromMetadata(StreamMetadata metadata) {
Expand Down Expand Up @@ -96,7 +102,7 @@ public Map<T, InputStream> loadStreams(Transaction transaction, Set<T> ids) {

private InputStream getStream(Transaction transaction, T id, StreamMetadata metadata) {
try {
return tryGetStream(transaction, id, metadata);
return compression.decompress(tryGetStream(transaction, id, metadata));
} catch (FileNotFoundException e) {
log.error("Error opening temp file for stream {}", id, e);
throw Throwables.rewrapAndThrowUncheckedException("Could not open temp file to create stream.", e);
Expand Down Expand Up @@ -209,12 +215,10 @@ private void loadNBlocksToOutputStream(
}
}

// This method is overridden in generated code. Changes to this method may have unintended consequences.
protected void tryWriteStreamToFile(Transaction transaction, T id, StreamMetadata metadata, FileOutputStream fos)
private void tryWriteStreamToFile(Transaction transaction, T id, StreamMetadata metadata, FileOutputStream fos)
throws IOException {
long numBlocks = getNumberOfBlocksFromMetadata(metadata);
for (long i = 0; i < numBlocks; i++) {
loadSingleBlockToOutputStream(transaction, id, i, fos);
try (InputStream in = loadStream(transaction, id)) {
ByteStreams.copy(in, fos);
}
fos.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,25 @@
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.StreamCompression;
import com.palantir.util.Pair;
import com.palantir.util.crypto.Sha256Hash;

public abstract class AbstractPersistentStreamStore extends AbstractGenericStreamStore<Long>
implements PersistentStreamStore {
private final StreamStoreBackoffStrategy backoffStrategy;
private final StreamCompression compression;

protected AbstractPersistentStreamStore(TransactionManager txManager) {
this(txManager, () -> StreamStorePersistenceConfiguration.DEFAULT_CONFIG);
protected AbstractPersistentStreamStore(TransactionManager txManager,
StreamCompression compression) {
this(txManager, compression, () -> StreamStorePersistenceConfiguration.DEFAULT_CONFIG);
}

protected AbstractPersistentStreamStore(TransactionManager txManager,
StreamCompression compression,
Supplier<StreamStorePersistenceConfiguration> persistenceConfiguration) {
super(txManager);
super(txManager, compression);
this.compression = compression;
this.backoffStrategy = StandardPeriodicBackoffStrategy.create(persistenceConfiguration);
}

Expand Down Expand Up @@ -145,8 +150,9 @@ public Map<Long, Sha256Hash> storeStreams(final Transaction tx, final Map<Long,
// This method is overridden in generated code. Changes to this method may have unintended consequences.
protected StreamMetadata storeBlocksAndGetFinalMetadata(@Nullable Transaction tx, long id, InputStream stream) {
MessageDigest digest = Sha256Hash.getMessageDigest();
try (InputStream hashingStream = new DigestInputStream(stream, digest)) {
StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(tx, id, hashingStream);
try (InputStream hashingStream = new DigestInputStream(stream, digest);
InputStream compressingStream = compression.compress(hashingStream)) {
StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(tx, id, compressingStream);
return StreamMetadata.newBuilder(metadata)
.setHash(ByteString.copyFrom(digest.digest()))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.LZ4CompressingInputStream;
import com.palantir.common.compression.StreamCompression;
import com.palantir.common.io.ConcatenatedInputStream;
import com.palantir.util.AssertUtils;
import com.palantir.util.ByteArrayIOStream;
Expand All @@ -84,24 +84,22 @@
import com.palantir.util.file.DeleteOnCloseFileInputStream;
import com.palantir.util.file.TempFileUtils;

import net.jpountz.lz4.LZ4BlockInputStream;

@SuppressWarnings("checkstyle:all") // too many warnings to fix
public class StreamStoreRenderer {
private final String name;
private final ValueType streamIdType;
private final String packageName;
private final String schemaName;
private final int inMemoryThreshold;
private final boolean clientSideCompression;
private final StreamCompression streamCompression;

public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, boolean clientSideCompression) {
public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, StreamCompression streamCompression) {
this.name = name;
this.streamIdType = streamIdType;
this.packageName = packageName;
this.schemaName = schemaName;
this.inMemoryThreshold = inMemoryThreshold;
this.clientSideCompression = clientSideCompression;
this.streamCompression = streamCompression;
}

public String getPackageName() {
Expand Down Expand Up @@ -173,20 +171,6 @@ protected void run() {
line();
getBlock();
line();
if (clientSideCompression) {
storeBlocksAndGetFinalMetadata();
line();
loadStreamWithCompression();
line();
loadSingleStreamWithCompression();
line();
loadStreamsWithCompression();
line();
tryWriteStreamToFile();
line();
makeStreamUsingTransaction();
line();
}
getMetadata();
line();
lookupStreamIdsByHash();
Expand Down Expand Up @@ -228,7 +212,9 @@ private void constructors() {
line();
line("private ", StreamStore, "(TransactionManager txManager, ", TableFactory, " tables, ",
"Supplier<StreamStorePersistenceConfiguration> persistenceConfiguration) {"); {
line("super(txManager, persistenceConfiguration);");
line("super(txManager, ",
streamCompression.getClass().getSimpleName() + "." + streamCompression,
", persistenceConfiguration);");
line("this.tables = tables;");
} line("}");
line();
Expand Down Expand Up @@ -556,77 +542,6 @@ private void unmarkStreamsAsUsed() {
line("index.delete(toDelete);");
} line("}");
}

private void storeBlocksAndGetFinalMetadata() {
line("@Override");
line("protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) {"); {
line("//Hash the data before compressing it");
line("MessageDigest digest = Sha256Hash.getMessageDigest();");
line("try (InputStream hashingStream = new DigestInputStream(stream, digest);");
line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); {
line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);");
line("return StreamMetadata.newBuilder(metadata)");
line(" .setHash(ByteString.copyFrom(digest.digest()))");
line(" .build();");
} line("} catch (IOException e) {"); {
line("throw new RuntimeException(e);");
} line("}");
} line("}");
}

private void loadStreamWithCompression() {
line("@Override");
line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); {
line("return new LZ4BlockInputStream(super.loadStream(t, id));");
} line("}");
}

private void loadSingleStreamWithCompression() {
line("@Override");
line("public Optional<InputStream> loadSingleStream(Transaction t, final ", StreamId, " id) {"); {
line("Optional<InputStream> inputStream = super.loadSingleStream(t, id);");
line("return inputStream.map(LZ4BlockInputStream::new);");
} line("}");
}

private void loadStreamsWithCompression() {
line("@Override");
line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); {
line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);");
line("return Maps.transformValues(compressedStreams, stream -> {"); {
line("return new LZ4BlockInputStream(stream);");
} line("});");
} line("}");
}

private void tryWriteStreamToFile() {
line("@Override");
line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); {
line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);");
line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);");
line(" OutputStream fileStream = fos;) {"); {
line("ByteStreams.copy(decompressingStream, fileStream);");
} line("}");
} line("}");
}

private void makeStreamUsingTransaction() {
line("private InputStream makeStreamUsingTransaction(Transaction parent, ", StreamId, " id, StreamMetadata metadata) {"); {
line("BiConsumer<Long, OutputStream> singleBlockLoader = (index, destination) ->");
line(" loadSingleBlockToOutputStream(parent, id, index, destination);");
line();
line("BlockGetter pageRefresher = new BlockLoader(singleBlockLoader, BLOCK_SIZE_IN_BYTES);");
line("long totalBlocks = getNumberOfBlocksFromMetadata(metadata);");
line("int blocksInMemory = getNumberOfBlocksThatFitInMemory();");
line();
line("try {"); {
line("return BlockConsumingInputStream.create(pageRefresher, totalBlocks, blocksInMemory);");
} line("} catch(IOException e) {"); {
line("throw Throwables.throwUncheckedException(e);");
} line("}");
} line("}");
}

}.render();
}

Expand Down Expand Up @@ -869,8 +784,7 @@ private void cellsCleanedUp() {
List.class,
CheckForNull.class,
Generated.class,
LZ4CompressingInputStream.class,
LZ4BlockInputStream.class,
StreamCompression.class,
Pair.class,
Functions.class,
ByteStreams.class,
Expand Down
Loading