HBASE-26959 Brotli compression support #4353

Merged 9 commits on Apr 22, 2022
Changes from all commits
4 changes: 4 additions & 0 deletions hbase-assembly/pom.xml
@@ -314,6 +314,10 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-aircompressor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-brotli</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-compression-lz4</artifactId>
Compression.java
@@ -45,7 +45,6 @@
public final class Compression {
private static final Logger LOG = LoggerFactory.getLogger(Compression.class);


// LZO

public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression
public static final String LZMA_CODEC_CLASS_DEFAULT =
"org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";

// Brotli

public static final String BROTLI_CODEC_CLASS_KEY =
"hbase.io.compress.brotli.codec";
public static final String BROTLI_CODEC_CLASS_DEFAULT =
"org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";

/**
* Prevent the instantiation of class.
*/
@@ -148,6 +154,7 @@ private static ClassLoader getClassLoaderForCodec() {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="SE_TRANSIENT_FIELD_NOT_RESTORED",
justification="We are not serializing so doesn't apply (not sure why transient though)")
@SuppressWarnings("ImmutableEnumChecker")
@InterfaceAudience.Public
public static enum Algorithm {
// LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public CompressionCodec reload(Configuration conf) {
return lzmaCodec;
}
}
},

BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
// Use base type to avoid compile-time dependencies.
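// Lazily initialized via double-checked locking; the volatile field makes publication safe.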
private volatile transient CompressionCodec brotliCodec;
private final transient Object lock = new Object();
@Override
CompressionCodec getCodec(Configuration conf) {
if (brotliCodec == null) {
synchronized (lock) {
if (brotliCodec == null) {
brotliCodec = buildCodec(conf, this);
}
}
}
return brotliCodec;
}
@Override
public CompressionCodec reload(Configuration conf) {
synchronized (lock) {
brotliCodec = buildCodec(conf, this);
LOG.warn("Reloaded configuration for {}", name());
return brotliCodec;
}
}
};

private final Configuration conf;
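With the enum constant registered, Brotli can be selected like any other algorithm. A minimal usage sketch (standard HBase client API, not part of this diff; assumes the hbase-compression-brotli jar is on the classpath):

```java
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

// Select Brotli block compression for a column family.
ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
    .newBuilder(Bytes.toBytes("info"))
    .setCompressionType(Compression.Algorithm.BROTLI)
    .build();
```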
CompressionUtil.java
@@ -35,4 +35,16 @@ public static int roundInt2(int v) {
return v;
}

/**
* Most compression algorithms can be presented with pathological input that causes expansion
* rather than compression. Hadoop's compression API requires that we calculate the additional
* buffer space required for the worst case. There is a formula developed for gzip that applies
* as a ballpark to all LZ variants. It should be good enough for now and has been tested as
* such with a range of different inputs.
*/
public static int compressionOverhead(int bufferSize) {
// Given an input buffer of 'bufferSize' bytes we presume a worst case expansion of
// 32 bytes (block header) plus an additional 1/6th of the input size.
return (bufferSize / 6) + 32;
}
}
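As a quick sanity check of the formula: with the common 256 KiB buffer size, the reserved worst-case overhead comes to 43722 bytes.

```java
int bufferSize = 256 * 1024;  // 262144 bytes, the usual codec default
int overhead = CompressionUtil.compressionOverhead(bufferSize);
// 262144 / 6 = 43690 (integer division), plus the 32-byte header = 43722
assert overhead == 43722;
```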
HadoopCompressor.java
@@ -55,7 +55,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes from outBuf", n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
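For context on the log wording: the Hadoop Compressor contract is pull-based, and compressed output only becomes available once finish() has been called. A sketch of the standard driver loop (illustrative only; the `compressor`, `data`, and `out` names are assumptions, not PR code):

```java
// Standard org.apache.hadoop.io.compress.Compressor usage:
compressor.setInput(data, 0, data.length);  // buffer the input
compressor.finish();                        // signal end of input
byte[] buf = new byte[4096];
while (!compressor.finished()) {
  int n = compressor.compress(buf, 0, buf.length);  // drain output
  out.write(buf, 0, n);
}
```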
HadoopDecompressor.java
@@ -49,7 +49,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes from outBuf", n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
Lz4Codec.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -89,8 +90,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
Expand Down Expand Up @@ -147,10 +148,9 @@ public class HadoopLz4Decompressor extends HadoopDecompressor<Lz4Decompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
return conf.getInt(LZ4_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
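The simplified getter above now trusts the fallback chain end to end; Hadoop's built-in default for io.compression.codec.lz4.buffersize is a positive 256 KiB, so the removed size > 0 guard was redundant. A sketch of overriding the size via the HBase-specific key (assumes LZ4_BUFFER_SIZE_KEY is publicly visible):

```java
// Hypothetical override of the LZ4 buffer size; when unset, the codec falls
// back to the Hadoop key and then to Hadoop's positive built-in default.
Configuration conf = new Configuration();
conf.setInt(Lz4Codec.LZ4_BUFFER_SIZE_KEY, 64 * 1024);
```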
LzoCodec.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -89,8 +90,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -147,10 +148,9 @@ public class HadoopLzoDecompressor extends HadoopDecompressor<LzoDecompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
return conf.getInt(LZO_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
SnappyCodec.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -89,8 +90,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -147,10 +148,9 @@ public class HadoopSnappyDecompressor extends HadoopDecompressor<SnappyDecompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
}

}
ZstdCodec.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -50,6 +51,7 @@
public class ZstdCodec implements Configurable, CompressionCodec {

public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;

private Configuration conf;

@@ -97,8 +99,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
int compressionOverhead = (bufferSize / 6) + 32;
return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}

@Override
@@ -155,10 +157,10 @@ public class HadoopZstdDecompressor extends HadoopDecompressor<ZstdDecompressor> {
// Package private

static int getBufferSize(Configuration conf) {
int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
return size > 0 ? size : 256 * 1024; // Don't change this default
// IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
ZSTD_BUFFER_SIZE_DEFAULT));
}

}
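Unlike the LZ4/LZO/Snappy keys, Hadoop's built-in default for the zstd buffer size is 0, which the block compressor cannot use; hence the new ZSTD_BUFFER_SIZE_DEFAULT. A hypothetical check of the fallback (assumes package-private access to getBufferSize):

```java
// No defaults loaded, so neither the HBase nor the Hadoop key is set.
Configuration conf = new Configuration(false);
// The inner getInt falls through to ZSTD_BUFFER_SIZE_DEFAULT (256 KiB)
// instead of Hadoop's unusable 0.
assert ZstdCodec.getBufferSize(conf) == ZstdCodec.ZSTD_BUFFER_SIZE_DEFAULT;
```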