Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27672 Read RPC threads may BLOCKED at the Configuration.get whe… #5075

Merged
merged 1 commit into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";

private Configuration conf;
private int bufferSize;

public Lz4Codec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class LzoCodec implements Configurable, CompressionCodec {
public static final String LZO_BUFFER_SIZE_KEY = "hbase.io.compress.lzo.buffersize";

private Configuration conf;
private int bufferSize;

public LzoCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";

private Configuration conf;
private int bufferSize;

public SnappyCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -59,6 +61,7 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -79,7 +82,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -90,7 +93,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ public class ZstdCodec implements Configurable, CompressionCodec {
public static final int ZSTD_BUFFER_SIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;

public ZstdCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -87,7 +90,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -98,7 +101,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ public class BrotliCodec implements Configurable, CompressionCodec {
public static final int BROTLI_BUFFERSIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;
private int level;
private int window;

public BrotliCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
level = getLevel(conf);
window = getWindow(conf);
}

@Override
Expand All @@ -60,16 +66,19 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
this.level = getLevel(conf);
this.window = getWindow(conf);
}

@Override
public Compressor createCompressor() {
return new BrotliCompressor(getLevel(conf), getWindow(conf), getBufferSize(conf));
return new BrotliCompressor(level, window, bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new BrotliDecompressor(getBufferSize(conf));
return new BrotliDecompressor(bufferSize);
}

@Override
Expand All @@ -80,7 +89,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -91,7 +100,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public static final String LZ4_BUFFER_SIZE_KEY = "hbase.io.compress.lz4.buffersize";

private Configuration conf;
private int bufferSize;

public Lz4Codec() {
conf = new Configuration();
this.bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -57,16 +59,17 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
public Compressor createCompressor() {
return new Lz4Compressor(getBufferSize(conf));
return new Lz4Compressor(bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new Lz4Decompressor(getBufferSize(conf));
return new Lz4Decompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +80,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +91,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public class SnappyCodec implements Configurable, CompressionCodec {
public static final String SNAPPY_BUFFER_SIZE_KEY = "hbase.io.compress.snappy.buffersize";

private Configuration conf;
private int bufferSize;

public SnappyCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
}

@Override
Expand All @@ -57,16 +59,17 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
}

@Override
public Compressor createCompressor() {
return new SnappyCompressor(getBufferSize(conf));
return new SnappyCompressor(bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new SnappyDecompressor(getBufferSize(conf));
return new SnappyDecompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +80,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +91,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
Snappy.maxCompressedLength(bufferSize) - bufferSize); // overhead only
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ public class LzmaCodec implements Configurable, CompressionCodec {
public static final int LZMA_BUFFERSIZE_DEFAULT = 256 * 1024;

private Configuration conf;
private int bufferSize;
private int level;

public LzmaCodec() {
conf = new Configuration();
bufferSize = getBufferSize(conf);
level = getLevel(conf);
}

@Override
Expand All @@ -57,16 +61,18 @@ public Configuration getConf() {
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.bufferSize = getBufferSize(conf);
this.level = getLevel(conf);
}

@Override
public Compressor createCompressor() {
return new LzmaCompressor(getLevel(conf), getBufferSize(conf));
return new LzmaCompressor(level, bufferSize);
}

@Override
public Decompressor createDecompressor() {
return new LzmaDecompressor(getBufferSize(conf));
return new LzmaDecompressor(bufferSize);
}

@Override
Expand All @@ -77,7 +83,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti
@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor d)
throws IOException {
return new BlockDecompressorStream(in, d, getBufferSize(conf));
return new BlockDecompressorStream(in, d, bufferSize);
}

@Override
Expand All @@ -88,7 +94,6 @@ public CompressionOutputStream createOutputStream(OutputStream out) throws IOExc
@Override
public CompressionOutputStream createOutputStream(OutputStream out, Compressor c)
throws IOException {
int bufferSize = getBufferSize(conf);
return new BlockCompressorStream(out, c, bufferSize,
CompressionUtil.compressionOverhead(bufferSize));
}
Expand Down
Loading