Skip to content
This repository has been archived by the owner on Nov 14, 2024. It is now read-only.

adding gzip compression support to the schema #4220

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.palantir.atlasdb.table.description.render.Renderers;
import com.palantir.atlasdb.table.description.render.StreamStoreRenderer;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.ClientCompressor;

public class StreamStoreDefinition {
// from ArrayList.MAX_ARRAY_SIZE on 64-bit systems
Expand All @@ -36,7 +37,7 @@ public class StreamStoreDefinition {
private final String shortName;
private final String longName;
private final ValueType idType;
private final boolean compressStream;
private final ClientCompressor streamCompressType;
private final int numberOfRowComponentsHashed;

private int inMemoryThreshold;
Expand All @@ -47,14 +48,14 @@ public class StreamStoreDefinition {
String longName,
ValueType idType,
int inMemoryThreshold,
boolean compressStream,
ClientCompressor streamCompressType,
int numberOfRowComponentsHashed) {
this.streamStoreTables = streamStoreTables;
this.shortName = shortName;
this.longName = longName;
this.idType = idType;
this.inMemoryThreshold = inMemoryThreshold;
this.compressStream = compressStream;
this.streamCompressType = streamCompressType;
this.numberOfRowComponentsHashed = numberOfRowComponentsHashed;
}

Expand All @@ -72,7 +73,7 @@ public int getNumberOfRowComponentsHashed() {

public StreamStoreRenderer getRenderer(String packageName, String name) {
String renderedLongName = Renderers.CamelCase(longName);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, compressStream);
return new StreamStoreRenderer(renderedLongName, idType, packageName, name, inMemoryThreshold, streamCompressType);
}

public Multimap<String, Supplier<OnCleanupTask>> getCleanupTasks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.palantir.atlasdb.protos.generated.TableMetadataPersistence;
import com.palantir.atlasdb.table.description.TableDefinition;
import com.palantir.atlasdb.table.description.ValueType;
import com.palantir.common.compression.ClientCompressor;

public class StreamStoreDefinitionBuilder {
private final ValueType valueType;
Expand All @@ -32,7 +33,7 @@ public class StreamStoreDefinitionBuilder {
private Map<String, StreamTableDefinitionBuilder> streamTables =
Maps.newHashMapWithExpectedSize(StreamTableType.values().length);
private int inMemoryThreshold = AtlasDbConstants.DEFAULT_STREAM_IN_MEMORY_THRESHOLD;
private boolean compressStream;
private ClientCompressor compressStreamType;
private int numberOfRowComponentsHashed = 0;

/**
Expand All @@ -48,7 +49,7 @@ public StreamStoreDefinitionBuilder(String shortName, String longName, ValueType
this.valueType = valueType;
this.shortName = shortName;
this.longName = longName;
this.compressStream = false;
this.compressStreamType = ClientCompressor.NONE;
}

/**
Expand Down Expand Up @@ -102,7 +103,11 @@ public StreamStoreDefinitionBuilder compressBlocksInDb() {
}

public StreamStoreDefinitionBuilder compressStreamInClient() {
compressStream = true;
return compressStreamInClient(ClientCompressor.LZ4);
}

/**
 * Selects the compressor recorded on this builder for client-side stream compression.
 *
 * @param compressionType compressor to use; must not be null
 * @return this builder, for chaining
 * @throws NullPointerException if {@code compressionType} is null
 */
public StreamStoreDefinitionBuilder compressStreamInClient(ClientCompressor compressionType) {
    // Fail fast here: a null value would otherwise only surface later, when the definition is rendered.
    if (compressionType == null) {
        throw new NullPointerException("compressionType must not be null");
    }
    compressStreamType = compressionType;
    return this;
}

Expand All @@ -125,7 +130,7 @@ public StreamStoreDefinition build() {
longName,
valueType,
inMemoryThreshold,
compressStream,
compressStreamType,
numberOfRowComponentsHashed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -75,7 +76,8 @@
import com.palantir.atlasdb.transaction.api.TransactionTask;
import com.palantir.atlasdb.transaction.impl.TxTask;
import com.palantir.common.base.Throwables;
import com.palantir.common.compression.LZ4CompressingInputStream;
import com.palantir.common.compression.ClientCompressor;
import com.palantir.common.compression.CompressorForwardingInputStream;
import com.palantir.common.io.ConcatenatedInputStream;
import com.palantir.util.AssertUtils;
import com.palantir.util.ByteArrayIOStream;
Expand All @@ -84,24 +86,22 @@
import com.palantir.util.file.DeleteOnCloseFileInputStream;
import com.palantir.util.file.TempFileUtils;

import net.jpountz.lz4.LZ4BlockInputStream;

@SuppressWarnings("checkstyle:all") // too many warnings to fix
public class StreamStoreRenderer {
private final String name;
private final ValueType streamIdType;
private final String packageName;
private final String schemaName;
private final int inMemoryThreshold;
private final boolean clientSideCompression;
private final ClientCompressor clientSideCompressor;

public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, boolean clientSideCompression) {
public StreamStoreRenderer(String name, ValueType streamIdType, String packageName, String schemaName, int inMemoryThreshold, ClientCompressor clientSideCompressor) {
this.name = name;
this.streamIdType = streamIdType;
this.packageName = packageName;
this.schemaName = schemaName;
this.inMemoryThreshold = inMemoryThreshold;
this.clientSideCompression = clientSideCompression;
this.clientSideCompressor = clientSideCompressor;
}

public String getPackageName() {
Expand Down Expand Up @@ -173,7 +173,7 @@ protected void run() {
line();
getBlock();
line();
if (clientSideCompression) {
if (clientSideCompressor != ClientCompressor.NONE) {
storeBlocksAndGetFinalMetadata();
line();
loadStreamWithCompression();
Expand Down Expand Up @@ -219,6 +219,8 @@ private void fields() {
line("private static final Logger log = LoggerFactory.getLogger(", StreamStore, ".class);");
line();
line("private final ", TableFactory, " tables;");
line("private final ClientCompressor clientSideCompressor = ClientCompressor." + clientSideCompressor.name(), ";");

}

private void constructors() {
Expand Down Expand Up @@ -563,7 +565,7 @@ private void storeBlocksAndGetFinalMetadata() {
line("//Hash the data before compressing it");
line("MessageDigest digest = Sha256Hash.getMessageDigest();");
line("try (InputStream hashingStream = new DigestInputStream(stream, digest);");
line(" InputStream compressingStream = new LZ4CompressingInputStream(hashingStream)) {"); {
line(" InputStream compressingStream = clientSideCompressor.getCompressor(hashingStream)) {"); {
line("StreamMetadata metadata = storeBlocksAndGetHashlessMetadata(t, id, compressingStream);");
line("return StreamMetadata.newBuilder(metadata)");
line(" .setHash(ByteString.copyFrom(digest.digest()))");
Expand All @@ -577,15 +579,15 @@ private void storeBlocksAndGetFinalMetadata() {
private void loadStreamWithCompression() {
line("@Override");
line("public InputStream loadStream(Transaction t, final ", StreamId, " id) {"); {
line("return new LZ4BlockInputStream(super.loadStream(t, id));");
line("return new CompressorForwardingInputStream(super.loadStream(t, id));");
} line("}");
}

private void loadSingleStreamWithCompression() {
line("@Override");
line("public Optional<InputStream> loadSingleStream(Transaction t, final ", StreamId, " id) {"); {
line("Optional<InputStream> inputStream = super.loadSingleStream(t, id);");
line("return inputStream.map(LZ4BlockInputStream::new);");
line("return inputStream.map(CompressorForwardingInputStream::new);");
} line("}");
}

Expand All @@ -594,7 +596,7 @@ private void loadStreamsWithCompression() {
line("public Map<", StreamId, ", InputStream> loadStreams(Transaction t, Set<", StreamId, "> ids) {"); {
line("Map<", StreamId, ", InputStream> compressedStreams = super.loadStreams(t, ids);");
line("return Maps.transformValues(compressedStreams, stream -> {"); {
line("return new LZ4BlockInputStream(stream);");
line("return new CompressorForwardingInputStream(stream);");
} line("});");
} line("}");
}
Expand All @@ -603,7 +605,7 @@ private void tryWriteStreamToFile() {
line("@Override");
line("protected void tryWriteStreamToFile(Transaction transaction, ", StreamId, " id, StreamMetadata metadata, FileOutputStream fos) throws IOException {"); {
line("try (InputStream blockStream = makeStreamUsingTransaction(transaction, id, metadata);");
line(" InputStream decompressingStream = new LZ4BlockInputStream(blockStream);");
line(" InputStream decompressingStream = new CompressorForwardingInputStream(blockStream);");
line(" OutputStream fileStream = fos;) {"); {
line("ByteStreams.copy(decompressingStream, fileStream);");
} line("}");
Expand Down Expand Up @@ -847,8 +849,8 @@ private void cellsCleanedUp() {
List.class,
CheckForNull.class,
Generated.class,
LZ4CompressingInputStream.class,
LZ4BlockInputStream.class,
ClientCompressor.class,
CompressorForwardingInputStream.class,
Pair.class,
Functions.class,
ByteStreams.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* (c) Copyright 2019 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.common.compression;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

import com.google.common.collect.MoreCollectors;
import com.google.common.io.ByteStreams;
import com.palantir.common.base.Throwables;

import net.jpountz.lz4.LZ4BlockInputStream;

public enum ClientCompressor {
GZIP(in -> {
try {
return new GzipCompressingInputStream(in);
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}, in -> {
try {
return new GZIPInputStream(in);
} catch (Exception exc) {
throw new RuntimeException(exc);
}
}, GzipCompressingInputStream.GZIP_HEADER),
LZ4(LZ4CompressingInputStream::new, LZ4BlockInputStream::new,
new byte[] {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"LZ4Block".getBytes(StandardCharsets.UTF_8)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks nicer, but it differs from the original implementation. Unfortunately, the access modifier there does not allow us to use it.
https://github.com/lz4/lz4-java/blob/d43546e24388533eebd40fccb4be5468f0411788/src/java/net/jpountz/lz4/LZ4BlockOutputStream.java#L37

NONE(null, UnaryOperator.identity(), new byte[] {});

// Factory applied by getCompressor(); the NONE constant passes null here.
private final UnaryOperator<InputStream> compressorCreator;
// Factory applied by getDecompressorStream() once this compressor's magic header has been matched.
// Assigned exactly once in the constructor, hence final like the other fields.
private final UnaryOperator<InputStream> decompressorCreator;
// Header bytes compared against the start of a stream to detect this compressor; empty for NONE.
public final byte[] magic;
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved

/**
 * Creates a compressor constant from its stream factories and its magic header.
 *
 * @param compressorCreator factory later applied by {@code getCompressor}
 * @param decompressorCreator factory later applied when a stream's header matches {@code magic}
 * @param magic header bytes used to recognise streams produced by this compressor
 */
ClientCompressor(
        UnaryOperator<InputStream> compressorCreator,
        UnaryOperator<InputStream> decompressorCreator,
        byte[] magic) {
    this.magic = magic;
    this.decompressorCreator = decompressorCreator;
    this.compressorCreator = compressorCreator;
}

/**
 * Wraps the given stream using this constant's compressor factory.
 *
 * @param stream the stream to wrap
 * @return the result of applying the compressor factory to {@code stream}, or {@code stream} itself when this
 *         constant has no compressor factory (as is the case for {@code NONE})
 */
public InputStream getCompressor(InputStream stream) {
    // NONE is declared with a null compressor factory; pass the stream through instead of throwing an NPE.
    if (compressorCreator == null) {
        return stream;
    }
    return compressorCreator.apply(stream);
}

/**
 * Checks whether the first bytes of {@code buffer} equal this compressor's magic header.
 *
 * @param buffer bytes read from the start of a stream
 * @param bufferLen number of valid bytes in {@code buffer}
 * @return {@code true} only if the magic header is non-empty, at least {@code magic.length} bytes are available,
 *         and every one of those bytes equals the corresponding magic byte
 */
private boolean matchMagic(byte[] buffer, int bufferLen) {
    // An empty magic never matches, and a header longer than the available bytes cannot match.
    if (magic.length == 0 || bufferLen < magic.length) {
        return false;
    }
    // Compare byte by byte and stop at the first difference. The previous loop advanced its index inside the
    // comparison, so a mismatch on the final magic byte was still reported as a match.
    for (int i = 0; i < magic.length; i++) {
        if (magic[i] != buffer[i]) {
            return false;
        }
    }
    return true;
}

/**
* Takes a compressed stream and returns a stream that decompresses it, selecting the decompressor whose magic
* header matches the first bytes of the stream. Throws {@code IllegalArgumentException} if more than one
* compressor matches, and {@code NoSuchElementException} if no compressor matches.
*/
static InputStream getDecompressorStream(InputStream stream) throws IOException {
BufferedInputStream buff = new BufferedInputStream(stream);
List<ClientCompressor> compressors = Arrays.stream(ClientCompressor.values()).sorted(
Comparator.comparingInt((ClientCompressor t) -> t.magic.length).reversed()
).collect(
Collectors.toList());
int maxLen = compressors.get(0).magic.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int maxLen = Arrays.stream(ClientCompressor.values()).mapToInt(c -> c.magic.length).max()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can avoid all of the sorting and reversing stuff :)

Copy link
Author

@mmigdiso mmigdiso Oct 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @j-baker, but I need to start from the longest prefix; otherwise a shorter magic value that is a substring of another magic value would cause a bug. The purpose of this code was not only to find the maxLen.

buff.mark(maxLen);
byte[] headerBuffer = new byte[maxLen];
int len = ByteStreams.read(buff, headerBuffer, 0, maxLen);
buff.reset();
ClientCompressor compressor = compressors.stream().filter(
t -> t.magic.length <= len && t.matchMagic(headerBuffer, len)).collect(MoreCollectors.onlyElement());
return compressor.decompressorCreator.apply(buff);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* (c) Copyright 2019 Palantir Technologies Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.common.compression;

import java.io.IOException;
import java.io.InputStream;

public class CompressorForwardingInputStream extends InputStream {
mmigdiso marked this conversation as resolved.
Show resolved Hide resolved
// The wrapped stream, as handed to the constructor; never reassigned, hence final.
private final InputStream compressedStream;
// Decompressing view of compressedStream; stays null until the first read creates it.
private InputStream delegate;

/**
 * Wraps a compressed stream. No bytes are read from {@code stream} here; the decompressing delegate is only
 * created on the first read.
 *
 * @param stream the compressed stream to read from
 */
public CompressorForwardingInputStream(InputStream stream) {
    this.compressedStream = stream;
}

/**
 * Reads a single byte from the decompressing delegate, creating the delegate first if needed.
 *
 * @return whatever the delegate's {@code read()} returns
 * @throws IOException if creating the delegate or reading from it fails
 */
@Override
public int read() throws IOException {
    initializeDelegate();
    final int nextByte = delegate.read();
    return nextByte;
}

/**
 * Reads up to {@code len} bytes into {@code b} starting at {@code off}, forwarding to the decompressing
 * delegate and creating the delegate first if needed.
 *
 * @return whatever the delegate's {@code read(byte[], int, int)} returns
 * @throws IOException if creating the delegate or reading from it fails
 */
@Override
public int read(byte[] b, int off, int len) throws IOException {
    initializeDelegate();
    final int bytesRead = delegate.read(b, off, len);
    return bytesRead;
}

/**
 * Closes this stream. If the decompressing delegate has been created it is closed; otherwise the wrapped
 * compressed stream is closed directly.
 *
 * @throws IOException if closing the underlying stream fails
 */
@Override
public void close() throws IOException {
    if (delegate != null) {
        // NOTE(review): this relies on the delegate closing the stream it wraps - confirm for each decompressor.
        delegate.close();
    } else if (compressedStream != null) {
        // No read has happened yet (or creating the delegate failed), so nothing else owns the wrapped
        // stream. Close it here, otherwise it would leak.
        compressedStream.close();
    }
}

/**
 * Creates the decompressing delegate on first use by asking {@code ClientCompressor} for the decompressor
 * stream of the wrapped stream; does nothing once the delegate exists.
 *
 * @throws IOException if the decompressor stream cannot be created
 */
private void initializeDelegate() throws IOException {
    if (delegate != null) {
        return;
    }
    delegate = ClientCompressor.getDecompressorStream(compressedStream);
}
}
Loading