Skip to content

Commit

Permalink
GH-34749 : [Java] Make Zstd compression level configurable (#34873)
Browse files Browse the repository at this point in the history
### Rationale for this change

Closes: #34749

### What changes are included in this PR?

Make compression level configurable for Zstd

### Are these changes tested?

Yes

### Are there any user-facing changes?

No
* Closes: #34749

Lead-authored-by: david dali susanibar arce <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
davisusanibar and wgtmac authored Jun 5, 2023
1 parent f5a5729 commit 87d0824
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,16 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
throw new IllegalArgumentException("Compression type not supported: " + codecType);
}
}

@Override
public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int compressionLevel) {
switch (codecType) {
case LZ4_FRAME:
return new Lz4CompressionCodec();
case ZSTD:
return new ZstdCompressionCodec(compressionLevel);
default:
throw new IllegalArgumentException("Compression type not supported: " + codecType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
*/
public class ZstdCompressionCodec extends AbstractCompressionCodec {

private int compressionLevel;
private static final int DEFAULT_COMPRESSION_LEVEL = 3;

public ZstdCompressionCodec() {
this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
}

public ZstdCompressionCodec(int compressionLevel) {
this.compressionLevel = compressionLevel;
}

@Override
protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
long maxSize = Zstd.compressBound(uncompressedBuffer.writerIndex());
Expand All @@ -38,7 +49,7 @@ protected ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBu
long bytesWritten = Zstd.compressUnsafe(
compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, dstSize,
/*src*/uncompressedBuffer.memoryAddress(), /*srcSize=*/uncompressedBuffer.writerIndex(),
/*level=*/3);
/*level=*/this.compressionLevel);
if (Zstd.isError(bytesWritten)) {
compressedBuffer.close();
throw new RuntimeException("Error compressing: " + Zstd.getErrorName(bytesWritten));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void testArrowFileZstdRoundTrip() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (final ArrowFileWriter writer =
new ArrowFileWriter(root, null, Channels.newChannel(out), new HashMap<>(),
IpcOption.DEFAULT, CommonsCompressionFactory.INSTANCE, CompressionUtil.CodecType.ZSTD)) {
IpcOption.DEFAULT, CommonsCompressionFactory.INSTANCE, CompressionUtil.CodecType.ZSTD, Optional.of(7))) {
writer.start();
writer.writeBatch();
writer.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

Expand Down Expand Up @@ -93,6 +94,9 @@ static Collection<Arguments> codecs() {

CompressionCodec zstdCodec = new ZstdCompressionCodec();
params.add(Arguments.arguments(len, zstdCodec));

CompressionCodec zstdCodecAndCompressionLevel = new ZstdCompressionCodec(7);
params.add(Arguments.arguments(len, zstdCodecAndCompressionLevel));
}
return params;
}
Expand Down Expand Up @@ -235,7 +239,7 @@ void testReadWriteStream(CompressionUtil.CodecType codec) throws Exception {
try (final ArrowStreamWriter writer = new ArrowStreamWriter(
root, new DictionaryProvider.MapDictionaryProvider(),
Channels.newChannel(compressedStream),
IpcOption.DEFAULT, factory, codec)) {
IpcOption.DEFAULT, factory, codec, Optional.of(7))) {
writer.start();
writer.writeBatch();
writer.end();
Expand All @@ -262,7 +266,7 @@ void testReadWriteFile(CompressionUtil.CodecType codec) throws Exception {
try (final ArrowFileWriter writer = new ArrowFileWriter(
root, new DictionaryProvider.MapDictionaryProvider(),
Channels.newChannel(compressedStream),
new HashMap<>(), IpcOption.DEFAULT, factory, codec)) {
new HashMap<>(), IpcOption.DEFAULT, factory, codec, Optional.of(7))) {
writer.start();
writer.writeBatch();
writer.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,10 @@ interface Factory {
* Creates the codec based on the codec type.
*/
CompressionCodec createCodec(CompressionUtil.CodecType codecType);

/**
* Creates the codec based on the codec type and compression level.
*/
CompressionCodec createCodec(CompressionUtil.CodecType codecType, int compressionLevel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,10 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
throw new IllegalArgumentException("Unsupported codec type: " + codecType);
}
}

@Override
public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int compressionLevel) {
return createCodec(codecType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand Down Expand Up @@ -74,7 +75,13 @@ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, Writa
public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
Map<String, String> metaData, IpcOption option, CompressionCodec.Factory compressionFactory,
CompressionUtil.CodecType codecType) {
super(root, provider, out, option, compressionFactory, codecType);
this(root, provider, out, metaData, option, compressionFactory, codecType, Optional.empty());
}

public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
Map<String, String> metaData, IpcOption option, CompressionCodec.Factory compressionFactory,
CompressionUtil.CodecType codecType, Optional<Integer> compressionLevel) {
super(root, provider, out, option, compressionFactory, codecType, compressionLevel);
this.metaData = metaData;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.compression.CompressionCodec;
Expand Down Expand Up @@ -81,7 +82,25 @@ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, Wri
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
IpcOption option, CompressionCodec.Factory compressionFactory,
CompressionUtil.CodecType codecType) {
super(root, provider, out, option, compressionFactory, codecType);
this(root, provider, out, option, compressionFactory, codecType, Optional.empty());
}

/**
* Construct an ArrowStreamWriter with compression enabled.
*
* @param root Existing VectorSchemaRoot with vectors to be written.
* @param provider DictionaryProvider for any vectors that are dictionary encoded.
* (Optional, can be null)
* @param option IPC write options
* @param compressionFactory Compression codec factory
* @param codecType Codec type
* @param compressionLevel Compression level
* @param out WritableByteChannel for writing.
*/
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
IpcOption option, CompressionCodec.Factory compressionFactory,
CompressionUtil.CodecType codecType, Optional<Integer> compressionLevel) {
super(root, provider, out, option, compressionFactory, codecType, compressionLevel);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.arrow.util.AutoCloseables;
Expand Down Expand Up @@ -72,7 +73,8 @@ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, Writab
}

protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
this(root, provider, out, option, NoCompressionCodec.Factory.INSTANCE, CompressionUtil.CodecType.NO_COMPRESSION);
this(root, provider, out, option, NoCompressionCodec.Factory.INSTANCE, CompressionUtil.CodecType.NO_COMPRESSION,
Optional.empty());
}

/**
Expand All @@ -84,11 +86,17 @@ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, Writab
* @param option IPC write options
* @param compressionFactory Compression codec factory
* @param codecType Compression codec
* @param compressionLevel Compression level
*/
protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option,
CompressionCodec.Factory compressionFactory, CompressionUtil.CodecType codecType) {
CompressionCodec.Factory compressionFactory, CompressionUtil.CodecType codecType,
Optional<Integer> compressionLevel) {
this.unloader = new VectorUnloader(
root, /*includeNullCount*/ true, compressionFactory.createCodec(codecType), /*alignBuffers*/ true);
root, /*includeNullCount*/ true,
compressionLevel.isPresent() ?
compressionFactory.createCodec(codecType, compressionLevel.get()) :
compressionFactory.createCodec(codecType),
/*alignBuffers*/ true);
this.out = new WriteChannel(out);
this.option = option;

Expand Down

0 comments on commit 87d0824

Please sign in to comment.