diff --git a/hbase-assembly/pom.xml b/hbase-assembly/pom.xml
index f5d8a786c953..4f5c45b0176c 100644
--- a/hbase-assembly/pom.xml
+++ b/hbase-assembly/pom.xml
@@ -314,6 +314,10 @@
org.apache.hbase
hbase-compression-aircompressor
+
+ org.apache.hbase
+ hbase-compression-brotli
+
org.apache.hbase
hbase-compression-lz4
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
index 8bff2944cccd..e60b9ce3b7d1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/Compression.java
@@ -45,7 +45,6 @@
public final class Compression {
private static final Logger LOG = LoggerFactory.getLogger(Compression.class);
-
// LZO
public static final String LZO_CODEC_CLASS_KEY =
@@ -97,6 +96,13 @@ public final class Compression {
public static final String LZMA_CODEC_CLASS_DEFAULT =
"org.apache.hadoop.hbase.io.compress.xz.LzmaCodec";
+ // Brotli
+
+ public static final String BROTLI_CODEC_CLASS_KEY =
+ "hbase.io.compress.brotli.codec";
+ public static final String BROTLI_CODEC_CLASS_DEFAULT =
+ "org.apache.hadoop.hbase.io.compress.brotli.BrotliCodec";
+
/**
* Prevent the instantiation of class.
*/
@@ -148,6 +154,7 @@ private static ClassLoader getClassLoaderForCodec() {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="SE_TRANSIENT_FIELD_NOT_RESTORED",
justification="We are not serializing so doesn't apply (not sure why transient though)")
+ @SuppressWarnings("ImmutableEnumChecker")
@InterfaceAudience.Public
public static enum Algorithm {
// LZO is GPL and requires extra install to setup. See
@@ -352,6 +359,31 @@ public CompressionCodec reload(Configuration conf) {
return lzmaCodec;
}
}
+ },
+
+ BROTLI("brotli", BROTLI_CODEC_CLASS_KEY, BROTLI_CODEC_CLASS_DEFAULT) {
+ // Use base type to avoid compile-time dependencies. Built on first use and then cached.
+ private volatile transient CompressionCodec brotliCodec;
+ private final transient Object lock = new Object(); // guards creation/replacement of brotliCodec
+ @Override
+ CompressionCodec getCodec(Configuration conf) {
+ if (brotliCodec == null) { // unsynchronized fast path once the codec exists
+ synchronized (lock) {
+ if (brotliCodec == null) { // re-check under the lock so only one thread builds it
+ brotliCodec = buildCodec(conf, this);
+ }
+ }
+ }
+ return brotliCodec;
+ }
+ @Override
+ public CompressionCodec reload(Configuration conf) {
+ synchronized (lock) {
+ brotliCodec = buildCodec(conf, this); // unconditionally replace the cached codec
+ LOG.warn("Reloaded configuration for {}", name());
+ return brotliCodec;
+ }
+ }
};
private final Configuration conf;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
index 70b959a1172a..718cc70f1e45 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/compress/CompressionUtil.java
@@ -35,4 +35,16 @@ public static int roundInt2(int v) {
return v;
}
+ /**
+  * Estimates the extra output space needed beyond the input size for worst-case input. Most
+  * compression algorithms can be fed pathological input that expands rather than compresses,
+  * and Hadoop's compression API requires us to budget buffer space for that case. We use a
+  * formula developed for gzip as a ballpark for all LZ variants; it should be good enough
+  * for now and has been tested as such with a range of different inputs.
+  */
+ public static int compressionOverhead(int bufferSize) {
+   final int blockHeader = 32; // presumed worst-case expansion from the block header, in bytes
+   final int expansion = bufferSize / 6; // plus an additional 1/6th of the input size
+   return expansion + blockHeader;
+ }
}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java
index c448f58dbf55..d5fd3cfdd3d4 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopCompressor.java
@@ -57,7 +57,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("compress: {} bytes from outBuf", n);
+ LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java
index f5f5b83ab300..868094f32fc6 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/HadoopDecompressor.java
@@ -51,7 +51,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("decompress: {} bytes from outBuf", n);
+ LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
index c1766dc0456a..81199531ad94 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/Lz4Codec.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
@@ -149,10 +150,9 @@ public class HadoopLz4Decompressor extends HadoopDecompressor {
// Package private
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
+ return conf.getInt(LZ4_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
}
}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
index 3e5ab049e954..57ac8daada76 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/LzoCodec.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
@@ -149,10 +150,9 @@ public class HadoopLzoDecompressor extends HadoopDecompressor {
// Package private
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(LZO_BUFFER_SIZE_KEY,
+ return conf.getInt(LZO_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
}
}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
index e325b8b625aa..3669b1d9d2a2 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/SnappyCodec.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -91,8 +92,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
@@ -149,10 +150,9 @@ public class HadoopSnappyDecompressor extends HadoopDecompressor 0 ? size : 256 * 1024; // Don't change this default
}
}
diff --git a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
index a25943fbb483..f653dc0f6767 100644
--- a/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-aircompressor/src/main/java/org/apache/hadoop/hbase/io/compress/aircompressor/ZstdCodec.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -52,6 +53,7 @@
public class ZstdCodec implements Configurable, CompressionCodec {
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+ public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
private Configuration conf;
@@ -99,8 +101,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
@@ -157,10 +159,10 @@ public class HadoopZstdDecompressor extends HadoopDecompressor
// Package private
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+ return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
- CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
+ // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
+ ZSTD_BUFFER_SIZE_DEFAULT));
}
}
diff --git a/hbase-compression/hbase-compression-brotli/pom.xml b/hbase-compression/hbase-compression-brotli/pom.xml
new file mode 100644
index 000000000000..1df99586cb5a
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/pom.xml
@@ -0,0 +1,167 @@
+
+
+
+ 4.0.0
+
+ hbase-compression
+ org.apache.hbase
+ 2.6.0-SNAPSHOT
+ ..
+
+ hbase-compression-brotli
+ Apache HBase - Compression - Brotli
+ Compression support using Brotli4j
+
+
+
+
+ maven-surefire-plugin
+
+
+ net.revelc.code
+ warbucks-maven-plugin
+
+
+
+
+
+
+ maven-assembly-plugin
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+ true
+
+
+
+ net.revelc.code
+ warbucks-maven-plugin
+
+
+
+
+
+
+
+ org.apache.hbase
+ hbase-common
+
+
+ org.apache.hbase
+ hbase-logging
+ test-jar
+ test
+
+
+ org.apache.hbase
+ hbase-common
+ test-jar
+ test
+
+
+ org.apache.hbase
+ hbase-testing-util
+ test
+
+
+ org.apache.hbase
+ hbase-annotations
+ test-jar
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.github.stephenc.findbugs
+ findbugs-annotations
+ compile
+ true
+
+
+
+ com.aayushatharva.brotli4j
+ brotli4j
+ ${brotli4j.version}
+
+
+
+ org.slf4j
+ jcl-over-slf4j
+ test
+
+
+ org.slf4j
+ jul-to-slf4j
+ test
+
+
+ org.apache.logging.log4j
+ log4j-api
+ test
+
+
+ org.apache.logging.log4j
+ log4j-core
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ test
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ test
+
+
+ org.hamcrest
+ hamcrest-library
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
+ build-with-jdk11
+
+ [1.11,)
+
+
+
+ javax.annotation
+ javax.annotation-api
+
+
+
+
+
diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
new file mode 100644
index 000000000000..d052d6a08389
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCodec.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.BlockCompressorStream;
+import org.apache.hadoop.io.compress.BlockDecompressorStream;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Hadoop {@link CompressionCodec} for brotli, implemented with Brotli4j. Compression level,
+ * window size and buffer size are read from the {@link Configuration} whenever a compressor,
+ * decompressor or stream is created. Streams are Hadoop block (de)compressor streams.
+ */
+@InterfaceAudience.Private
+public class BrotliCodec implements Configurable, CompressionCodec {
+  public static final String BROTLI_LEVEL_KEY = "hbase.io.compress.brotli.level";
+  // Our default is 6, based on https://blog.cloudflare.com/results-experimenting-brotli/
+  public static final int BROTLI_LEVEL_DEFAULT = 6; // [0,11] or -1
+  public static final String BROTLI_WINDOW_KEY = "hbase.io.compress.brotli.window";
+  public static final int BROTLI_WINDOW_DEFAULT = -1; // [10-24] or -1
+  public static final String BROTLI_BUFFERSIZE_KEY = "hbase.io.compress.brotli.buffersize";
+  public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;
+
+  private Configuration conf;
+
+  public BrotliCodec() {
+    conf = new Configuration(); // used until setConf() supplies another configuration
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new BrotliDecompressor(getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return createInputStream(in, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in, Decompressor d)
+      throws IOException {
+    return new BlockDecompressorStream(in, d, getBufferSize(conf));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return createOutputStream(out, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
+      throws IOException {
+    int bufferSize = getBufferSize(conf);
+    return new BlockCompressorStream(out, c, bufferSize,
+      CompressionUtil.compressionOverhead(bufferSize)); // worst-case expansion allowance
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return BrotliCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return BrotliDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".br";
+  }
+
+  // Package private: static config accessors, also used by BrotliCompressor.reinit()
+
+  static int getLevel(Configuration conf) {
+    return conf.getInt(BROTLI_LEVEL_KEY, BROTLI_LEVEL_DEFAULT);
+  }
+
+  static int getWindow(Configuration conf) {
+    return conf.getInt(BROTLI_WINDOW_KEY, BROTLI_WINDOW_DEFAULT);
+  }
+
+  static int getBufferSize(Configuration conf) {
+    return conf.getInt(BROTLI_BUFFERSIZE_KEY, BROTLI_BUFFERSIZE_DEFAULT);
+  }
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java
new file mode 100644
index 000000000000..c45eb0d1401a
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliCompressor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import com.aayushatharva.brotli4j.encoder.Encoders;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CanReinit;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop compressor glue for Brotli4j. Input given to {@link #setInput} is accumulated in
+ * {@code inBuf}; nothing is compressed until {@link #finish} has been called, after which
+ * {@link #compress} compresses all accumulated input at once and hands the result out.
+ */
+@InterfaceAudience.Private
+public class BrotliCompressor implements CanReinit, Compressor {
+  protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
+  protected ByteBuffer inBuf, outBuf; // accumulated input / compressed output not yet handed out
+  protected int bufferSize;
+  protected boolean finish, finished; // finish() was called / pending input has been compressed
+  protected long bytesRead, bytesWritten; // uncompressed bytes accepted / compressed bytes produced
+  protected Encoder.Parameters params;
+
+  static {
+    Brotli4jLoader.ensureAvailability();
+  }
+
+  BrotliCompressor(int level, int window, int bufferSize) {
+    this.bufferSize = bufferSize;
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf.position(bufferSize); // position == limit: no buffered output yet
+    params = new Encoder.Parameters();
+    params.setQuality(level);
+    params.setWindow(window);
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    // If we have previously compressed our input and still have some buffered bytes
+    // remaining, provide them to the caller.
+    if (outBuf.hasRemaining()) {
+      int remaining = outBuf.remaining(), n = Math.min(remaining, len);
+      outBuf.get(b, off, n);
+      LOG.trace("compress: read {} remaining bytes from outBuf", n);
+      return n;
+    }
+    // We don't actually begin compression until our caller calls finish().
+    if (finish) {
+      if (inBuf.position() > 0) {
+        inBuf.flip();
+        int uncompressed = inBuf.remaining();
+        // Worst-case compressed size. Only a caller array at least that large can be written
+        // to directly; otherwise stage the output in outBuf, growing it if it is too small.
+        int needed = maxCompressedLength(uncompressed);
+        // Can we compress directly into the provided array?
+        boolean direct = false;
+        ByteBuffer writeBuf;
+        if (len >= needed) {
+          direct = true;
+          writeBuf = ByteBuffer.wrap(b, off, len);
+        } else {
+          if (outBuf.capacity() < needed) {
+            needed = CompressionUtil.roundInt2(needed);
+            LOG.trace("compress: resize outBuf {}", needed);
+            outBuf = ByteBuffer.allocate(needed);
+          } else {
+            outBuf.clear();
+          }
+          writeBuf = outBuf;
+        }
+        final int oldPos = writeBuf.position();
+        Encoders.compress(inBuf, writeBuf, params);
+        final int written = writeBuf.position() - oldPos;
+        bytesWritten += written;
+        inBuf.clear();
+        LOG.trace("compress: compressed {} -> {}", uncompressed, written);
+        finished = true;
+        if (!direct) {
+          outBuf.flip();
+          int n = Math.min(written, len);
+          outBuf.get(b, off, n);
+          LOG.trace("compress: {} bytes", n);
+          return n;
+        } else {
+          LOG.trace("compress: {} bytes direct", written);
+          return written;
+        }
+      } else {
+        finished = true;
+      }
+    }
+    LOG.trace("No output");
+    return 0;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public void finish() {
+    LOG.trace("finish");
+    finish = true;
+  }
+
+  @Override
+  public boolean finished() {
+    boolean b = finished && !outBuf.hasRemaining(); // done only once outBuf is drained too
+    LOG.trace("finished: {}", b);
+    return b;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean needsInput() {
+    boolean b = !finished();
+    LOG.trace("needsInput: {}", b);
+    return b;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    LOG.trace("reinit");
+    if (conf != null) {
+      // Quality or window settings might have changed
+      params.setQuality(BrotliCodec.getLevel(conf));
+      params.setWindow(BrotliCodec.getWindow(conf));
+      // Buffer size might have changed
+      int newBufferSize = BrotliCodec.getBufferSize(conf);
+      if (bufferSize != newBufferSize) {
+        bufferSize = newBufferSize;
+        this.inBuf = ByteBuffer.allocate(bufferSize); // heap buffers, as in the constructor
+        this.outBuf = ByteBuffer.allocate(bufferSize);
+      }
+    }
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity()); // mark outBuf as holding no pending output
+    bytesRead = 0;
+    bytesWritten = 0;
+    finish = false;
+    finished = false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (inBuf.remaining() < len) {
+      // Get a new buffer that can accommodate the accumulated input plus the additional
+      // input that would cause a buffer overflow without reallocation.
+      // This condition should be fortunately rare, because it is expensive.
+      int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", needed);
+      ByteBuffer newBuf = ByteBuffer.allocate(needed);
+      inBuf.flip();
+      newBuf.put(inBuf);
+      inBuf = newBuf;
+    }
+    inBuf.put(b, off, len);
+    bytesRead += len;
+    finished = false;
+  }
+
+  // Package private. Worst-case compressed size for len input bytes: the input length plus
+  // the estimate from CompressionUtil.compressionOverhead().
+  int maxCompressedLength(int len) {
+    return len + CompressionUtil.compressionOverhead(len);
+  }
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java
new file mode 100644
index 000000000000..8f167cd39608
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/main/java/org/apache/hadoop/hbase/io/compress/brotli/BrotliDecompressor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.Decoder;
+import com.aayushatharva.brotli4j.decoder.DirectDecompress;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop decompressor glue for Brotli4j. Compressed bytes handed to {@link #setInput} are
+ * accumulated and decompressed in one go by the next {@link #decompress} call; decompressed
+ * bytes that do not fit into the caller's array stay in {@code outBuf} for later calls.
+ */
+@InterfaceAudience.Private
+public class BrotliDecompressor implements Decompressor {
+  protected static final Logger LOG = LoggerFactory.getLogger(BrotliDecompressor.class);
+  protected ByteBuffer inBuf, outBuf;
+  protected int inLen; // input bytes accepted by setInput() and not yet consumed by decompress()
+  protected boolean finished;
+
+  static {
+    Brotli4jLoader.ensureAvailability();
+  }
+
+  BrotliDecompressor(int bufferSize) {
+    this.inBuf = ByteBuffer.allocate(bufferSize);
+    this.outBuf = ByteBuffer.allocate(bufferSize);
+    // Park the position at the limit so that outBuf initially reports nothing to drain.
+    this.outBuf.position(bufferSize);
+  }
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    // Hand out decompressed bytes left over from an earlier call before doing anything else.
+    if (outBuf.hasRemaining()) {
+      final int drained = Math.min(outBuf.remaining(), len);
+      outBuf.get(b, off, drained);
+      LOG.trace("decompress: read {} remaining bytes from outBuf", drained);
+      return drained;
+    }
+    if (inBuf.position() == 0) {
+      // Nothing pending in either buffer.
+      LOG.trace("decompress: No output, finished");
+      finished = true;
+      return 0;
+    }
+    inBuf.flip();
+    final int compressedLen = inBuf.remaining();
+    inLen -= compressedLen;
+    outBuf.clear();
+
+    // TODO: More inefficient than it could be, but it doesn't impact decompression speed
+    // terribly and the brotli4j API alternatives do not seem to work correctly.
+    // Maybe something more clever can be done as a future improvement.
+    final byte[] compressed = new byte[compressedLen];
+    inBuf.get(compressed);
+    final DirectDecompress decoded = Decoder.decompress(compressed);
+    outBuf.put(decoded.getDecompressedDataByteBuf().nioBuffer());
+    final int produced = outBuf.position();
+
+    inBuf.clear();
+    LOG.trace("decompress: decompressed {} -> {}", compressedLen, produced);
+    outBuf.flip();
+    final int copied = Math.min(produced, len);
+    outBuf.get(b, off, copied);
+    LOG.trace("decompress: {} bytes", copied);
+    return copied;
+  }
+
+  @Override
+  public void end() {
+    LOG.trace("end");
+  }
+
+  @Override
+  public boolean finished() {
+    LOG.trace("finished");
+    return finished;
+  }
+
+  @Override
+  public int getRemaining() {
+    LOG.trace("getRemaining: {}", inLen);
+    return inLen;
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    LOG.trace("needsDictionary");
+    return false;
+  }
+
+  @Override
+  public void reset() {
+    LOG.trace("reset");
+    finished = false;
+    inLen = 0;
+    inBuf.clear();
+    outBuf.clear();
+    outBuf.position(outBuf.capacity());
+  }
+
+  @Override
+  public boolean needsInput() {
+    final boolean inputDrained = inBuf.position() == 0;
+    LOG.trace("needsInput: {}", inputDrained);
+    return inputDrained;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException("setDictionary is not supported");
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    LOG.trace("setInput: off={} len={}", off, len);
+    if (len > inBuf.remaining()) {
+      // The new input would overflow inBuf, so move what has been accumulated so far into
+      // a larger buffer with room for both. This should fortunately be rare, because it
+      // is expensive.
+      final int grownCapacity = CompressionUtil.roundInt2(inBuf.capacity() + len);
+      LOG.trace("setInput: resize inBuf {}", grownCapacity);
+      final ByteBuffer grown = ByteBuffer.allocate(grownCapacity);
+      inBuf.flip();
+      grown.put(inBuf);
+      inBuf = grown;
+    }
+    inBuf.put(b, off, len);
+    inLen += len;
+    finished = false;
+  }
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java
new file mode 100644
index 000000000000..50de8aae6077
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestBrotliCodec.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.compress.CompressionTestBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBrotliCodec extends CompressionTestBase { // Drives BrotliCodec via codec*Test.
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBrotliCodec.class);
+
+ @Test
+ public void testBrotliCodecSmall() throws Exception {
+ codecSmallTest(new BrotliCodec());
+ }
+
+ @Test
+ public void testBrotliCodecLarge() throws Exception {
+ codecLargeTest(new BrotliCodec(), 1.1); // poor compressibility
+ codecLargeTest(new BrotliCodec(), 2); // between the two extremes
+ codecLargeTest(new BrotliCodec(), 10); // very high compressibility
+ }
+
+ @Test
+ public void testBrotliCodecVeryLarge() throws Exception {
+ codecVeryLargeTest(new BrotliCodec(), 3); // like text
+ }
+
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java
new file mode 100644
index 000000000000..7feb26ed1f25
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestHFileCompressionBrotli.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.HFileTestBase;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({IOTests.class, SmallTests.class})
+public class TestHFileCompressionBrotli extends HFileTestBase { // HFile test using BROTLI.
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestHFileCompressionBrotli.class);
+
+ private static Configuration conf; // assigned once in setUpBeforeClass(), read by test()
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
+ Compression.Algorithm.BROTLI.reload(conf); // presumably re-resolves the codec class set above
+ HFileTestBase.setUpBeforeClass();
+ }
+
+ @Test
+ public void test() throws Exception {
+ Path path = new Path(TEST_UTIL.getDataTestDir(), // random file name under the test data dir
+ HBaseTestingUtility.getRandomUUID().toString() + ".hfile");
+ doTest(conf, path, Compression.Algorithm.BROTLI);
+ }
+
+}
diff --git a/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java
new file mode 100644
index 000000000000..ac25951d2d4d
--- /dev/null
+++ b/hbase-compression/hbase-compression-brotli/src/test/java/org/apache/hadoop/hbase/io/compress/brotli/TestWALCompressionBrotli.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.compress.brotli;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.wal.CompressedWALTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALCompressionBrotli extends CompressedWALTestBase { // WAL test using BROTLI.
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestWALCompressionBrotli.class);
+
+ @Rule
+ public TestName name = new TestName(); // supplies the method name used to build the table name
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(Compression.BROTLI_CODEC_CLASS_KEY, BrotliCodec.class.getCanonicalName());
+ Compression.Algorithm.BROTLI.reload(conf); // presumably re-resolves the codec class set above
+ conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, Compression.Algorithm.BROTLI.getName());
+ TEST_UTIL.startMiniDFSCluster(3); // only a mini DFS cluster is started here
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster(); // NOTE(review): confirm this also stops the DFS-only cluster
+ }
+
+ @Test
+ public void test() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
+ doTest(tableName); // the regex above maps every non-alphanumeric character to '_'
+ }
+
+}
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
index a218954b6f2c..d6b0365d63dc 100644
--- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Codec.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -88,8 +89,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
@@ -110,10 +111,9 @@ public String getDefaultExtension() {
// Package private
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(LZ4_BUFFER_SIZE_KEY,
+ return conf.getInt(LZ4_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
}
}
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java
index 71b5164f116e..61046cd20503 100644
--- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Compressor.java
@@ -58,7 +58,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("compress: {} bytes from outBuf", n);
+ LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
diff --git a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java
index efb8c846d923..5c46671ab91f 100644
--- a/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java
+++ b/hbase-compression/hbase-compression-lz4/src/main/java/org/apache/hadoop/hbase/io/compress/lz4/Lz4Decompressor.java
@@ -53,7 +53,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("decompress: {} bytes from outBuf", n);
+ LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
index e7c62c507c16..aae07b4d4ed0 100644
--- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCodec.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.yetus.audience.InterfaceAudience;
+import org.xerial.snappy.Snappy;
/**
* Hadoop Snappy codec implemented with Xerial Snappy.
@@ -88,8 +89,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
}
@Override
@@ -110,10 +111,9 @@ public String getDefaultExtension() {
// Package private
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
+ return conf.getInt(SNAPPY_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
}
}
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java
index fd9994265086..2a43ca61dca3 100644
--- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyCompressor.java
@@ -55,7 +55,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("compress: {} bytes from outBuf", n);
+ LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
diff --git a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java
index e9119216168f..0bad64971d6f 100644
--- a/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java
+++ b/hbase-compression/hbase-compression-snappy/src/main/java/org/apache/hadoop/hbase/io/compress/xerial/SnappyDecompressor.java
@@ -49,7 +49,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("decompress: {} bytes from outBuf", n);
+ LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
index 99f29a2695bd..8509aa05ddc3 100644
--- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCodec.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -88,8 +89,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ CompressionUtil.compressionOverhead(bufferSize));
}
@Override
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java
index dd4d9990954c..7174942bc7dc 100644
--- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaCompressor.java
@@ -68,7 +68,7 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("compress: {} bytes from outBuf", n);
+ LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -236,7 +236,7 @@ public void setInput(byte[] b, int off, int len) {
// Package private
int maxCompressedLength(int len) {
- return len + 32 + (len/6);
+ return len + CompressionUtil.compressionOverhead(len);
}
}
diff --git a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java
index be450b3be162..6c3399dfb264 100644
--- a/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java
+++ b/hbase-compression/hbase-compression-xz/src/main/java/org/apache/hadoop/hbase/io/compress/xz/LzmaDecompressor.java
@@ -59,7 +59,7 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("decompress: {} bytes from outBuf", n);
+ LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
index 07b26d0c4bf0..521af5b25dd7 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCodec.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.hbase.io.compress.zstd;
+import com.github.luben.zstd.Zstd;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -44,6 +45,7 @@ public class ZstdCodec implements Configurable, CompressionCodec {
public static final String ZSTD_LEVEL_KEY = "hbase.io.compress.zstd.level";
public static final String ZSTD_BUFFER_SIZE_KEY = "hbase.io.compress.zstd.buffersize";
+ public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;
public static final String ZSTD_DICTIONARY_KEY = "hbase.io.compress.zstd.dictionary";
private Configuration conf;
@@ -92,8 +94,8 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
- int compressionOverhead = (bufferSize / 6) + 32;
- return new BlockCompressorStream(out, c, bufferSize, compressionOverhead);
+ return new BlockCompressorStream(out, c, bufferSize,
+ (int)Zstd.compressBound(bufferSize) - bufferSize); // overhead only
}
@Override
@@ -121,10 +123,10 @@ static int getLevel(Configuration conf) {
}
static int getBufferSize(Configuration conf) {
- int size = conf.getInt(ZSTD_BUFFER_SIZE_KEY,
+ return conf.getInt(ZSTD_BUFFER_SIZE_KEY,
conf.getInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
- CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT));
- return size > 0 ? size : 256 * 1024; // Don't change this default
+ // IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT is 0! We can't allow that.
+ ZSTD_BUFFER_SIZE_DEFAULT));
}
static byte[] getDictionary(final Configuration conf) {
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
index deaf7e1ea833..ea45414ccb9d 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdCompressor.java
@@ -67,7 +67,7 @@ public int compress(final byte[] b, final int off, final int len) throws IOExcep
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("compress: {} bytes from outBuf", n);
+ LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
diff --git a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
index dfa37db636ae..6bfa84e1c598 100644
--- a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
+++ b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdDecompressor.java
@@ -62,7 +62,7 @@ public int decompress(final byte[] b, final int off, final int len) throws IOExc
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
- LOG.trace("decompress: {} bytes from outBuf", n);
+ LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
diff --git a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
index 0a17ef997d20..5a76a4531f25 100644
--- a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
+++ b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdDictionary.java
@@ -43,9 +43,9 @@ public class TestZstdDictionary extends CompressionTestBase {
HBaseClassTestRule.forClass(TestZstdDictionary.class);
private static final String DICTIONARY_PATH = DictionaryCache.RESOURCE_SCHEME + "zstd.test.dict";
- // zstd.test.data compressed with zstd.test.dict at level 3 will produce a result of
- // 358555 bytes
- private static final int EXPECTED_COMPRESSED_SIZE = 358555;
+ // zstd.test.data compressed with zstd.test.dict at level 3 with a default buffer size of 262144
+ // will produce a result of 359909 bytes
+ private static final int EXPECTED_COMPRESSED_SIZE = 359909;
private static byte[] TEST_DATA;
diff --git a/hbase-compression/pom.xml b/hbase-compression/pom.xml
index 41f3493e16cf..a53ac5a13502 100644
--- a/hbase-compression/pom.xml
+++ b/hbase-compression/pom.xml
@@ -33,6 +33,7 @@
hbase-compression-aircompressor
+ hbase-compression-brotli
hbase-compression-lz4
hbase-compression-snappy
hbase-compression-xz
diff --git a/hbase-resource-bundle/src/main/resources/supplemental-models.xml b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
index 42cf49fb6734..2c6b2a1a0a25 100644
--- a/hbase-resource-bundle/src/main/resources/supplemental-models.xml
+++ b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
@@ -3378,4 +3378,71 @@ Copyright (c) 2007-2017 The JRuby project
+
+
+
+
+ com.aayushatharva.brotli4j
+ brotli4j
+
+
+ Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+
+
+ com.aayushatharva.brotli4j
+ native-linux-aarch64
+
+
+ Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+
+
+ com.aayushatharva.brotli4j
+ native-linux-x86_64
+
+
+ Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+
+
+ com.aayushatharva.brotli4j
+ native-osx-x86_64
+
+
+ Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+
+
+ com.aayushatharva.brotli4j
+ native-windows-x86_64
+
+
+ Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
diff --git a/pom.xml b/pom.xml
index 70f2cb8852d1..bb38df623819 100644
--- a/pom.xml
+++ b/pom.xml
@@ -633,6 +633,7 @@
2.21.0
0.21
+ 1.7.1
1.8.0
1.1.8.4
1.9
@@ -1011,6 +1012,11 @@
hbase-compression-aircompressor
${project.version}
+
+ org.apache.hbase
+ hbase-compression-brotli
+ ${project.version}
+
org.apache.hbase
hbase-compression-lz4