diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 481a1374acd8..1c36e4b05fda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -24,7 +24,6 @@ import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,7 +44,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; import org.apache.hadoop.hbase.Cell; @@ -710,7 +708,6 @@ private List doNonAtomicRegionMutation(final HRegion region, List mutations = null; long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); IOException sizeIOE = null; - Object lastBlock = null; ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); boolean hasResultOrException = false; @@ -835,7 +832,7 @@ private List doNonAtomicRegionMutation(final HRegion region, } else { pbResult = ProtobufUtil.toResult(r); } - lastBlock = addSize(context, r, lastBlock); + addSize(context, r); hasResultOrException = true; resultOrExceptionBuilder.setResult(pbResult); } @@ -1293,44 +1290,17 @@ long getScannerVirtualTime(long scannerId) { } /** - * Method to account for the size of retained cells and retained data blocks. - * @param context rpc call context - * @param r result to add size. - * @param lastBlock last block to check whether we need to add the block size in context. + * Method to account for the size of retained cells. 
+ * Block size is tracked separately: StoreScanner adds it to the current RpcCall. + * @param context rpc call context + * @param r result to add size. - * @return an object that represents the last referenced block from this response. */ - Object addSize(RpcCallContext context, Result r, Object lastBlock) { + void addSize(RpcCallContext context, Result r) { if (context != null && r != null && !r.isEmpty()) { for (Cell c : r.rawCells()) { context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); - - // Since byte buffers can point all kinds of crazy places it's harder to keep track - // of which blocks are kept alive by what byte buffer. - // So we make a guess. - if (c instanceof ByteBufferExtendedCell) { - ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c; - ByteBuffer bb = bbCell.getValueByteBuffer(); - if (bb != lastBlock) { - context.incrementResponseBlockSize(bb.capacity()); - lastBlock = bb; - } - } else { - // We're using the last block being the same as the current block as - // a proxy for pointing to a new block. This won't be exact. - // If there are multiple gets that bounce back and forth - // Then it's possible that this will over count the size of - // referenced blocks. However it's better to over count and - // use two rpcs than to OOME the regionserver. - byte[] valueArray = c.getValueArray(); - if (valueArray != lastBlock) { - context.incrementResponseBlockSize(valueArray.length); - lastBlock = valueArray; - } - } - } } - return lastBlock; } /** Returns Remote client's ip and port else null if can't be determined.
*/ @@ -2515,7 +2485,7 @@ public GetResponse get(final RpcController controller, final GetRequest request) pbr = ProtobufUtil.toResultNoData(r); ((HBaseRpcController) controller) .setCellScanner(CellUtil.createCellScanner(r.rawCells())); - addSize(context, r, null); + addSize(context, r); } else { pbr = ProtobufUtil.toResult(r); } @@ -2957,7 +2927,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, result.getResult(), controller, clientCellBlockSupported); if (clientCellBlockSupported) { - addSize(context, result.getResult(), null); + addSize(context, result.getResult()); } } else { Result r = null; @@ -2989,7 +2959,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, r, controller, clientCellBlockSupported); if (clientCellBlockSupported) { - addSize(context, r, null); + addSize(context, r); } } return builder.build(); @@ -3343,9 +3313,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // of heap size occupied by cells(being read). Cell data means its key and value parts. // maxQuotaResultSize - max results just from server side configuration and quotas, without // user's specified max. We use this for evaluating limits based on blocks (not cells). - // We may have accumulated some results in coprocessor preScannerNext call. We estimate - // block and cell size of those using call to addSize. Update our maximums for scanner - // context so we can account for them in the real scan. + // We may have accumulated some results in coprocessor preScannerNext call. Subtract any + // cell or block size from maximum here so we adhere to total limits of request. + // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will + // have accumulated block bytes. 
If not, this will be 0 for block size. long maxCellSize = maxResultSize; long maxBlockSize = maxQuotaResultSize; if (rpcCall != null) { @@ -3461,7 +3432,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan values.clear(); } if (rpcCall != null) { - rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress()); rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress()); } builder.setMoreResultsInRegion(moreRows); @@ -3634,18 +3604,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque if (region.getCoprocessorHost() != null) { Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); if (!results.isEmpty()) { - // If scanner CP added results to list, we want to account for cell and block size of - // that work. We estimate this using addSize, since CP does not get ScannerContext. If - // !done, the actual scan call below will use more accurate ScannerContext block and - // cell size tracking for the rest of the request. The two result sets will be added - // together in the RpcCall accounting. - // This here is just an estimate (see addSize for more details on estimation). We don't - // pass lastBlock to the scan call below because the real scan uses ScannerContext, - // which does not use lastBlock tracking. This may result in over counting by 1 block, - // but that is unlikely since addSize is already a rough estimate. - Object lastBlock = null; for (Result r : results) { - lastBlock = addSize(rpcCall, r, lastBlock); + // add cell size from CP results so we can track response size and update limits + // when calling scan below if !done. We'll also have tracked block size if the CP + // got results from hbase, since StoreScanner tracks that for all calls automatically. 
+ addSize(rpcCall, r); } } if (bypass != null && bypass.booleanValue()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index d46b28d62cf9..949cb9f54b55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.Cell; @@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; @@ -573,6 +576,9 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws scannerContext.clearProgress(); } + Optional<RpcCall> rpcCall = + matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty(); + int count = 0; long totalBytesRead = 0; // track the cells for metrics only if it is a user read request.
@@ -613,7 +619,12 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws scannerContext.returnImmediately(); } - heap.recordBlockSize(scannerContext::incrementBlockProgress); + heap.recordBlockSize(blockSize -> { + if (rpcCall.isPresent()) { + rpcCall.get().incrementResponseBlockSize(blockSize); + } + scannerContext.incrementBlockProgress(blockSize); + }); prevCell = cell; scannerContext.setLastPeekedCell(cell); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java index f57365776033..86847e1b7150 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java @@ -62,7 +62,7 @@ public class TestMultiRespectsLimits { private static final MetricsAssertHelper METRICS_ASSERT = CompatibilityFactory.getInstance(MetricsAssertHelper.class); private final static byte[] FAMILY = Bytes.toBytes("D"); - public static final int MAX_SIZE = 100; + public static final int MAX_SIZE = 50; private static String LOG_LEVEL; @Rule