Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27227 Long running heavily filtered scans hold up too many ByteBuffAllocator buffers #4940

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,16 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

/** Forwards the checkpoint, with the same state, to the wrapped delegate. */
@Override
public void checkpoint(State state) {
this.delegate.checkpoint(state);
}

/** Forwards the retainBlock request to the wrapped delegate. */
@Override
public void retainBlock() {
this.delegate.retainBlock();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2070,7 +2070,8 @@ private static HFileBlock shallowClone(HFileBlock blk, ByteBuff newBuf) {
return createBuilder(blk, newBuf).build();
}

static HFileBlock deepCloneOnHeap(HFileBlock blk) {
// Publicly visible for access in tests
public static HFileBlock deepCloneOnHeap(HFileBlock blk) {
ByteBuff deepCloned = ByteBuff.wrap(ByteBuffer.wrap(blk.buf.toBytes(0, blk.buf.limit())));
return createBuilder(blk, deepCloned).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,18 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Previous blocks that were used in the course of the read

// Updated to the current prevBlocks size when checkpoint is called. Used to eagerly release
// any blocks accumulated in the fetching of a row, if that row is thrown away due to filterRow.
private int lastCheckpointIndex = -1;

// Updated by retainBlock, when a cell is included from the current block. Is reset whenever
// curBlock gets updated. Only honored when lastCheckpointIndex >= 0, meaning a checkpoint
// has occurred.
private boolean shouldRetainBlock = false;

// Previous blocks that were used in the course of the read, to be released at close,
// checkpoint, or shipped
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

public HFileScannerImpl(final HFile.Reader reader, final boolean cacheBlocks,
Expand All @@ -351,23 +362,43 @@ void updateCurrBlockRef(HFileBlock block) {
if (block != null && curBlock != null && block.getOffset() == curBlock.getOffset()) {
return;
}
if (this.curBlock != null && this.curBlock.isSharedMem()) {
prevBlocks.add(this.curBlock);
}
handlePrevBlock();
this.curBlock = block;
}

void reset() {
handlePrevBlock();
this.curBlock = null;
}

/**
* Add curBlock to prevBlocks or release it immediately, depending on whether a checkpoint has
* occurred and we've been instructed to retain the block. If no checkpoint has occurred, we use
* original logic to always add to prevBlocks. If checkpoint occurred, release the block unless
* {@link #retainBlock()} has been called.
*/
private void handlePrevBlock() {
// We don't have to keep ref to heap block
if (this.curBlock != null && this.curBlock.isSharedMem()) {
this.prevBlocks.add(this.curBlock);
if (shouldRetainBlock || lastCheckpointIndex < 0) {
prevBlocks.add(this.curBlock);
} else {
this.curBlock.release();
}
}
this.curBlock = null;
shouldRetainBlock = false;
}

private void returnBlocks(boolean returnAll) {
this.prevBlocks.forEach(HFileBlock::release);
this.prevBlocks.forEach((block) -> {
if (block != null) {
block.release();
}
});
this.prevBlocks.clear();
if (lastCheckpointIndex > 0) {
this.lastCheckpointIndex = 0;
}
if (returnAll && this.curBlock != null) {
this.curBlock.release();
this.curBlock = null;
Expand Down Expand Up @@ -1047,6 +1078,28 @@ public int compareKey(CellComparator comparator, Cell key) {
public void shipped() throws IOException {
this.returnBlocks(false);
}

/**
 * Records a checkpoint at the current end of prevBlocks. When invoked with State.FILTERED, every
 * prevBlocks entry added since the previous checkpoint is first released and then replaced with
 * null. Entries are nulled out rather than removed to avoid unnecessary resizing of the list.
 */
@Override
public void checkpoint(State state) {
  if (state == State.FILTERED) {
    assert lastCheckpointIndex >= 0;
    int idx = lastCheckpointIndex;
    while (idx < prevBlocks.size()) {
      HFileBlock filteredBlock = prevBlocks.get(idx);
      filteredBlock.release();
      prevBlocks.set(idx, null);
      idx++;
    }
  }
  // Whatever the state, the next checkpoint window starts after the entries seen so far.
  lastCheckpointIndex = prevBlocks.size();
}

@Override
public void retainBlock() {
shouldRetainBlock = true;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,32 @@ public void shipped() throws IOException {
}
}
}

/**
 * Propagates the checkpoint to every scanner this heap holds: the current scanner, the scanners
 * in the heap, and the scanners awaiting delayed close.
 */
@Override
public void checkpoint(State state) {
  if (current != null) {
    current.checkpoint(state);
  }
  if (heap != null) {
    for (KeyValueScanner queued : heap) {
      queued.checkpoint(state);
    }
  }
  // Scanners pending delayed close are exhausted, but may still contain blocks that were totally
  // filtered during a request; checkpointing them as well lets those blocks be released.
  if (scannersForDelayedClose != null) {
    for (KeyValueScanner exhausted : scannersForDelayedClose) {
      exhausted.checkpoint(state);
    }
  }
}

/** Forwards retainBlock() to the current scanner when one is set; otherwise does nothing. */
@Override
public void retainBlock() {
  if (current == null) {
    return;
  }
  current.retainBlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,14 @@ public Cell getNextIndexedKey() {
public void shipped() throws IOException {
// do nothing
}

@Override
public void checkpoint(State state) {
// do nothing
}

@Override
public void retainBlock() {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Used to check time limit
LimitScope limitScope = LimitScope.BETWEEN_CELLS;

checkpoint(State.START);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we checkpoint only when RegionScannerImpl.filter is not null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I think I can do that; let me look into it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved the initial checkpoint call into the constructor, so we only have to do it once. The other checkpoint calls have been changed to a new checkpointIfFiltering method which returns early if filter == null.

The initial checkpoint call is still important because it enables retainBlock() functionality in StoreScanner. Someone could submit a scan with addColumn(...) which looks for 1 column in rows with many columns, in which case retainBlock() in StoreScanner would still be very useful. I added a comment to explain that.

Thanks for looking!

Copy link
Contributor

@comnetwork comnetwork Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbeaudreault, thank you very much for the detailed reply. Overall LGTM; it is really a very insightful PR. I just have one suggestion, FYI.

  • Should we initialize HFileScannerImpl.lastCheckpointIndex to 0? Then we could simplify
    if (shouldRetainBlock || lastCheckpointIndex < 0) in HFileScannerImpl.handlePrevBlock to
    if (shouldRetainBlock). After all, once we start to scan,
    HFileScannerImpl.lastCheckpointIndex is always >= 0, whether or not there is a filter, and we could also
    remove checkpoint(State.START) from the RegionScannerImpl constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @comnetwork.

I agree ideally we could do what you say. The reason I have the complexity is because I'm concerned about use-cases of HFileScanner or StoreFileScanner which don't go through StoreScanner. For example HFilePrettyPrinter, bulk load verification, etc.

The lastCheckpointIndex < 0 check ensures that we only honor shouldRetainBlock if a call to checkpoint(State) has occurred. The contract is that if you are using checkpointing, you also need to call retainBlock(). If you are not using checkpointing, you don't need to call retainBlock().

Failing to call retainBlock() would result in blocks being released too early, so I only want this to apply for StoreScanner, which is the only place we currently do checkpointing.

A better approach might be to add a new boolean checkpointEnabled in HFileScannerImpl constructor. This is more explicit but involves adding boolean arguments to many various getScanner methods. I can give this a shot, or also open to other ideas if you have them.

Let me know if this wasn't clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@comnetwork I just pushed a commit which adds a boolean checkpointingEnabled to the creation of HFileScannerImpl. This involves minor modifications to many levels of getScanner(...) calls. I added the new param everywhere necessary, defaulting to false everywhere. This retains the old behavior for all usages. I then updated just StoreScanner to pass true, as this is the only place we want to enable this behavior right now.

This is a bunch more small changes, but is probably the safest route. If there were a bug where we accidentally passed false, we'd just be reverting to the original behavior and losing the optimization. You can see how it affects things in HFileReaderImpl.

Let me know what you think. I can revert that commit or we can keep it. As a result I was able to simplify some of the checkpointing (default to 0 instead of -1, no need to call checkpoint on new scanners).

Copy link
Contributor

@comnetwork comnetwork Jan 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbeaudreault, thank you for the elaborate work. I agree with your point that adding a new boolean checkpointEnabled makes the logic clearer.

The contract is that if you are using checkpointing, you also need to call retainBlock(). If you are not using checkpointing, you don't need to call retainBlock().

I agree with the first part of the sentence, but have doubts about the second. From your code, I think that for a scan by RegionScannerImpl, retainBlock is always needed in order to release blocks early; only when there is a filter (especially a row-level filter) do we additionally need checkpoint, to narrow down the blocks which should be retained after a row is filtered? I think we could use just retainBlock for a user scan which specifies columns but has no filters, so we wouldn't need to call checkpoint. If you agree with my point, I think the variable name boolean checkpointingEnabled is not very appropriate; maybe earlyReleaseBlockEnabled is more indicative of the intent?


// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
// Then we loop and try again. Otherwise, we must get out on the first iteration via return,
Expand Down Expand Up @@ -501,6 +503,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
checkpoint(State.FILTERED);

// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
Expand Down Expand Up @@ -553,6 +556,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
checkpoint(State.FILTERED);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
Expand Down Expand Up @@ -602,6 +606,7 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
checkpoint(State.FILTERED);
incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
Expand Down Expand Up @@ -783,6 +788,21 @@ public void shipped() throws IOException {
}
}

/**
 * Passes the checkpoint, with the same state, to storeHeap and then to joinedHeap, skipping
 * whichever of the two is null.
 */
@Override
public void checkpoint(State state) {
if (storeHeap != null) {
storeHeap.checkpoint(state);
}
if (joinedHeap != null) {
joinedHeap.checkpoint(state);
}
}

@Override
public void retainBlock() {
// do nothing. this is really only called in StoreScanner
}

@Override
public void run() throws IOException {
// This is the RPC callback method executed. We do the close in of the scanner in this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ public void shipped() throws IOException {
// do nothing
}

@Override
public void checkpoint(State state) {
// do nothing
}

@Override
public void retainBlock() {
// do nothing
}

// debug method
@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
/**
 * This interface denotes a scanner as one which can ship cells. Scan operations make many RPC requests
* to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch
* {@link #shipped()} will get called.
* {@link #shipped()} will get called. <br>
* Scans of large numbers of fully filtered blocks (due to Filter, or sparse columns, etc) can cause
* excess memory to be held while waiting for {@link #shipped()} to be called. Therefore, there's a
 * checkpoint mechanism via {@link #checkpoint(State)}. This enables fully filtered blocks to be
* eagerly released, since they are not referenced by cells being returned to clients.
*/
@InterfaceAudience.Private
public interface Shipper {
Expand All @@ -33,4 +37,27 @@ public interface Shipper {
* can be done here.
*/
void shipped() throws IOException;

/** The kind of checkpoint being signalled through {@link Shipper#checkpoint(State)}. */
enum State {
/** Beginning of a request for a row; sets the state necessary to handle {@link #FILTERED}. */
START,
/** Blocks fully processed since the last checkpoint call are to be released. */
FILTERED
}

/**
* Called during processing of a batch of scanned rows, before returning to the client. Allows
* releasing of blocks which have been totally skipped in the result set due to filters. <br>
* Should be called with {@link State#START} at the beginning of a request for a row. This will
* set state necessary to handle {@link State#FILTERED}. Calling with {@link State#FILTERED} will
* release any blocks which have been fully processed since the last call to
* {@link #checkpoint(State)}. Calling again with {@link State#START} will reset the pointers.
*/
void checkpoint(State state);

/**
* Used by upstream callers to notify the shipper that the current block should be retained for
* shipping when {@link #shipped()} or {@link #checkpoint(State)} are called. Otherwise, the block
* will be released immediately once it's no longer needed. Only has an effect after
* {@link #checkpoint(State)} has been called at least once.
*/
void retainBlock();
}
Original file line number Diff line number Diff line change
Expand Up @@ -558,4 +558,14 @@ public Cell getNextIndexedKey() {
public void shipped() throws IOException {
this.hfs.shipped();
}

/** Forwards the checkpoint, with the same state, to the underlying hfs scanner. */
@Override
public void checkpoint(State state) {
this.hfs.checkpoint(state);
}

/** Forwards the retainBlock request to the underlying hfs scanner. */
@Override
public void retainBlock() {
this.hfs.retainBlock();
}
}
Loading