Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

adding gzip compression support to the schema #4220

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.palantir.atlasdb.table.description.render.Renderers;
import com.palantir.atlasdb.table.description.render.StreamStoreRenderer;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.EnumClientCompressor;

public class StreamStoreDefinition {
// from ArrayList.MAX_ARRAY_SIZE on 64-bit systems
Expand All @@ -36,7 +37,7 @@ public class StreamStoreDefinition {
private final String shortName;
private final String longName;
private final ValueType idType;
private final boolean compressStream;
private final EnumClientCompressor streamCompressType;
private final int numberOfRowComponentsHashed;

private int inMemoryThreshold;
Expand All @@ -47,14 +48,14 @@ public class StreamStoreDefinition {
String longName,
ValueType idType,
int inMemoryThreshold,
boolean compressStream,
EnumClientCompressor streamCompressType,
int numberOfRowComponentsHashed) {
this.streamStoreTables = streamStoreTables;
this.shortName = shortName;
this.longName = longName;
this.idType = idType;
this.inMemoryThreshold = inMemoryThreshold;
this.compressStream = compressStream;
this.streamCompressType = streamCompressType;
this.numberOfRowComponentsHashed = numberOfRowComponentsHashed;
}

Expand All @@ -72,7 +73,7 @@ public int getNumberOfRowComponentsHashed() {

/**
 * Creates the renderer that generates the stream store source for this definition.
 *
 * <p>The long name is converted with {@link Renderers#CamelCase(String)} before it is handed to the
 * renderer, and the configured client-side compressor is passed through unchanged.
 *
 * @param packageName package name forwarded to the renderer
 * @param name schema name forwarded to the renderer
 * @return a new {@link StreamStoreRenderer} configured from this definition
 */
public StreamStoreRenderer getRenderer(String packageName, String name) {
    String renderedLongName = Renderers.CamelCase(longName);
    // Only the enum-typed call is kept: a second return after the boolean-typed one was unreachable.
    return new StreamStoreRenderer(
            renderedLongName, idType, packageName, name, inMemoryThreshold, streamCompressType);
}

public Multimap<String, Supplier<OnCleanupTask>> getCleanupTasks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence;
import com.palantir.atlasdb.table.description.TableDefinition;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.common.compression.EnumClientCompressor;

public class StreamStoreDefinitionBuilder {
private final ValueType valueType;
Expand All @@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder {
private Map<String, StreamTableDefinitionBuilder> streamTables =
Maps.newHashMapWithExpectedSize(StreamTableType.values().length);
private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD;
private boolean compressStream;
private EnumClientCompressor compressStreamType;
private int numberOfRowComponentsHashed = 0;

/**
Expand All @@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType
this.valueType = valueType;
this.shortName = shortName;
this.longName = longName;
this.compressStream = false;
this.compressStreamType = EnumClientCompressor.NONE;
}

/**
Expand Down Expand Up @@ -102,7 +103,12 @@ public StreamStoreDefinitionBuilder compressBlocksInDb() {
}

/**
 * Enables client-side stream compression using {@link EnumClientCompressor#LZ4}.
 *
 * @return this builder, for chaining
 */
public StreamStoreDefinitionBuilder compressStreamInClient() {
    // The compressor enum is the single source of truth; the old boolean flag is no longer set here.
    compressStreamType = EnumClientCompressor.LZ4;
    return this;
}

/**
 * Selects the client-side compressor used for streams in this store.
 *
 * @param compressionType the compressor to use; pass {@link EnumClientCompressor#NONE} to disable
 *        client-side compression
 * @return this builder, for chaining
 * @throws NullPointerException if {@code compressionType} is null
 */
public StreamStoreDefinitionBuilder compressStreamInClient(EnumClientCompressor compressionType) {
    // Fail at configuration time rather than with an NPE later, when the renderer dereferences the value.
    compressStreamType = java.util.Objects.requireNonNull(compressionType, "compressionType");
    return this;
}

Expand All @@ -125,7 +131,7 @@ public StreamStoreDefinition build() {
longName,
valueType,
inMemoryThreshold,
compressStream,
compressStreamType,
numberOfRowComponentsHashed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.zip.GZIPInputStream;

import javax.annotation.CheckForNull;
import javax.annotation.Generated;
Expand Down Expand Up @@ -75,6 +76,8 @@
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.EnumClientCompressor;
import com.palantir.common.compression.GzipCompressingInputStream;
import com.palantir.common.compression.LZ4CompressingInputStream;
import com.palantir.common.io.ConcatenatedInputStream;
import com.palantir.util.AssertUtils;
Expand All @@ -93,9 +96,9 @@ public class StreamStoreRenderer {
private final String packageName;
private final String schemaName;
private final int inMemoryThreshold;
private final boolean clientSideCompression;
private final EnumClientCompressor clientSideCompression;

public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, boolean clientSideCompression) {
public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, EnumClientCompressor clientSideCompression) {
this.name = name;
this.streamIdType = streamIdType;
this.packageName = packageName;
Expand Down Expand Up @@ -173,7 +176,7 @@ protected void run() {
line();
getBlock();
line();
if (clientSideCompression) {
if (clientSideCompression != EnumClientCompressor.NONE) {
storeBlocksAndGetFinalMetadata();
line();
loadStreamWithCompression();
Expand Down Expand Up @@ -558,12 +561,13 @@ private void unmarkStreamsAsUsed() {
}

private void storeBlocksAndGetFinalMetadata() {
String compression = clientSideCompression.compressionType;
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
line("@Override");
line("protected StreamMetadata storeBlocksAndGetFinalMetadata(Transaction t, long id, InputStream stream) {"); {
line("//Hash the data before compressing it");
line("MessageDigest digest = Sha256Hash.getMessageDigest();");
line("try (InputStream hashingStream = new DigestInputStream(stream, digest);");
line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); {
line(" InputStream compressingStream = new " + compression + "(hashingStream)) {"); {
line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);");
line("return StreamMetadata.newBuilder(metadata)");
line(" .setHash(ByteString.copyFrom(digest.digest()))");
Expand All @@ -577,15 +581,27 @@ private void storeBlocksAndGetFinalMetadata() {
private void loadStreamWithCompression() {
line("@Override");
line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); {
line("return new LZ4BlockInputStream(super.loadStream(t, id));");
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(super.loadStream(t, id));");
} line("} catch (IOException e) {"); {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
line("throw new RuntimeException(e);");
} line("}");
} line("}");
}

private void loadSingleStreamWithCompression() {
line("@Override");
line("public Optional<InputStream> loadSingleStream(Transaction t, final ", StreamId, " id) {"); {
line("Optional<InputStream> inputStream = super.loadSingleStream(t, id);");
line("return inputStream.map(LZ4BlockInputStream::new);");

line("return inputStream.map( s-> {"); {
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(s);");
} line("} catch (IOException e) {"); {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
line("throw new RuntimeException(e);");
} line("}");
}
line("});");
} line("}");
}

Expand All @@ -594,7 +610,11 @@ private void loadStreamsWithCompression() {
line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); {
line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);");
line("return Maps.transformValues(compressedStreams, stream -> {"); {
line("return new LZ4BlockInputStream(stream);");
line("try {"); {
line("return new " + clientSideCompression.inputClass + "(stream);");
} line("} catch (IOException e) {"); {
line("throw new RuntimeException(e);");
} line("}");
} line("});");
} line("}");
}
Expand All @@ -603,7 +623,7 @@ private void tryWriteStreamToFile() {
line("@Override");
line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); {
line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);");
line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);");
line(" InputStream decompressingStream = new " + clientSideCompression.inputClass + "(blockStream);");
line(" OutputStream fileStream = fos;) {"); {
line("ByteStreams.copy(decompressingStream, fileStream);");
} line("}");
Expand Down Expand Up @@ -849,6 +869,8 @@ private void cellsCleanedUp() {
Generated.class,
LZ4CompressingInputStream.class,
LZ4BlockInputStream.class,
GzipCompressingInputStream.class,
GZIPInputStream.class,
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
Pair.class,
Functions.class,
ByteStreams.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* (c) Copyright 2019 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.palantir.common.compression;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.google.common.io.ByteStreams;
import com.palantir.logsafe.Preconditions;

/**
* {@link InputStream} that wraps a delegate InputStream, compressing its
* contents as they are read. Compression is managed via a compressing
* {@link OutputStream}. Reads pull data as necessary from the delegate
* InputStream, write the uncompressed data through the OutputStream, then
* serve the read from the resulting compressed buffer.
*/
public abstract class AbstractCompressingInputStream extends BufferedDelegateInputStream {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved

private final int blockSize;
private final byte[] uncompressedBuffer;

// Position in the compressed buffer while writing
private int writeBufferPosition;
// Flag to indicate whether this stream has been exhausted.
private boolean finished;


public AbstractCompressingInputStream(InputStream delegate, int blockSize, int internalBufferSize) throws IOException {
this(delegate, blockSize, internalBufferSize, 0);
}

public AbstractCompressingInputStream(InputStream delegate, int blockSize, int internalBufferSize, int initalBufferPosition) throws IOException {
super(delegate, internalBufferSize, initalBufferPosition);
this.blockSize = blockSize;
this.uncompressedBuffer = new byte[blockSize];
this.finished = false;
}

@Override
public void close() throws IOException {
delegate.close();
getCompressionOutputStream().close();
}


@Override
protected int refill() throws IOException {
if (finished) {
return 0;
}
writeBufferPosition = 0;

// Read a block of data from the delegate input stream.
int bytesRead = ByteStreams.read(delegate, uncompressedBuffer, BUFFER_START, blockSize);
if (bytesRead == 0) {
// The delegate input stream has been exhausted, so we notify
// the compressing stream that there are no remaining bytes.
finishOutputStream();
getCompressionOutputStream().flush();

finished = true;
} else {
// Write the bytes to the compressing stream and flush it.
getCompressionOutputStream().write(uncompressedBuffer, BUFFER_START, bytesRead);
getCompressionOutputStream().flush();
}

return writeBufferPosition;
}

private void write(int b) throws IOException {
ensureCapacity(writeBufferPosition + 1);
buffer[writeBufferPosition] = (byte) b;
writeBufferPosition++;
}

private void write(byte b[], int off, int len) throws IOException {
Preconditions.checkNotNull(b, "Provided byte array b cannot be null.");
if ((off < 0) || (len < 0) || (off + len > b.length)) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return;
}
ensureCapacity(writeBufferPosition + len);
System.arraycopy(b, off, buffer, writeBufferPosition, len);
writeBufferPosition += len;
}

// Since the internal buffer size doesn't change, we throw if
// someone tries to write past the end of the buffer.
private void ensureCapacity(int size) {
String message = "Internal buffer overflow. Buffer " + (size - buffer.length) + "bytes shorter";
Preconditions.checkState(buffer.length >= size, message );
}



/**
* Internal {@link OutputStream} that wraps the pre-existing buffer. This
* is an inner class since LZ4CompressingInputStream cannot extend both
* {@link BufferedDelegateInputStream} and {@link OutputStream}.
*/
protected final class InternalByteArrayOutputStream extends OutputStream {

public InternalByteArrayOutputStream() {
super();
}

@Override
public void write(int b) throws IOException {
AbstractCompressingInputStream.this.write(b);
}

@Override
public void write(byte b[], int off, int len) throws IOException {
AbstractCompressingInputStream.this.write(b, off, len);
}
}

protected abstract OutputStream getCompressionOutputStream();

protected abstract void finishOutputStream() throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public BufferedDelegateInputStream(InputStream delegate, int bufferLength) {
this.buffer = new byte[bufferLength];
}

/**
 * Creates a stream over {@code delegate} with an internal buffer of {@code bufferLength} bytes
 * and an initial {@code bufferSize} of {@code initialBufferSize}.
 *
 * @param delegate the wrapped input stream
 * @param bufferLength length of the internal buffer; must not be negative
 * @param initialBufferSize initial value of {@code bufferSize}; presumably the number of bytes
 *        already valid in the buffer, so it must lie between zero and {@code bufferLength}
 *        (TODO confirm against the read path)
 * @throws IllegalArgumentException if either size is out of range
 */
public BufferedDelegateInputStream(InputStream delegate, int bufferLength, int initialBufferSize) {
    Preconditions.checkArgument(bufferLength >= 0, "buffer size must be greater than or equal to zero");
    // Reject an initial size that claims more bytes than the buffer can hold.
    Preconditions.checkArgument(
            initialBufferSize >= 0 && initialBufferSize <= bufferLength,
            "initial buffer size must be between zero and the buffer length");
    this.delegate = delegate;
    this.position = 0;
    this.bufferSize = initialBufferSize;
    this.buffer = new byte[bufferLength];
}

@Override
public final int read() throws IOException {
if (!ensureBytesAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* (c) Copyright 2019 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.common.compression;

import java.util.zip.GZIPInputStream;

import net.jpountz.lz4.LZ4BlockInputStream;

public enum EnumClientCompressor {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
GZIP(GzipCompressingInputStream.class.getName(), GZIPInputStream.class.getName()),
LZ4(LZ4CompressingInputStream.class.getName(), LZ4BlockInputStream.class.getName()),
NONE(null, null);

public final String compressionType;
public final String inputClass;

EnumClientCompressor(String compressionType, String inputClass) {
this.compressionType = compressionType;
this.inputClass = inputClass;
}
}
Loading