Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27227 Long running heavily filtered scans hold up too many ByteBuffAllocator buffers #4940

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)

// The file info must be loaded before the scanner can be used.
// This seems like a bug in HBase, but it's easily worked around.
this.scanner = in.getScanner(conf, false, false);
this.scanner = in.getScanner(conf, false, false, false);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ public void testWritingPEData() throws Exception {
LocatedFileStatus keyFileStatus = iterator.next();
HFile.Reader reader =
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
HFileScanner scanner = reader.getScanner(conf, false, false, false);
HFileScanner scanner = reader.getScanner(conf, false, false, false, false);

kvCount += reader.getEntries();
scanner.seekTo();
Expand Down Expand Up @@ -516,7 +516,7 @@ public void test_WritingTagData() throws Exception {
LocatedFileStatus keyFileStatus = iterator.next();
HFile.Reader reader =
HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
HFileScanner scanner = reader.getScanner(conf, false, false, false);
HFileScanner scanner = reader.getScanner(conf, false, false, false, false);
scanner.seekTo();
Cell cell = scanner.getCell();
List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private static void validateTable(Configuration conf, TableName tableName, Strin
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
HFileScanner scanner = reader.getScanner(conf, false, false);
HFileScanner scanner = reader.getScanner(conf, false, false, false);
scanner.seekTo();
int count = 0;
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ private static void validateHFiles(FileSystem fs, String outputPath, String fami
private static int getKVCountFromHfile(FileSystem fs, Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);
HFileScanner scanner = reader.getScanner(conf, false, false);
HFileScanner scanner = reader.getScanner(conf, false, false, false);
scanner.seekTo();
int count = 0;
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ protected boolean isTop() {

@Override
public HFileScanner getScanner(final boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction);
final boolean isCompaction, boolean checkpointingEnabled) {
final HFileScanner s = super.getScanner(cacheBlocks, pread, isCompaction, checkpointingEnabled);
return new HFileScanner() {
final HFileScanner delegate = s;
public boolean atEnd = false;
Expand Down Expand Up @@ -277,6 +277,16 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

@Override
public void checkpoint(State state) {
this.delegate.checkpoint(state);
}

@Override
public void retainBlock() {
this.delegate.retainBlock();
}
};
}

Expand Down Expand Up @@ -315,7 +325,7 @@ public Optional<Cell> midKey() throws IOException {
@Override
public Optional<Cell> getFirstKey() {
if (!firstKeySeeked) {
HFileScanner scanner = getScanner(true, true, false);
HFileScanner scanner = getScanner(true, true, false, false);
try {
if (scanner.seekTo()) {
this.firstKey = Optional.ofNullable(scanner.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public interface Reader extends Closeable, CachingBlockReader {
CellComparator getComparator();

HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,
boolean isCompaction);
boolean isCompaction, boolean checkpointingEnabled);

HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws IOException;

Expand Down Expand Up @@ -420,7 +420,8 @@ HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,

HFileBlockIndex.ByteArrayKeyBlockIndexReader getMetaBlockIndexReader();

HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread);
HFileScanner getScanner(Configuration conf, boolean cacheBlocks, boolean pread,
boolean checkpointingEnabled);

/**
* Retrieves general Bloom filter metadata as appropriate for each {@link HFile} version. Knows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,7 +2070,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
return createBuilder(blk, newBuf).build();
}

static HFileBlock deepCloneOnHeap(HFileBlock blk) {
// Publicly visible for access in tests
public static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
return createBuilder(blk, deepCloned).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public int processFile(Path file, boolean checkRootDir) throws IOException {

if (verbose || printKey || checkRow || checkFamily || printStats || checkMobIntegrity) {
// scan over file and read key/value's and check if requested
HFileScanner scanner = reader.getScanner(getConf(), false, false, false);
HFileScanner scanner = reader.getScanner(getConf(), false, false, false, false);
fileStats = new KeyValueStatsCollector();
boolean shouldScanKeysValues;
if (this.isSeekToRow && !Bytes.equals(row, reader.getFirstRowKey().orElse(null))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -311,6 +312,7 @@ protected static class HFileScannerImpl implements HFileScanner {
protected final boolean cacheBlocks;
protected final boolean pread;
protected final boolean isCompaction;
private final boolean checkpointingEnabled;
private int currKeyLen;
private int currValueLen;
private int currMemstoreTSLen;
Expand All @@ -336,38 +338,68 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Previous blocks that were used in the course of the read

// Updated to the current prevBlocks size when checkpoint is called. Used to eagerly release
// any blocks accumulated in the fetching of a row, if that row is thrown away due to filterRow.
private int lastCheckpointIndex = 0;

// Updated by retainBlock(), when a cell is included from the current block. Is reset whenever
// curBlock gets updated. Only honored when lastCheckpointIndex >= 0, meaning a checkpoint
// has occurred.
private boolean shouldRetainBlock = false;

// Previous blocks that were used in the course of the read, to be released at close,
// checkpoint, or shipped
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
final boolean pread, final boolean isCompaction) {
final boolean pread, final boolean isCompaction, boolean checkpointingEnabled) {
this.reader = reader;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
this.isCompaction = isCompaction;
this.checkpointingEnabled = checkpointingEnabled;
}

void updateCurrBlockRef(HFileBlock block) {
if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
return;
}
if (this.curBlock != null && this.curBlock.isSharedMem()) {
prevBlocks.add(this.curBlock);
}
handlePrevBlock();
this.curBlock = block;
}

void reset() {
handlePrevBlock();
this.curBlock = null;
}

/**
* Add curBlock to prevBlocks or release it immediately, depending on whether a checkpoint has
* occurred and we've been instructed to retain the block. If no checkpoint has occurred, we use
* original logic to always add to prevBlocks. If checkpoint occurred, release the block unless
* {@link #retainBlock()} has been called.
*/
private void handlePrevBlock() {
// We don't have to keep ref to heap block
if (this.curBlock != null && this.curBlock.isSharedMem()) {
this.prevBlocks.add(this.curBlock);
if (checkpointingEnabled && !shouldRetainBlock) {
this.curBlock.release();
} else {
prevBlocks.add(this.curBlock);
}
}
this.curBlock = null;
shouldRetainBlock = false;
}

private void returnBlocks(boolean returnAll) {
this.prevBlocks.forEach(HFileBlock::release);
this.prevBlocks.forEach((block) -> {
if (block != null) {
block.release();
}
});
this.prevBlocks.clear();
this.lastCheckpointIndex = 0;
if (returnAll && this.curBlock != null) {
this.curBlock.release();
this.curBlock = null;
Expand Down Expand Up @@ -1047,6 +1079,39 @@ public int compareKey(CellComparator comparator, Cell key) {
public void shipped() throws IOException {
this.returnBlocks(false);
}

/**
* Sets the last checkpoint index to the current prevBlocks size. If called with State.FILTERED,
* releases and nulls out any prevBlocks entries which were added since the last checkpoint.
* Nulls out instead of removing to avoid unnecessary resizing of the list.
*/
@Override
public void checkpoint(State state) {
if (!checkpointingEnabled) {
return;
}

if (state == State.FILTERED) {
assert lastCheckpointIndex >= 0;
for (int i = lastCheckpointIndex; i < prevBlocks.size(); i++) {
prevBlocks.get(i).release();
prevBlocks.set(i, null);
}
}
lastCheckpointIndex = prevBlocks.size();
}

/**
* Sets state so that when curBlock is finished, it gets added onto prevBlocks. Otherwise, we
* eagerly release the block when checkpointing is enabled.
*/
@Override
public void retainBlock() {
if (!checkpointingEnabled) {
return;
}
shouldRetainBlock = true;
}
}

@Override
Expand Down Expand Up @@ -1459,8 +1524,8 @@ protected static class EncodedScanner extends HFileScannerImpl {
private final DataBlockEncoder dataBlockEncoder;

public EncodedScanner(HFile.Reader reader, boolean cacheBlocks, boolean pread,
boolean isCompaction, HFileContext meta, Configuration conf) {
super(reader, cacheBlocks, pread, isCompaction);
boolean isCompaction, boolean checkpointingEnabled, Configuration conf, HFileContext meta) {
super(reader, cacheBlocks, pread, isCompaction, checkpointingEnabled);
DataBlockEncoding encoding = reader.getDataBlockEncoding();
dataBlockEncoder = encoding.getEncoder();
decodingCtx = dataBlockEncoder.newDataBlockDecodingContext(conf, meta);
Expand Down Expand Up @@ -1650,36 +1715,46 @@ public boolean prefetchComplete() {
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
* in a Scanner. Letting go of your references to the scanner is sufficient. NOTE: Do not use this
* overload of getScanner for compactions. See
* {@link #getScanner(Configuration, boolean, boolean, boolean)}
* @param conf Store configuration.
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is better for
* random reads, seek+read is better scanning).
* {@link HFile.Reader#getScanner(Configuration, boolean, boolean, boolean, boolean)}
* @param conf Store configuration.
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is better
* for random reads, seek+read is better scanning).
* @param checkpointingEnabled if true, blocks will only be retained as they are iterated if
* {@link Shipper#retainBlock()} is called. Further,
* {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks can
* be released early at safe checkpoints.
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread) {
return getScanner(conf, cacheBlocks, pread, false);
public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread,
boolean checkpointingEnabled) {
return getScanner(conf, cacheBlocks, pread, false, checkpointingEnabled);
}

/**
* Create a Scanner on this file. No seeks or reads are done on creation. Call
* {@link HFileScanner#seekTo(Cell)} to position an start the read. There is nothing to clean up
* in a Scanner. Letting go of your references to the scanner is sufficient.
* @param conf Store configuration.
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is better for
* random reads, seek+read is better scanning).
* @param isCompaction is scanner being used for a compaction?
* @param conf Store configuration.
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @param pread Use positional read rather than seek+read if true (pread is better
* for random reads, seek+read is better scanning).
* @param isCompaction is scanner being used for a compaction?
* @param checkpointingEnabled if true, blocks will only be retained as they are iterated if
* {@link Shipper#retainBlock()} is called. Further,
* {@link Shipper#checkpoint(Shipper.State)} is enabled so blocks can
* be released early at safe checkpoints.
* @return Scanner on this file.
*/
@Override
public HFileScanner getScanner(Configuration conf, boolean cacheBlocks, final boolean pread,
final boolean isCompaction) {
final boolean isCompaction, boolean checkpointingEnabled) {
if (dataBlockEncoder.useEncodedScanner()) {
return new EncodedScanner(this, cacheBlocks, pread, isCompaction, this.hfileContext, conf);
return new EncodedScanner(this, cacheBlocks, pread, isCompaction, checkpointingEnabled, conf,
this.hfileContext);
}
return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction);
return new HFileScannerImpl(this, cacheBlocks, pread, isCompaction, checkpointingEnabled);
}

public int getMajorVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void processFile(Path file) throws IOException {
out.println("Scanning -> " + file);
FileSystem fs = file.getFileSystem(conf);
try (HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, conf);
HFileScanner scanner = reader.getScanner(conf, false, false, false)) {
HFileScanner scanner = reader.getScanner(conf, false, false, false, false)) {
if (procId != null) {
if (
scanner.seekTo(PrivateCellUtil.createFirstOnRow(Bytes.toBytes(procId.longValue()))) != -1
Expand Down
Loading