This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

adding gzip compression support to the schema #4220

Closed
wants to merge 12 commits
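For orientation, a minimal sketch of how a schema could opt into the new gzip mode through the builder overload added in this diff. The schema.addStreamStoreDefinition(...) call, the "data"/"Data" names, and ValueType.VAR_LONG are illustrative assumptions, not part of this change; only compressStreamInClient(EnumClientCompressor) comes from the PR.

// Hypothetical schema snippet; assumes the usual AtlasDB Schema#addStreamStoreDefinition hook.
schema.addStreamStoreDefinition(
        new StreamStoreDefinitionBuilder("data", "Data", ValueType.VAR_LONG)
                .compressStreamInClient(EnumClientCompressor.GZIP)
                .build());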
StreamStoreDefinition.java
@@ -27,6 +27,7 @@
import com.palantir.atlasdb.table.description.render.Renderers;
import com.palantir.atlasdb.table.description.render.StreamStoreRenderer;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.EnumClientCompressor;

public class StreamStoreDefinition {
// from ArrayList.MAX_ARRAY_SIZE on 64-bit systems
@@ -36,7 +37,7 @@ public class StreamStoreDefinition {
private final String shortName;
private final String longName;
private final ValueType idType;
private final boolean compressStream;
private final EnumClientCompressor streamCompressType;
private final int numberOfRowComponentsHashed;

private int inMemoryThreshold;
@@ -47,14 +48,14 @@ public class StreamStoreDefinition {
String longName,
ValueType idType,
int inMemoryThreshold,
boolean compressStream,
EnumClientCompressor streamCompressType,
int numberOfRowComponentsHashed) {
this.streamStoreTables = streamStoreTables;
this.shortName = shortName;
this.longName = longName;
this.idType = idType;
this.inMemoryThreshold = inMemoryThreshold;
this.compressStream = compressStream;
this.streamCompressType = streamCompressType;
this.numberOfRowComponentsHashed = numberOfRowComponentsHashed;
}

@@ -72,7 +73,7 @@ public int getNumberOfRowComponentsHashed() {

public StreamStoreRenderer getRenderer(String packageName, String name) {
String renderedLongName = Renderers.CamelCase(longName);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, compressStream);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, streamCompressType);
}

public Multimap<String, Supplier<OnCleanupTask>> getCleanupTasks(
StreamStoreDefinitionBuilder.java
@@ -24,6 +24,7 @@
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence;
import com.palantir.atlasdb.table.description.TableDefinition;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.common.compression.EnumClientCompressor;

public class StreamStoreDefinitionBuilder {
private final ValueType valueType;
@@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder {
private Map<String, StreamTableDefinitionBuilder> streamTables =
Maps.newHashMapWithExpectedSize(StreamTableType.values().length);
private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD;
private boolean compressStream;
private EnumClientCompressor compressStreamType;
private int numberOfRowComponentsHashed = 0;

/**
@@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType
this.valueType = valueType;
this.shortName = shortName;
this.longName = longName;
this.compressStream = false;
this.compressStreamType = EnumClientCompressor.NONE;
}

/**
@@ -102,7 +103,12 @@ public StreamStoreDefinitionBuilder compressBlocksInDb() {
}

public StreamStoreDefinitionBuilder compressStreamInClient() {
compressStream = true;
compressStreamType = EnumClientCompressor.LZ4;
return this;
}

public StreamStoreDefinitionBuilder compressStreamInClient(EnumClientCompressor compressionType) {
compressStreamType = compressionType;
return this;
}

@@ -125,7 +131,7 @@ public StreamStoreDefinition build() {
longName,
valueType,
inMemoryThreshold,
compressStream,
compressStreamType,
numberOfRowComponentsHashed);
}

StreamStoreRenderer.java
@@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;

import javax.annotation.CheckForNull;
import javax.annotation.Generated;
@@ -75,6 +76,8 @@
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.EnumClientCompressor;
import com.palantir.common.compression.GzipCompressingInputStream;
import com.palantir.common.compression.LZ4CompressingInputStream;
import com.palantir.common.io.ConcatenatedInputStream;
import com.palantir.util.AssertUtils;
@@ -93,9 +96,9 @@ public class StreamStoreRenderer {
private final String packageName;
private final String schemaName;
private final int inMemoryThreshold;
private final boolean clientSideCompression;
private final EnumClientCompressor clientSideCompression;

public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, boolean clientSideCompression) {
public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, EnumClientCompressor clientSideCompression) {
this.name = name;
this.streamIdType = streamIdType;
this.packageName = packageName;
@@ -173,7 +176,7 @@ protected void run() {
line();
getBlock();
line();
if (clientSideCompression) {
if (clientSideCompression != EnumClientCompressor.NONE) {
storeBlocksAndGetFinalMetadata();
line();
loadStreamWithCompression();
@@ -558,52 +561,86 @@ private void unmarkStreamsAsUsed() {
}

private void storeBlocksAndGetFinalMetadata() {
String compression = clientSideCompression.compressionType;
line("@Override");
line("protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) {"); {
line("//Hash the data before compressing it");
line("MessageDigest digest = Sha256Hash.getMessageDigest();");
line("try (InputStream hashingStream = new DigestInputStream(stream, digest);");
line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); {
line(" InputStream compressingStream = new " + compression + "(hashingStream)) {"); {
line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);");
line("return StreamMetadata.newBuilder(metadata)");
line(" .setHash(ByteString.copyFrom(digest.digest()))");
line(" .build();");
} line("} catch (IOException e) {"); {
line("throw new RuntimeException(e);");
} line("} catch (Exception e) {"); {
line("throw Throwables.rewrapAndThrowUncheckedException(\"Error in creating compression stream\", e);");
} line("}");
} line("}");
}
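For reference, with EnumClientCompressor.GZIP selected the block above would render roughly as the following generated method; GzipCompressingInputStream appears under its fully qualified name because compressionType stores Class#getName(). This is a sketch of the expected output, not code taken from this diff.

@Override
protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) {
    //Hash the data before compressing it
    MessageDigest digest = Sha256Hash.getMessageDigest();
    try (InputStream hashingStream = new DigestInputStream(stream, digest);
         InputStream compressingStream = new com.palantir.common.compression.GzipCompressingInputStream(hashingStream)) {
        StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);
        return StreamMetadata.newBuilder(metadata)
                .setHash(ByteString.copyFrom(digest.digest()))
                .build();
    } catch (IOException e) {
        throw new RuntimeException(e);
    } catch (Exception e) {
        throw Throwables.rewrapAndThrowUncheckedException("Error in creating compression stream", e);
    }
}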

private void closeStreamWithExceptionLogging(String streamVarName, String streamId) {
line("try {"); {
line(streamVarName + ".close();");
} line("} catch (IOException streamCloseExc) {"); {
line("log.error(\"Error closing stream id {}\", " + streamId + ", streamCloseExc);");
} line ("}");
}

private void loadStreamWithCompression() {
line("@Override");
line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); {
line("return new LZ4BlockInputStream(super.loadStream(t, id));");
line("InputStream in = super.loadStream(t, id);");
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(super.loadStream(t, id));");
} line("} catch (Exception e) {"); {
closeStreamWithExceptionLogging("in", "id");
line("throw Throwables.rewrapAndThrowUncheckedException(\"Error in creating compression stream\", e);");
} line("}");
} line("}");
}
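Similarly, the GZIP rendering of loadStream above would look roughly like this, assuming a Long stream id and the fix that wraps the already-opened in variable:

@Override
public InputStream loadStream(Transaction t, final Long id) {
    InputStream in = super.loadStream(t, id);
    try {
        return new java.util.zip.GZIPInputStream(in);
    } catch (Exception e) {
        try {
            in.close();
        } catch (IOException streamCloseExc) {
            log.error("Error closing stream id {}", id, streamCloseExc);
        }
        throw Throwables.rewrapAndThrowUncheckedException("Error in creating compression stream", e);
    }
}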

private void loadSingleStreamWithCompression() {
line("@Override");
line("public Optional<InputStream> loadSingleStream(Transaction t, final ", StreamId, " id) {"); {
line("Optional<InputStream> inputStream = super.loadSingleStream(t, id);");
line("return inputStream.map(LZ4BlockInputStream::new);");

line("return inputStream.map( s-> {"); {
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(s);");
} line("} catch (IOException e) {"); {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
closeStreamWithExceptionLogging("s", "id");
line("throw Throwables.rewrapAndThrowUncheckedException(\"Error in creating compression stream\", e);");
} line("}");
} line("});");
} line("}");
}

private void loadStreamsWithCompression() {
line("@Override");
line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); {
line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);");
line("return Maps.transformValues(compressedStreams, stream -> {"); {
line("return new LZ4BlockInputStream(stream);");
} line("});");
line("try {"); {
line("return Maps.transformValues(compressedStreams, stream -> {"); {
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(stream);");
} line("} catch (IOException e) {"); {
line("throw Throwables.rewrapAndThrowUncheckedException(\"Error in creating compression stream\", e);");
} line("}");
} line("});");
} line("} catch (Exception exc) {"); {
line("compressedStreams.entrySet().forEach(e -> {"); {
closeStreamWithExceptionLogging("e.getValue()", "e.getKey()");
} line ("});");
line("throw exc;");
} line("}");
} line("}");
}

private void tryWriteStreamToFile() {
line("@Override");
line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); {
line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);");
line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);");
line(" InputStream decompressingStream = new " + clientSideCompression.inputClass + "(blockStream);");
line(" OutputStream fileStream = fos;) {"); {
line("ByteStreams.copy(decompressingStream, fileStream);");
} line("}");
@@ -849,6 +886,8 @@ private void cellsCleanedUp() {
Generated.class,
LZ4CompressingInputStream.class,
LZ4BlockInputStream.class,
GzipCompressingInputStream.class,
GZIPInputStream.class,
Pair.class,
Functions.class,
ByteStreams.class,
BufferedDelegateInputStream.java
@@ -47,6 +47,14 @@ public BufferedDelegateInputStream(InputStream delegate, int bufferLength) {
this.buffer = new byte[bufferLength];
}

public BufferedDelegateInputStream(InputStream delegate, int bufferLength, int initialBufferSize) {
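// Starts with bufferSize already set to initialBufferSize, presumably so that a subclass
// (e.g. the new GzipCompressingInputStream) can pre-load bytes such as a compression header
// into the buffer and have them served before the first refill from the delegate.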
Preconditions.checkArgument(bufferLength >= 0, "buffer size must be greater than or equal to zero");
Preconditions.checkArgument(initialBufferSize >= 0 && initialBufferSize <= bufferLength,
        "initial buffer size must be between zero and the buffer length");
this.delegate = delegate;
this.position = 0;
this.bufferSize = initialBufferSize;
this.buffer = new byte[bufferLength];
}

@Override
public final int read() throws IOException {
if (!ensureBytesAvailable()) {
EnumClientCompressor.java (new file)
@@ -0,0 +1,35 @@
/*
* (c) Copyright 2019 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.common.compression;

import java.util.zip.GZIPInputStream;

import net.jpountz.lz4.LZ4BlockInputStream;

public enum EnumClientCompressor {
GZIP(GzipCompressingInputStream.class.getName(), GZIPInputStream.class.getName()),
LZ4(LZ4CompressingInputStream.class.getName(), LZ4BlockInputStream.class.getName()),
NONE(null, null);

public final String compressionType;
public final String inputClass;

EnumClientCompressor(String compressionType, String inputClass) {
this.compressionType = compressionType;
this.inputClass = inputClass;
}
}
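As a quick check on the field semantics (compressionType is the stream class the renderer emits on the write path, inputClass the one it emits on the read path), assertions along these lines are expected to hold. This is a hypothetical JUnit-style sketch, not part of this diff.

// Hypothetical test assertions; class names taken from the imports above.
assertEquals("com.palantir.common.compression.GzipCompressingInputStream",
        EnumClientCompressor.GZIP.compressionType);
assertEquals("java.util.zip.GZIPInputStream", EnumClientCompressor.GZIP.inputClass);
assertEquals("net.jpountz.lz4.LZ4BlockInputStream", EnumClientCompressor.LZ4.inputClass);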