Skip to content

Commit

Permalink
HBASE-27558 Scan quotas and limits should account for total block IO (#…
Browse files Browse the repository at this point in the history
…4967)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
bbeaudreault committed Jan 30, 2023
1 parent 3a98d87 commit 4788722
Show file tree
Hide file tree
Showing 15 changed files with 554 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
Expand Down Expand Up @@ -277,6 +278,11 @@ public void close() {
public void shipped() throws IOException {
this.delegate.shipped();
}

@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
this.delegate.recordBlockSize(blockSizeConsumer);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Optional;
import java.util.function.IntConsumer;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -336,6 +337,9 @@ protected static class HFileScannerImpl implements HFileScanner {
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
// unreferenced block please.
protected HFileBlock curBlock;
// Whether we returned a result for curBlock's size in recordBlockSize().
// gets reset whenever curBlock is changed.
private boolean providedCurrentBlockSize = false;
// Previous blocks that were used in the course of the read
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();

Expand All @@ -355,6 +359,7 @@ void updateCurrBlockRef(HFileBlock block) {
prevBlocks.add(this.curBlock);
}
this.curBlock = block;
this.providedCurrentBlockSize = false;
}

void reset() {
Expand Down Expand Up @@ -415,6 +420,14 @@ public void close() {
this.returnBlocks(true);
}

@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
if (!providedCurrentBlockSize && curBlock != null) {
providedCurrentBlockSize = true;
blockSizeConsumer.accept(curBlock.getUncompressedSizeWithoutHeader());
}
}

// Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current
// HFile block's buffer so as to position to the next cell.
private int getCurCellSerializedSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -140,4 +141,11 @@ public interface HFileScanner extends Shipper, Closeable {
*/
@Override
void close();

/**
* Record the size of the current block in bytes, passing as an argument to the blockSizeConsumer.
* Implementations should ensure that blockSizeConsumer is only called once per block.
* @param blockSizeConsumer to be called with block size in bytes, once per block.
*/
void recordBlockSize(IntConsumer blockSizeConsumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
Expand Down Expand Up @@ -104,6 +105,11 @@ boolean isLatestCellFromMemstore() {
return !this.current.isFileScanner();
}

@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
this.current.recordBlockSize(blockSizeConsumer);
}

@Override
public Cell next() throws IOException {
if (this.current == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.function.IntConsumer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
Expand Down Expand Up @@ -125,6 +126,13 @@ default long getScannerOrder() {
/** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
boolean isFileScanner();

/**
* Record the size of the current block in bytes, passing as an argument to the blockSizeConsumer.
* Implementations should ensure that blockSizeConsumer is only called once per block.
* @param blockSizeConsumer to be called with block size in bytes, once per block.
*/
void recordBlockSize(IntConsumer blockSizeConsumer);

/**
* @return the file path if this is a file scanner, otherwise null.
* @see #isFileScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.function.IntConsumer;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -63,6 +64,11 @@ public boolean isFileScanner() {
return false;
}

@Override
public void recordBlockSize(IntConsumer blockSizeConsumer) {
// do nothing
}

@Override
public Path getFilePath() {
// Not a file by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -3312,8 +3311,7 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo
// return whether we have more results in region.
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
throws IOException {
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
HRegion region = rsh.r;
RegionScanner scanner = rsh.s;
long maxResultSize;
Expand Down Expand Up @@ -3373,7 +3371,19 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
// of heap size occupied by cells(being read). Cell data means its key and value parts.
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
// We may have accumulated some results in coprocessor preScannerNext call. We estimate
// block and cell size of those using call to addSize. Update our maximums for scanner
// context so we can account for them in the real scan.
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
maxBlockSize -= rpcCall.getResponseBlockSize();
maxCellSize -= rpcCall.getResponseCellSize();
}

contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
contextBuilder.setBatchLimit(scanner.getBatch());
contextBuilder.setTimeLimit(timeScope, timeLimit);
contextBuilder.setTrackMetrics(trackMetrics);
Expand Down Expand Up @@ -3428,7 +3438,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
results.add(r);
numOfResults++;
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
Expand Down Expand Up @@ -3457,12 +3466,18 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

if (limitReached || !moreRows) {
// With block size limit, we may exceed size limit without collecting any results.
// In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
// or cursor if results were collected, for example for cell size or heap size limits.
boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
// We only want to mark a ScanResponse as a heartbeat message in the event that
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
// Heartbeat messages occur when the time limit has been reached, or size limit has
// been reached before collecting any results. This can happen for heavily filtered
// scans which scan over too many blocks.
builder.setHeartbeatMessage(true);
if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
Expand All @@ -3475,6 +3490,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
}
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
// Check to see if the client requested that we track metrics server side. If the
// client requested metrics, retrieve the metrics from the scanner context.
Expand Down Expand Up @@ -3636,7 +3655,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
} else {
limitOfRows = -1;
}
MutableObject<Object> lastBlock = new MutableObject<>();
boolean scannerClosed = false;
try {
List<Result> results = new ArrayList<>(Math.min(rows, 512));
Expand All @@ -3646,8 +3664,18 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
// If scanner CP added results to list, we want to account for cell and block size of
// that work. We estimate this using addSize, since CP does not get ScannerContext. If
// !done, the actual scan call below will use more accurate ScannerContext block and
// cell size tracking for the rest of the request. The two result sets will be added
// together in the RpcCall accounting.
// This here is just an estimate (see addSize for more details on estimation). We don't
// pass lastBlock to the scan call below because the real scan uses ScannerContext,
// which does not use lastBlock tracking. This may result in over counting by 1 block,
// but that is unlikely since addSize is already a rough estimate.
Object lastBlock = null;
for (Result r : results) {
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
lastBlock = addSize(rpcCall, r, lastBlock);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand All @@ -3656,7 +3684,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
}
if (!done) {
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
results, builder, lastBlock, rpcCall);
results, builder, rpcCall);
} else {
builder.setMoreResultsInRegion(!results.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
results.clear();

// Read nothing as the rowkey was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -558,8 +559,9 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
if (!shouldStop) {
// Read nothing as the cells was filtered, but still need to check time limit
if (scannerContext.checkTimeLimit(limitScope)) {
// Read nothing as the cells was filtered, but still need to check time limit.
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
Expand Down Expand Up @@ -605,6 +607,13 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
// We check size limit because we might have read blocks in the nextRow call above, or
// in the call populateResults call. Only scans with hasFilterRow should reach this point,
// and for those scans which filter row _cells_ this is the only place we can actually
// enforce that the scan does not exceed limits since it bypasses all other checks above.
if (scannerContext.checkSizeLimit(limitScope)) {
return true;
}
continue;
}
}
Expand Down Expand Up @@ -702,13 +711,21 @@ public int size() {

protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";

// Enable skipping row mode, which disables limits and skips tracking progress for all
// but block size. We keep tracking block size because skipping a row in this way
// might involve reading blocks along the way.
scannerContext.setSkippingRow(true);

Cell next;
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
this.storeHeap.next(MOCKED_LIST);
this.storeHeap.next(MOCKED_LIST, scannerContext);
}

scannerContext.setSkippingRow(false);
resetFilters();

// Calling the hook in CP which allows it to do a fast forward
Expand Down
Loading

0 comments on commit 4788722

Please sign in to comment.