adding gzip compression support to the schema #4220
Conversation
Thanks for your interest in palantir/atlasdb, @mmigdiso! Before we can accept your pull request, you need to sign our contributor license agreement - just visit https://cla.palantir.com/ and follow the instructions. Once you sign, I'll automatically update this pull request.
Gzip input may throw an exception if it can not find the gzip magic chars.
…asdb into gzip_stream_support
When we talked about this, we agreed that for this to go in AtlasDB, the decompressor needs to be able to decide whether it's reading LZ4 or GZIP and choose between them respectively - the straightforward migration is where the value add is. This PR currently does not do this, which means that the complexity is probably considerably higher than we'd want.
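For illustration, a rough sketch of the header-sniffing decompressor this comment asks for (a hypothetical helper, not this PR's code; the gzip magic bytes come from RFC 1952, and Guava's ByteStreams handles the short-read case):

import com.google.common.io.ByteStreams;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import net.jpountz.lz4.LZ4BlockInputStream;

final class DecompressorDetection {
    // RFC 1952: every gzip stream starts with the magic bytes 0x1f 0x8b.
    private static final byte[] GZIP_MAGIC = {(byte) 0x1f, (byte) 0x8b};

    static InputStream detect(BufferedInputStream in) throws IOException {
        in.mark(GZIP_MAGIC.length);
        byte[] header = new byte[GZIP_MAGIC.length];
        int read = ByteStreams.read(in, header, 0, header.length);
        in.reset(); // rewind so the chosen decompressor sees the whole stream
        if (read == GZIP_MAGIC.length && Arrays.equals(header, GZIP_MAGIC)) {
            return new GZIPInputStream(in);
        }
        return new LZ4BlockInputStream(in); // fall back to the existing LZ4 path
    }
}

This keeps the migration transparent: old LZ4-compressed blocks keep decompressing, while new gzip blocks are recognized by their header.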
…asdb into gzip_stream_support
For stream-heavy workloads, we may want to consider pooling Deflater/Inflater instances, as we've seen heavy GC finalizer pressure. This is better in OpenJDK 11+ with https://bugs.openjdk.java.net/browse/JDK-8185582, but not all AtlasDB consumers are running 11+ (though most that would use this feature likely are).
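As a minimal sketch of the pooling idea (illustrative only, not something this PR or AtlasDB necessarily implements):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.zip.Deflater;

final class DeflaterPool {
    private static final Queue<Deflater> POOL = new ConcurrentLinkedQueue<>();

    // Reuse pooled instances instead of allocating one Deflater per stream,
    // avoiding the native resources that finalization otherwise cleans up.
    static Deflater acquire() {
        Deflater pooled = POOL.poll();
        return pooled != null ? pooled : new Deflater(Deflater.DEFAULT_COMPRESSION, true);
    }

    static void release(Deflater deflater) {
        deflater.reset(); // make the instance safe to reuse
        POOL.offer(deflater);
    }
}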
super(new GzipStreamEnumeration(in, bufferSize));
}

protected static class GzipStreamEnumeration implements Enumeration<InputStream> {
Implementing Enumeration for 3 elements is the Java 1.0 equivalent of implementing an Iterable for the same thing. What you do here is create a list, turn it into an iterator, and then implement an enumeration around the iterator. Instead, just do Collections.enumeration(theListYouHad) to convert the list into an enumeration.
The reason for the Enumeration implementation lies in the overridden nextElement() method: the trailer has to be generated only after the content stream (the deflater) is exhausted, so that the CRC and the other trailer fields can be calculated properly.
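To make that concrete, here is a sketch of the trailer construction (a hypothetical helper matching the names used in the snippet further down; the field layout is from RFC 1952):

import com.google.common.io.CountingInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32;

final class GzipTrailers {
    // The gzip trailer is CRC32 + ISIZE, both little-endian; neither value
    // is known until the content stream has been read to the end.
    static InputStream createTrailerStream(CRC32 crc, CountingInputStream content) {
        ByteBuffer trailer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        trailer.putInt((int) crc.getValue());     // CRC-32 of the uncompressed bytes
        trailer.putInt((int) content.getCount()); // ISIZE: uncompressed length mod 2^32
        return new ByteArrayInputStream(trailer.array());
    }
}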
You still don't need to implement enumeration for that, though. That said, using suppliers makes sense now.
So explicitly, I'm proposing you write something like:

List<Supplier<InputStream>> streams = ImmutableList.of(
        this::createHeaderStream,
        () -> countingStream,
        () -> createTrailerStream(crc, countingStream));
Enumeration<InputStream> inputStream = Collections.enumeration(Lists.transform(streams, Supplier::get));
But as GzipCompressingInputStream extends SequenceInputStream, the first statement of the constructor needs to be the super() call, so I cannot initialize the InputStream enumeration above in the GzipCompressingInputStream constructor. Hence the Enumeration implementation.
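(For reference, the usual way around the super()-first rule is a static helper invoked inside the super() call itself, since the argument expression may not touch instance state. A minimal, hypothetical sketch:

import com.google.common.collect.ImmutableList;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.Enumeration;

class ConcatenatingStream extends SequenceInputStream {
    ConcatenatingStream(InputStream in) {
        // super() must be the first statement, but its argument can be any
        // expression that does not reference instance state.
        super(buildParts(in));
    }

    private static Enumeration<InputStream> buildParts(InputStream in) {
        // header, content, and trailer streams would be assembled here
        return Collections.enumeration(ImmutableList.of(in));
    }
}

Whether that is cleaner than the Enumeration implementation is debatable.)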
byte[] decompressedData = new byte[17 * BLOCK_SIZE];
int bytesRead = ByteStreams.read(decompressingStream, decompressedData, 0, decompressedData.length);
assertEquals(uncompressedData.length, bytesRead);
assertArrayEquals(uncompressedData, Arrays.copyOf(decompressedData, bytesRead));
Instead of arrayEquals, assertArrayEquals and the other JUnit asserts, please can we use AssertJ's assertThat methods? Leads to better perf. assertThat(decompressedData).startsWith(uncompressedData) might well be possible.
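For reference, a sketch of what the AssertJ form might look like (assuming AssertJ is on the test classpath):

import static org.assertj.core.api.Assertions.assertThat;

assertThat(bytesRead).isEqualTo(uncompressedData.length);
assertThat(Arrays.copyOf(decompressedData, bytesRead)).isEqualTo(uncompressedData);
// or, matching just the prefix without the copy:
assertThat(decompressedData).startsWith(uncompressedData);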
Makes sense, but I just moved the existing LZ4CompressionTests into this abstract class to generalize the test cases and to cover the gzip implementation with the same unit tests that were covering LZ4. Therefore I didn't touch the logic of the existing test cases, because they were my checkpoint for assuring the correctness of the new compression implementation. I'm a bit wary of modifying the unit tests during a refactor.
public enum ClientCompressor {
    GZIP(GzipCompressingInputStream.class, GZIPInputStream.class, GzipCompressingInputStream.GZIP_HEADER),
    LZ4(LZ4CompressingInputStream.class, LZ4BlockInputStream.class,
            new byte[] {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'}),
"LZ4Block".getBytes(StandardCharsets.UTF_8)
This looks nicer but different from the original implementation. Unfortunately the access modifier does not allow us to use it:
https://github.com/lz4/lz4-java/blob/d43546e24388533eebd40fccb4be5468f0411788/src/java/net/jpountz/lz4/LZ4BlockOutputStream.java#L37
Comparator.comparingInt(x -> x.magic.length)).get().magic.length;
buff.mark(maxLen);
byte[] headerBuffer = new byte[maxLen];
int len = buff.read(headerBuffer);
This actually doesn't do the right thing: buff.read can return any number of bytes; it doesn't have to actually fill the header buffer.
I would be tempted to write a method like:

private static boolean startsWith(InputStream stream, byte[] maybePrefix) {
    stream.mark(maybePrefix.length); // mark requires a read-ahead limit
    try {
        for (int i = 0; i < maybePrefix.length; i++) {
            // read() returns an unsigned int in [0, 255], so mask the prefix byte;
            // this also handles EOF, since -1 matches nothing in that range
            if (stream.read() != (maybePrefix[i] & 0xff)) {
                return false;
            }
        }
        return true;
    } catch (IOException e) {
        throw new RuntimeException(e);
    } finally {
        resetQuietly(stream);
    }
}

private static void resetQuietly(InputStream stream) {
    try {
        stream.reset(); // rewind so the caller still sees the header bytes
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
But the code does not expect the headerBuffer to be filled; that is why we have the len parameter, which is passed to the matchMagic method later on.
On the other hand, as we are abstracting over InputStream, read() and read(byte[]) have different performance characteristics when it comes to FileInputStream (at least in terms of the number of system calls, even if we consider the file system cache).
I guess I'm saying you have a correctness bug right now. Also, you're only calling the single-byte read method like 8 times, so the performance difference is ~zero.
I got what you mean. What do you think about using ByteStreams.read(buff, headerBuffer, 0, maxLen), and also adding a filter for magic chars longer than the bytes read from the stream: compressors.stream().filter(t -> t.magic.length <= len)?
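Concretely, that proposal would look something like this (a sketch; matchMagic is the existing method mentioned above, and its signature here is assumed):

// ByteStreams.read loops until maxLen bytes arrive or EOF is hit, unlike a
// bare InputStream.read, so a short first read can't cause a false mismatch.
buff.mark(maxLen);
byte[] headerBuffer = new byte[maxLen];
int len = ByteStreams.read(buff, headerBuffer, 0, maxLen);
buff.reset();

// Skip any compressor whose magic is longer than what the stream contained.
Optional<ClientCompressor> match = compressors.stream()
        .filter(t -> t.magic.length <= len)
        .filter(t -> matchMagic(headerBuffer, len, t.magic))
        .findFirst();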
GZIP(GzipCompressingInputStream.class, GZIPInputStream.class, GzipCompressingInputStream.GZIP_HEADER),
LZ4(LZ4CompressingInputStream.class, LZ4BlockInputStream.class,
        new byte[] {'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k'}),
NONE(null, null, new byte[] {});
this code is quite scary, because the prefix you provide actually matches all magic byte sequences
Two things avoid this situation (if the comment above was about the NONE (new byte[] {}) part):
1. There is a magic.length > 0 check in the return statement of the matchMagic method.
2. The values() method returns the enum constants in the order they are defined, and NONE is defined last.
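A sketch of matching logic consistent with that description (an assumed shape, not necessarily the PR's exact method):

// NONE's empty magic is rejected by the magic.length > 0 guard, and callers
// iterate values() in declaration order, so NONE only ever matches last.
private static boolean matchMagic(byte[] header, int len, byte[] magic) {
    return magic.length > 0
            && magic.length <= len
            && Arrays.equals(Arrays.copyOf(header, magic.length), magic);
}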
Comparator.comparingInt((ClientCompressor t) -> t.magic.length).reversed())
        .collect(Collectors.toList());
int maxLen = compressors.get(0).magic.length;
int maxLen = Arrays.stream(ClientCompressor.values()).mapToInt(c -> c.magic.length).max().getAsInt();
can avoid all of the sorting and reversing stuff :)
Hey @j-baker, but I need to start from the longest prefix; otherwise a shorter magic sequence that is a prefix of another magic sequence would cause a bug. The purpose of this code was not only to find the max length.
This was merged via #4311
Goals (and why): Adding gzip support to the schema. Currently AtlasDB supports the LZ4 algorithm, and in some cases gzip might be more useful, as it is more than twice as space-effective (especially on JSON streams).
Implementation Description (bullets): LZ4 and gzip compression now derive from the same abstract class.
Testing (What was existing testing like? What have you done to improve it?): The existing unit test class for LZ4 has been generalized to also cover the gzip implementation with the same test cases.
Concerns (what feedback would you like?):
Where should we start reviewing?:
Priority (whenever / two weeks / yesterday): two weeks