Skip to content

Commit

Permalink
HBASE-27558 Scan quotas and limits should account for total block IO
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault committed Jan 16, 2023
1 parent 2a7c69d commit 7f7c8b6
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

@Override
public int getCurrentBlockSizeOnce() {
return this.delegate.getCurrentBlockSizeOnce();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Whether we returned a result for curBlock's size in getCurrentBlockSizeOnce().
// gets reset whenever curBlock is changed.
private boolean providedCurrentBlockSize = false;
// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

Expand All @@ -355,6 +358,7 @@ void updateCurrBlockRef(HFileBlock block) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
this.providedCurrentBlockSize = false;
}

void reset() {
Expand Down Expand Up @@ -415,6 +419,15 @@ public void close() {
this.returnBlocks(true);
}

@Override
public int getCurrentBlockSizeOnce() {
if (providedCurrentBlockSize || curBlock == null) {
return 0;
}
providedCurrentBlockSize = true;
return curBlock.getUncompressedSizeWithoutHeader();
}

// Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current
// HFile block's buffer so as to position to the next cell.
private int getCurCellSerializedSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,10 @@ public interface HFileScanner extends Shipper, Closeable {
*/
@Override
void close();

/**
* Returns the block size in bytes for the current block. Will only return a value once per block,
* otherwise 0. Used for calculating block IO in ScannerContext.
*/
int getCurrentBlockSizeOnce();
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ boolean isLatestCellFromMemstore() {
return !this.current.isFileScanner();
}

@Override
public int getCurrentBlockSizeOnce() {
return this.current.getCurrentBlockSizeOnce();
}

@Override
public Cell next() throws IOException {
if (this.current == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ default long getScannerOrder() {
/** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
boolean isFileScanner();

/**
* Returns the block size in bytes for the current block. Will only return a value once per block,
* otherwise 0. Used for calculating block IO in ScannerContext.
*/
int getCurrentBlockSizeOnce();

/**
* @return the file path if this is a file scanner, otherwise null.
* @see #isFileScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public boolean isFileScanner() {
return false;
}

@Override
public int getCurrentBlockSizeOnce() {
// No block size by default.
return 0;
}

@Override
public Path getFilePath() {
// Not a file by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -3282,8 +3281,7 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
throws IOException {
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
Expand Down Expand Up @@ -3343,7 +3341,9 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
// of heap size occupied by cells(being read). Cell data means its key and value parts.
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize, maxQuotaResultSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
Expand Down Expand Up @@ -3398,7 +3398,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
results.add(r);
numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
Expand Down Expand Up @@ -3431,8 +3430,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
if (moreRows && (timeLimitReached || (sizeLimitReached && results.isEmpty()))) {
// Heartbeat messages occur when the time limit has been reached, or size limit has
// been reached before collecting any results. This can happen for heavily filtered
// scans which scan over too many blocks.
builder.setHeartbeatMessage(true);
if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
Expand All @@ -3445,6 +3446,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
Expand Down Expand Up @@ -3606,7 +3611,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
} else {
limitOfRows = -1;
}
MutableObject<Object> lastBlock = new MutableObject<>();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
Expand All @@ -3616,8 +3620,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
Object lastBlock = null;
for (Result r : results) {
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
lastBlock = addSize(rpcCall, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand All @@ -3626,7 +3631,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, rpcCall);
results, builder, rpcCall);
} else {
builder.setMoreResultsInRegion(!results.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
results.clear();

// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -561,8 +562,9 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// Read nothing as the cells was filtered, but still need to check time limit.
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -608,6 +610,11 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
// Read nothing as the cells were filtered, but still need to check time limit.
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
}
}
Expand Down Expand Up @@ -705,13 +712,21 @@ public int size() {

protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

// Enable skipping row mode, which disables limits and skips tracking progress for all
// but block size. We keep tracking block size because skipping a row in this way
// might involve reading blocks along the way.
scannerContext.setSkippingRow(true);

Cell next;
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
this.storeHeap.next(MOCKED_LIST, scannerContext);
}

scannerContext.setSkippingRow(false);
resetFilters();

// Calling the hook in CP which allows it to do a fast forward
Expand Down
Loading

0 comments on commit 7f7c8b6

Please sign in to comment.