From 7f7c8b6c9887cadcb94b9bc3a6f4ca50db5c12fb Mon Sep 17 00:00:00 2001 From: Bryan Beaudreault Date: Tue, 20 Dec 2022 08:42:29 -0500 Subject: [PATCH] HBASE-27558 Scan quotas and limits should account for total block IO --- .../hadoop/hbase/io/HalfStoreFileReader.java | 5 + .../hbase/io/hfile/HFileReaderImpl.java | 13 + .../hadoop/hbase/io/hfile/HFileScanner.java | 6 + .../hbase/regionserver/KeyValueHeap.java | 5 + .../hbase/regionserver/KeyValueScanner.java | 6 + .../regionserver/NonLazyKeyValueScanner.java | 6 + .../hbase/regionserver/RSRpcServices.java | 25 +- .../hbase/regionserver/RegionScannerImpl.java | 23 +- .../hbase/regionserver/ScannerContext.java | 115 ++++++-- .../hbase/regionserver/SegmentScanner.java | 5 + .../hbase/regionserver/StoreFileScanner.java | 5 + .../hbase/regionserver/StoreScanner.java | 12 +- .../hbase/io/hfile/TestHFileReaderImpl.java | 35 +++ .../DelegatingKeyValueScanner.java | 5 + .../TestScannerBlockSizeLimits.java | 253 ++++++++++++++++++ 15 files changed, 489 insertions(+), 30 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index cc680173a4e3..98b54360983f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -277,6 +277,11 @@ public void close() { public void shipped() throws IOException { this.delegate.shipped(); } + + @Override + public int getCurrentBlockSizeOnce() { + return this.delegate.getCurrentBlockSizeOnce(); + } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index af3a9b960aa3..fa7e1afc55b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -336,6 +336,9 @@ protected static class HFileScannerImpl implements HFileScanner { // RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the // unreferenced block please. protected HFileBlock curBlock; + // Whether we returned a result for curBlock's size in getCurrentBlockSizeOnce(). + // gets reset whenever curBlock is changed. + private boolean providedCurrentBlockSize = false; // Previous blocks that were used in the course of the read protected final ArrayList prevBlocks = new ArrayList<>(); @@ -355,6 +358,7 @@ void updateCurrBlockRef(HFileBlock block) { prevBlocks.add(this.curBlock); } this.curBlock = block; + this.providedCurrentBlockSize = false; } void reset() { @@ -415,6 +419,15 @@ public void close() { this.returnBlocks(true); } + @Override + public int getCurrentBlockSizeOnce() { + if (providedCurrentBlockSize || curBlock == null) { + return 0; + } + providedCurrentBlockSize = true; + return curBlock.getUncompressedSizeWithoutHeader(); + } + // Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current // HFile block's buffer so as to position to the next cell. 
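// Illustration (not part of this change): the once-per-block contract of
// getCurrentBlockSizeOnce() in isolation. The first call after the scanner moves onto a
// new block reports that block's uncompressed size; every later call for the same block
// reports 0, so a caller that polls after each cell never double-counts a block. The
// BlockTracker name below is hypothetical and exists only for this sketch.
final class BlockTracker {
  private int currentBlockSize; // uncompressed size of the block currently positioned on
  private boolean provided;     // true once the size has been handed out

  void onBlockChanged(int uncompressedSizeWithoutHeader) {
    this.currentBlockSize = uncompressedSizeWithoutHeader;
    this.provided = false; // mirrors the reset in updateCurrBlockRef() above
  }

  int getCurrentBlockSizeOnce() {
    if (provided || currentBlockSize == 0) {
      return 0; // already reported for this block, or no block loaded yet
    }
    provided = true;
    return currentBlockSize;
  }
}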
private int getCurCellSerializedSize() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index fd5c66b126b2..cdc0105bfeb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -140,4 +140,10 @@ public interface HFileScanner extends Shipper, Closeable { */ @Override void close(); + + /** + * Returns the block size in bytes for the current block. Will only return a value once per block, + * otherwise 0. Used for calculating block IO in ScannerContext. + */ + int getCurrentBlockSizeOnce(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 1fe80bc58b01..29d913a90afc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -104,6 +104,11 @@ boolean isLatestCellFromMemstore() { return !this.current.isFileScanner(); } + @Override + public int getCurrentBlockSizeOnce() { + return this.current.getCurrentBlockSizeOnce(); + } + @Override public Cell next() throws IOException { if (this.current == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index d90cf78dda54..5fed3cabc851 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -125,6 +125,12 @@ default long getScannerOrder() { /** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */ boolean isFileScanner(); + /** + * Returns the block size in bytes for the current block. Will only return a value once per block, + * otherwise 0. Used for calculating block IO in ScannerContext. + */ + int getCurrentBlockSizeOnce(); + /** * @return the file path if this is a file scanner, otherwise null. * @see #isFileScanner() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 02d4d85d7e13..b1ed355afc49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -63,6 +63,12 @@ public boolean isFileScanner() { return false; } + @Override + public int getCurrentBlockSizeOnce() { + // No block size by default. + return 0; + } + @Override public Path getFilePath() { // Not a file by default. 
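For orientation, the new accessor threads through the whole scanner hierarchy: memstore-backed
scanners (NonLazyKeyValueScanner, SegmentScanner) always answer 0, StoreFileScanner delegates to
the underlying HFileScanner, and KeyValueHeap asks only the scanner it is currently positioned
on. A rough sketch of how a consumer is expected to drive it -- simplified, illustrative
signatures, not the real HBase classes; the actual consumer is StoreScanner feeding
ScannerContext.incrementBlockProgress() later in this patch:

    interface CellSource {
      boolean nextCell();             // advance to the next cell, false at the end
      int getCurrentBlockSizeOnce();  // non-zero at most once per underlying block
    }

    long accumulateBlockIo(CellSource source) {
      long blockBytes = 0;
      while (source.nextCell()) {
        // Memstore-backed sources contribute nothing; file-backed sources contribute the
        // uncompressed size of each block the first time the scan touches it.
        blockBytes += source.getCurrentBlockSizeOnce();
      }
      return blockBytes;
    }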
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4201a6d65aef..1071fad3de1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; -import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -3282,8 +3281,7 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo // return whether we have more results in region. private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh, long maxQuotaResultSize, int maxResults, int limitOfRows, List results, - ScanResponse.Builder builder, MutableObject lastBlock, RpcCall rpcCall) - throws IOException { + ScanResponse.Builder builder, RpcCall rpcCall) throws IOException { HRegion region = rsh.r; RegionScanner scanner = rsh.s; long maxResultSize; @@ -3343,7 +3341,9 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); // maxResultSize - either we can reach this much size for all cells(being read) data or sum // of heap size occupied by cells(being read). Cell data means its key and value parts. - contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize); + // maxQuotaResultSize - max results just from server side configuration and quotas, without + // user's specified max. We use this for evaluating limits based on blocks (not cells). + contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize, maxQuotaResultSize); contextBuilder.setBatchLimit(scanner.getBatch()); contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); @@ -3398,7 +3398,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan } boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow(); Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow); - lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); results.add(r); numOfResults++; if (!mayHaveMoreCellsInRow && limitOfRows > 0) { @@ -3431,8 +3430,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // there are more values to be read server side. If there aren't more values, // marking it as a heartbeat is wasteful because the client will need to issue // another ScanRequest only to realize that they already have all the values - if (moreRows && timeLimitReached) { - // Heartbeat messages occur when the time limit has been reached. + if (moreRows && (timeLimitReached || (sizeLimitReached && results.isEmpty()))) { + // Heartbeat messages occur when the time limit has been reached, or size limit has + // been reached before collecting any results. This can happen for heavily filtered + // scans which scan over too many blocks. 
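// Illustrative note (not in the original change): with a heavily filtered scan, the block
// limit derived from maxQuotaResultSize can be hit after reading many blocks even though no
// cell survived the filter. Replying with a heartbeat (and, just below, a cursor when the
// client asked for one) lets the client renew its timeout and resume from the last peeked
// cell, rather than the server silently reading far past its quota.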
builder.setHeartbeatMessage(true); if (rsh.needCursor) { Cell cursorCell = scannerContext.getLastPeekedCell(); @@ -3445,6 +3446,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan } values.clear(); } + if (rpcCall != null) { + rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress()); + rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress()); + } builder.setMoreResultsInRegion(moreRows); // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. @@ -3606,7 +3611,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } else { limitOfRows = -1; } - MutableObject lastBlock = new MutableObject<>(); boolean scannerClosed = false; try { List results = new ArrayList<>(Math.min(rows, 512)); @@ -3616,8 +3620,9 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque if (region.getCoprocessorHost() != null) { Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); if (!results.isEmpty()) { + Object lastBlock = null; for (Result r : results) { - lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue())); + lastBlock = addSize(rpcCall, r, lastBlock); } } if (bypass != null && bypass.booleanValue()) { @@ -3626,7 +3631,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } if (!done) { scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, - results, builder, lastBlock, rpcCall); + results, builder, rpcCall); } else { builder.setMoreResultsInRegion(!results.isEmpty()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index 11d4c20f581b..ab3e8b207b15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -503,7 +503,8 @@ private boolean nextInternal(List results, ScannerContext scannerContext) results.clear(); // Read nothing as the rowkey was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { + // We also check size limit because we might have read blocks in getting to this point. + if (scannerContext.checkAnyLimitReached(limitScope)) { return true; } continue; @@ -561,8 +562,9 @@ private boolean nextInternal(List results, ScannerContext scannerContext) // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!shouldStop) { - // Read nothing as the cells was filtered, but still need to check time limit - if (scannerContext.checkTimeLimit(limitScope)) { + // Read nothing as the cells was filtered, but still need to check time limit. + // We also check size limit because we might have read blocks in getting to this point. + if (scannerContext.checkAnyLimitReached(limitScope)) { return true; } continue; @@ -608,6 +610,11 @@ private boolean nextInternal(List results, ScannerContext scannerContext) return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); } if (!shouldStop) { + // Read nothing as the cells were filtered, but still need to check time limit. + // We also check size limit because we might have read blocks in getting to this point. 
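// Illustrative note (not in the original change): returning true here with an empty result
// list is what feeds the new heartbeat case in RSRpcServices above -- the RPC ends early on
// accumulated block IO instead of continuing to read blocks for rows that are being
// filtered out anyway.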
+ if (scannerContext.checkAnyLimitReached(limitScope)) { + return true; + } continue; } } @@ -705,13 +712,21 @@ public int size() { protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException { assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read."; + + // Enable skipping row mode, which disables limits and skips tracking progress for all + // but block size. We keep tracking block size because skipping a row in this way + // might involve reading blocks along the way. + scannerContext.setSkippingRow(true); + Cell next; while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) { // Check for thread interrupt status in case we have been signaled from // #interruptRegionOperation. region.checkInterrupt(); - this.storeHeap.next(MOCKED_LIST); + this.storeHeap.next(MOCKED_LIST, scannerContext); } + + scannerContext.setSkippingRow(false); resetFilters(); // Calling the hook in CP which allows it to do a fast forward diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index c9655d7fafdf..03d84f209b04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -103,6 +103,13 @@ public class ScannerContext { boolean keepProgress; private static boolean DEFAULT_KEEP_PROGRESS = false; + /** + * Allows temporarily ignoring limits and skipping tracking of batch and size progress. Used when + * skipping to the next row, in which case all processed cells are thrown away so should not count + * towards progress. + */ + boolean skippingRow = false; + private Cell lastPeekedCell = null; // Set this to true will have the same behavior with reaching the time limit. @@ -123,7 +130,7 @@ public class ScannerContext { } // Progress fields are initialized to 0 - progress = new ProgressFields(0, 0, 0); + progress = new ProgressFields(0, 0, 0, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -148,7 +155,9 @@ public ServerSideScanMetrics getMetrics() { * @return true if the progress tracked so far in this instance will be considered during an * invocation of {@link InternalScanner#next(java.util.List)} or * {@link RegionScanner#next(java.util.List)}. false when the progress tracked so far - * should not be considered and should instead be wiped away via {@link #clearProgress()} + * should not be considered and should instead be wiped away via {@link #clearProgress()}. + * This only applies to per-row progress, like batch and data/heap size. Block size is + * never reset because it tracks all of the blocks scanned for an entire request. */ boolean getKeepProgress() { return keepProgress; @@ -158,10 +167,32 @@ void setKeepProgress(boolean keepProgress) { this.keepProgress = keepProgress; } + /** + * In this mode, only block size progress is tracked, and limits are ignored. We set this mode + * when skipping to next row, in which case all cells returned a thrown away so should not count + * towards progress. + * @return true if we are in skipping row mode. + */ + public boolean getSkippingRow() { + return skippingRow; + } + + /** + * @param skippingRow set true to cause disabling of collecting per-cell progress or enforcing any + * limits. 
This is used when trying to skip over all cells in a row, in which + * case those cells are thrown away so should not count towards progress. + */ + void setSkippingRow(boolean skippingRow) { + this.skippingRow = skippingRow; + } + /** * Progress towards the batch limit has been made. Increment internal tracking of batch progress */ void incrementBatchProgress(int batch) { + if (skippingRow) { + return; + } int currentBatch = progress.getBatch(); progress.setBatch(currentBatch + batch); } @@ -170,12 +201,25 @@ void incrementBatchProgress(int batch) { * Progress towards the size limit has been made. Increment internal tracking of size progress */ void incrementSizeProgress(long dataSize, long heapSize) { + if (skippingRow) { + return; + } long curDataSize = progress.getDataSize(); progress.setDataSize(curDataSize + dataSize); long curHeapSize = progress.getHeapSize(); progress.setHeapSize(curHeapSize + heapSize); } + /** + * Progress towards the block limit has been made. Increment internal track of block progress + */ + void incrementBlockProgress(int blockSize) { + if (blockSize > 0) { + long curBlockSize = progress.getBlockSize(); + progress.setBlockSize(curBlockSize + blockSize); + } + } + int getBatchProgress() { return progress.getBatch(); } @@ -188,6 +232,10 @@ long getHeapSizeProgress() { return progress.getHeapSize(); } + long getBlockSizeProgress() { + return progress.getBlockSize(); + } + void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress, heapSizeProgress); @@ -204,10 +252,12 @@ void setBatchProgress(int batchProgress) { /** * Clear away any progress that has been made so far. All progress fields are reset to initial - * values + * values. Only clears progress that should reset between rows. {@link #getBlockSizeProgress()} is + * not reset because it increments for all blocks scanned whether the result is included or + * filtered. 
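* For example, a row that is entirely filtered out has its batch and data/heap progress
* cleared here between rows, but the blocks read while evaluating that row stay counted, so
* the per-RPC block limit still applies.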
*/ void clearProgress() { - progress.setFields(0, 0, 0); + progress.setFields(0, 0, 0, getBlockSizeProgress()); } /** @@ -244,7 +294,7 @@ boolean hasBatchLimit(LimitScope checkerScope) { /** Returns true if the size limit can be enforced in the checker's scope */ boolean hasSizeLimit(LimitScope checkerScope) { return limits.canEnforceSizeLimitFromScope(checkerScope) - && (limits.getDataSize() > 0 || limits.getHeapSize() > 0); + && (limits.getDataSize() > 0 || limits.getHeapSize() > 0 || limits.getBlockSize() > 0); } /** Returns true if the time limit can be enforced in the checker's scope */ @@ -289,7 +339,7 @@ long getTimeLimit() { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkBatchLimit(LimitScope checkerScope) { - return hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch(); + return !skippingRow && hasBatchLimit(checkerScope) && progress.getBatch() >= limits.getBatch(); } /** @@ -297,8 +347,10 @@ boolean checkBatchLimit(LimitScope checkerScope) { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkSizeLimit(LimitScope checkerScope) { - return hasSizeLimit(checkerScope) && (progress.getDataSize() >= limits.getDataSize() - || progress.getHeapSize() >= limits.getHeapSize()); + return !skippingRow && hasSizeLimit(checkerScope) + && (progress.getDataSize() >= limits.getDataSize() + || progress.getHeapSize() >= limits.getHeapSize() + || progress.getBlockSize() >= limits.getBlockSize()); } /** @@ -307,7 +359,7 @@ boolean checkSizeLimit(LimitScope checkerScope) { * @return true when the limit is enforceable from the checker's scope and it has been reached */ boolean checkTimeLimit(LimitScope checkerScope) { - return hasTimeLimit(checkerScope) + return !skippingRow && hasTimeLimit(checkerScope) && (returnImmediately || EnvironmentEdgeManager.currentTime() >= limits.getTime()); } @@ -383,10 +435,12 @@ public Builder setTrackMetrics(boolean trackMetrics) { return this; } - public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit) { + public Builder setSizeLimit(LimitScope sizeScope, long dataSizeLimit, long heapSizeLimit, + long blockSizeLimit) { limits.setDataSize(dataSizeLimit); limits.setHeapSize(heapSizeLimit); limits.setSizeScope(sizeScope); + limits.setBlockSize(blockSizeLimit); return this; } @@ -532,6 +586,9 @@ private static class LimitFields { // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as // such AND data cells of Cells which are in on heap area. long heapSize = DEFAULT_SIZE; + // The total amount of block bytes that have been loaded in order to process cells for the + // request. + long blockSize = DEFAULT_SIZE; LimitScope timeScope = DEFAULT_SCOPE; long time = DEFAULT_TIME; @@ -545,19 +602,21 @@ private static class LimitFields { void copy(LimitFields limitsToCopy) { if (limitsToCopy != null) { setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(), - limitsToCopy.getHeapSize(), limitsToCopy.getTimeScope(), limitsToCopy.getTime()); + limitsToCopy.getHeapSize(), limitsToCopy.getBlockSize(), limitsToCopy.getTimeScope(), + limitsToCopy.getTime()); } } /** * Set all fields together. 
*/ - void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, + void setFields(int batch, LimitScope sizeScope, long dataSize, long heapSize, long blockSize, LimitScope timeScope, long time) { setBatch(batch); setSizeScope(sizeScope); setDataSize(dataSize); setHeapSize(heapSize); + setBlockSize(blockSize); setTimeScope(timeScope); setTime(time); } @@ -583,6 +642,10 @@ long getHeapSize() { return this.heapSize; } + long getBlockSize() { + return this.blockSize; + } + void setDataSize(long dataSize) { this.dataSize = dataSize; } @@ -591,6 +654,10 @@ void setHeapSize(long heapSize) { this.heapSize = heapSize; } + void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + /** Returns {@link LimitScope} indicating scope in which the size limit is enforced */ LimitScope getSizeScope() { return this.sizeScope; @@ -647,6 +714,9 @@ public String toString() { sb.append(", heapSize:"); sb.append(heapSize); + sb.append(", blockSize:"); + sb.append(blockSize); + sb.append(", sizeScope:"); sb.append(sizeScope); @@ -675,18 +745,22 @@ private static class ProgressFields { // The sum of heap space occupied by all tracked cells. This includes Cell POJO's overhead as // such AND data cells of Cells which are in on heap area. long heapSize = DEFAULT_SIZE; + // The total amount of block bytes that have been loaded in order to process cells for the + // request. + long blockSize = DEFAULT_SIZE; - ProgressFields(int batch, long size, long heapSize) { - setFields(batch, size, heapSize); + ProgressFields(int batch, long size, long heapSize, long blockSize) { + setFields(batch, size, heapSize, blockSize); } /** * Set all fields together. */ - void setFields(int batch, long dataSize, long heapSize) { + void setFields(int batch, long dataSize, long heapSize, long blockSize) { setBatch(batch); setDataSize(dataSize); setHeapSize(heapSize); + setBlockSize(blockSize); } int getBatch() { @@ -705,10 +779,18 @@ long getHeapSize() { return this.heapSize; } + long getBlockSize() { + return this.blockSize; + } + void setDataSize(long dataSize) { this.dataSize = dataSize; } + void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + void setHeapSize(long heapSize) { this.heapSize = heapSize; } @@ -727,6 +809,9 @@ public String toString() { sb.append(", heapSize:"); sb.append(heapSize); + sb.append(", blockSize:"); + sb.append(blockSize); + sb.append("}"); return sb.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index c5dbca6b6e2b..be6d1aae005b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -286,6 +286,11 @@ public boolean isFileScanner() { return false; } + @Override + public int getCurrentBlockSizeOnce() { + return 0; + } + @Override public Path getFilePath() { return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 74147f8ec059..6ae3cd32d8ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -450,6 +450,11 @@ public boolean isFileScanner() { return true; } + @Override + public int 
getCurrentBlockSizeOnce() { + return hfs.getCurrentBlockSizeOnce(); + } + @Override public Path getFilePath() { return reader.getHFileReader().getPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 5cfe67420074..d5339d81337e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -569,7 +569,7 @@ public boolean next(List outResult, ScannerContext scannerContext) throws } // Clear progress away unless invoker has indicated it should be kept. - if (!scannerContext.getKeepProgress()) { + if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) { scannerContext.clearProgress(); } @@ -612,6 +612,9 @@ public boolean next(List outResult, ScannerContext scannerContext) throws // here, we still need to scan all the qualifiers before returning... scannerContext.returnImmediately(); } + + scannerContext.incrementBlockProgress(heap.getCurrentBlockSizeOnce()); + prevCell = cell; scannerContext.setLastPeekedCell(cell); topChanged = false; @@ -750,6 +753,13 @@ public boolean next(List outResult, ScannerContext scannerContext) throws default: throw new RuntimeException("UNEXPECTED"); } + + // One last chance to break due to size limit. The INCLUDE* cases above already check + // limit and continue. For the various filtered cases, we need to check because block + // size limit may have been exceeded even if we don't add cells to result list. + if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { + return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + } } while ((cell = this.heap.peek()) != null); if (count > 0) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java index f16008f29db1..fe0a74058cce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileReaderImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -81,6 +82,40 @@ Path makeNewFile() throws IOException { return ncTFile; } + /** + * Test that we only count block size once per block while scanning + */ + @Test + public void testGetCurrentBlockSizeOnce() throws IOException { + Path p = makeNewFile(); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Configuration conf = TEST_UTIL.getConfiguration(); + HFile.Reader reader = HFile.createReader(fs, p, CacheConfig.DISABLED, true, conf); + + try (HFileReaderImpl.HFileScannerImpl scanner = + (HFileReaderImpl.HFileScannerImpl) reader.getScanner(conf, true, true, false)) { + scanner.seekTo(); + + assertTrue("expected non-zero block size on first request", + scanner.getCurrentBlockSizeOnce() > 0); + assertEquals("expected zero block size on second request", 0, + scanner.getCurrentBlockSizeOnce()); + + int blocks = 0; + while (scanner.next()) { + int blockSize = scanner.getCurrentBlockSizeOnce(); + if (blockSize > 0) { + blocks++; + // there's only 2 cells in the second block + assertTrue("expected remaining block to be less than block size", + blockSize < 
toKV("a").getLength() * 3); + } + } + + assertEquals("expected only one remaining block but got " + blocks, 1, blocks); + } + } + @Test public void testSeekBefore() throws Exception { Path p = makeNewFile(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java index 373e138a764b..e5d742da1b37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -113,4 +113,9 @@ public boolean seekToLastRow() throws IOException { public Cell getNextIndexedKey() { return delegate.getNextIndexedKey(); } + + @Override + public int getCurrentBlockSizeOnce() { + return delegate.getCurrentBlockSizeOnce(); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java new file mode 100644 index 000000000000..ff3d83ea2f5c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SkipFilter; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class }) +public class TestScannerBlockSizeLimits { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits"); + private static final byte[] FAMILY1 = Bytes.toBytes("0"); + private static final byte[] FAMILY2 = Bytes.toBytes("1"); + + private static final byte[] DATA = new byte[1000]; + private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 }; + + private static final byte[] COLUMN1 = Bytes.toBytes(0); + private static final byte[] COLUMN2 = Bytes.toBytes(1); + private static final byte[] COLUMN3 = Bytes.toBytes(2); + private static final byte[] COLUMN4 = Bytes.toBytes(4); + private static final byte[] COLUMN5 = Bytes.toBytes(5); + + private static final byte[][] COLUMNS = new byte[][] { COLUMN1, COLUMN2 }; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048); + createTestData(); + } + + private static void createTestData() throws IOException, InterruptedException { + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName); + + for (int i = 1; i < 10; i++) { + // 5 columns per row, in 2 families + // Each column value is 1000 bytes, which is enough to fill a full block with row and header. + // So 5 blocks per row. 
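// (Orientation, not part of the original test: the table is created with a 2048-byte block
// size in setUp() and the server-side max result size is capped at 4200 bytes, so only a few
// ~2 KB blocks can be accounted per RPC; the expected RPC counts asserted in the tests below
// follow from that arithmetic.)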
+ Put put = new Put(Bytes.toBytes(i)); + for (int j = 0; j < 6; j++) { + put.addColumn(FAMILY1, Bytes.toBytes(j), DATA); + } + + put.addColumn(FAMILY2, COLUMN1, DATA); + + region.put(put); + + if (i % 2 == 0) { + region.flush(true); + } + } + + // we've created 10 storefiles at this point, 5 per family + region.flush(true); + + } + + /** + * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in + * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2 + * requests. + */ + @Test + public void testSingleBlock() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = + table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2)) + .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM)); + + ScanMetrics metrics = scanner.getScanMetrics(); + + scanner.next(100); + + assertEquals(1, metrics.countOfRowsScanned.get()); + assertEquals(1, metrics.countOfRPCcalls.get()); + } + + /** + * Tests that we check size limit after filterRowKey. When filterRowKey, we call nextRow to skip + * to next row. This should be efficient in this case, but we still need to check size limits + * after each row is processed. So in this test, we accumulate some block IO reading row 1, then + * skip row 2 and should return early at that point. The next rpc call starts with row3 blocks + * loaded, so can return the whole row in one rpc. If we were not checking size limits, we'd have + * been able to load an extra row 3 cell into the first rpc and thus split row 3 across multiple + * Results. + */ + @Test + public void testCheckLimitAfterFilterRowKey() throws IOException { + + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan().addColumn(FAMILY1, COLUMN1) + .addColumn(FAMILY1, COLUMN2).addColumn(FAMILY1, COLUMN3).addFamily(FAMILY2) + .setFilter(new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(2))))); + + boolean foundRow3 = false; + for (Result result : scanner) { + Set rows = new HashSet<>(); + for (Cell cell : result.rawCells()) { + rows.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + } + if (rows.contains(3)) { + assertFalse("expected row3 to come all in one result, but found it in two results", + foundRow3); + assertEquals(1, rows.size()); + foundRow3 = true; + } + } + ScanMetrics metrics = scanner.getScanMetrics(); + + // 4 blocks per row, so 36 blocks. We can scan 3 blocks per RPC, which is 12 RPCs. But we can + // skip 1 row, so skip 2 RPCs. + assertEquals(10, metrics.countOfRPCcalls.get()); + } + + /** + * After RegionScannerImpl.populateResults, row filters are run. If row is excluded, nextRow() is + * called which might accumulate more block IO. Validates that in this case we still honor block + * limits. 
+ */ + @Test + public void testCheckLimitAfterFilteringRowCells() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1), true) + .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM) + .setFilter(new SkipFilter(new QualifierFilter(CompareOperator.EQUAL, + new BinaryComparator(Bytes.toBytes("dasfasf")))))); + + // Our filter doesn't end up matching any real columns, so expect only cursors + for (Result result : scanner) { + assertTrue(result.isCursor()); + } + + ScanMetrics metrics = scanner.getScanMetrics(); + + // scanning over 9 rows, filtering on 2 contiguous columns each, so 9 blocks total + // limited to 4200 bytes per which is enough for 3 blocks (exceed limit after loading 3rd) + // so that's 3 RPC and the last RPC pulls the cells loaded by the last block + assertEquals(4, metrics.countOfRPCcalls.get()); + } + + /** + * At the end of the loop in StoreScanner, we do one more check of size limits. This is to catch + * block size being exceeded while filtering cells within a store. Test to ensure that we do that, + * otherwise we'd see no cursors below. + */ + @Test + public void testCheckLimitAfterFilteringCell() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner(getBaseScan() + .setFilter(new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(COLUMN2)))); + + int cursors = 0; + for (Result result : scanner) { + if (result.isCursor()) { + cursors++; + } + } + ScanMetrics metrics = scanner.getScanMetrics(); + + // We will return 9 rows, but also 2 cursors because we exceed the scan size limit partway + // through. So that accounts for 11 rpcs. + assertEquals(2, cursors); + assertEquals(11, metrics.countOfRPCcalls.get()); + } + + /** + * Tests that when we seek over blocks we dont include them in the block size of the request + */ + @Test + public void testSeekNextUsingHint() throws IOException { + Table table = TEST_UTIL.getConnection().getTable(TABLE); + + ResultScanner scanner = table.getScanner( + getBaseScan().addFamily(FAMILY1).setFilter(new ColumnPaginationFilter(1, COLUMN5))); + + scanner.next(100); + ScanMetrics metrics = scanner.getScanMetrics(); + + // We have to read the first cell/block of each row, then can skip to the last block. So that's + // 2 blocks per row to read (18 total). Our max scan size is enough to read 3 blocks per RPC, + // plus one final RPC to finish region. + assertEquals(7, metrics.countOfRPCcalls.get()); + } + + /** + * We enable cursors and partial results to give us more granularity over counting of results, and + * we enable STREAM so that no auto switching from pread to stream occurs -- this throws off the + * rpc counts. + */ + private Scan getBaseScan() { + return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true) + .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM); + } +}
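For completeness, a sketch of the client-side pattern these tests (and the new heartbeat case in
RSRpcServices) rely on. It assumes a Table handle and the same imports as
TestScannerBlockSizeLimits above; the qualifier in the filter is a placeholder:

    // When the server exhausts its block-size allowance before assembling a full row, the
    // client observes a cursor Result instead of a long stall; enabling cursors and partial
    // results (as getBaseScan() does above) is what makes that visible.
    void scanWithCursorHandling(Table table) throws IOException {
      Scan scan = new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
        .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM)
        .setFilter(new QualifierFilter(CompareOperator.EQUAL,
          new BinaryComparator(Bytes.toBytes("placeholder-qualifier"))));

      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          if (result.isCursor()) {
            // The server hit a limit (time, or now accumulated block IO) before it had a
            // complete row to return; result.getCursor() records how far it got and
            // iteration simply continues.
            continue;
          }
          // process a real, possibly partial, row here
        }
      }
    }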