diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 6240e68d4927..4522efddc406 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -26,7 +26,6 @@ import java.net.BindException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -48,7 +47,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; import org.apache.hadoop.hbase.Cell; @@ -764,7 +762,6 @@ private List doNonAtomicRegionMutation(final HRegion region, List mutations = null; long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); IOException sizeIOE = null; - Object lastBlock = null; ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder(); boolean hasResultOrException = false; @@ -889,7 +886,7 @@ private List doNonAtomicRegionMutation(final HRegion region, } else { pbResult = ProtobufUtil.toResult(r); } - lastBlock = addSize(context, r, lastBlock); + addSize(context, r); hasResultOrException = true; resultOrExceptionBuilder.setResult(pbResult); } @@ -1375,44 +1372,17 @@ long getScannerVirtualTime(long scannerId) { } /** - * Method to account for the size of retained cells and retained data blocks. - * @param context rpc call context - * @param r result to add size. - * @param lastBlock last block to check whether we need to add the block size in context. + * Method to account for the size of retained cells. 
+ * @param context rpc call context + * @param r result to add size. * @return an object that represents the last referenced block from this response. */ - Object addSize(RpcCallContext context, Result r, Object lastBlock) { + void addSize(RpcCallContext context, Result r) { if (context != null && r != null && !r.isEmpty()) { for (Cell c : r.rawCells()) { context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c)); - - // Since byte buffers can point all kinds of crazy places it's harder to keep track - // of which blocks are kept alive by what byte buffer. - // So we make a guess. - if (c instanceof ByteBufferExtendedCell) { - ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c; - ByteBuffer bb = bbCell.getValueByteBuffer(); - if (bb != lastBlock) { - context.incrementResponseBlockSize(bb.capacity()); - lastBlock = bb; - } - } else { - // We're using the last block being the same as the current block as - // a proxy for pointing to a new block. This won't be exact. - // If there are multiple gets that bounce back and forth - // Then it's possible that this will over count the size of - // referenced blocks. However it's better to over count and - // use two rpcs than to OOME the regionserver. - byte[] valueArray = c.getValueArray(); - if (valueArray != lastBlock) { - context.incrementResponseBlockSize(valueArray.length); - lastBlock = valueArray; - } - } - } } - return lastBlock; } /** Returns Remote client's ip and port else null if can't be determined. 
*/ @@ -2599,7 +2569,7 @@ public GetResponse get(final RpcController controller, final GetRequest request) pbr = ProtobufUtil.toResultNoData(r); ((HBaseRpcController) controller) .setCellScanner(CellUtil.createCellScanner(r.rawCells())); - addSize(context, r, null); + addSize(context, r); } else { pbr = ProtobufUtil.toResult(r); } @@ -2986,7 +2956,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, result.getResult(), controller, clientCellBlockSupported); if (clientCellBlockSupported) { - addSize(context, result.getResult(), null); + addSize(context, result.getResult()); } } else { Result r = null; @@ -3018,7 +2988,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque boolean clientCellBlockSupported = isClientCellBlockSupport(context); addResult(builder, r, controller, clientCellBlockSupported); if (clientCellBlockSupported) { - addSize(context, r, null); + addSize(context, r); } } return builder.build(); @@ -3370,9 +3340,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // of heap size occupied by cells(being read). Cell data means its key and value parts. // maxQuotaResultSize - max results just from server side configuration and quotas, without // user's specified max. We use this for evaluating limits based on blocks (not cells). - // We may have accumulated some results in coprocessor preScannerNext call. We estimate - // block and cell size of those using call to addSize. Update our maximums for scanner - // context so we can account for them in the real scan. + // We may have accumulated some results in coprocessor preScannerNext call. Subtract any + // cell or block size from maximum here so we adhere to total limits of request. + // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will + // have accumulated block bytes. 
If not, this will be 0 for block size. long maxCellSize = maxResultSize; long maxBlockSize = maxQuotaResultSize; if (rpcCall != null) { @@ -3488,7 +3459,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan values.clear(); } if (rpcCall != null) { - rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress()); rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress()); } builder.setMoreResultsInRegion(moreRows); @@ -3661,18 +3631,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque if (region.getCoprocessorHost() != null) { Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows); if (!results.isEmpty()) { - // If scanner CP added results to list, we want to account for cell and block size of - // that work. We estimate this using addSize, since CP does not get ScannerContext. If - // !done, the actual scan call below will use more accurate ScannerContext block and - // cell size tracking for the rest of the request. The two result sets will be added - // together in the RpcCall accounting. - // This here is just an estimate (see addSize for more details on estimation). We don't - // pass lastBlock to the scan call below because the real scan uses ScannerContext, - // which does not use lastBlock tracking. This may result in over counting by 1 block, - // but that is unlikely since addSize is already a rough estimate. - Object lastBlock = null; for (Result r : results) { - lastBlock = addSize(rpcCall, r, lastBlock); + // add cell size from CP results so we can track response size and update limits + // when calling scan below if !done. We'll also have tracked block size if the CP + // got results from hbase, since StoreScanner tracks that for user scans automatically. 
+ addSize(rpcCall, r); } } if (bypass != null && bypass.booleanValue()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 45fdb3e70a9c..94166bc52ec6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.Cell; @@ -36,6 +37,8 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; @@ -573,6 +576,9 @@ public boolean next(List outResult, ScannerContext scannerContext) throws scannerContext.clearProgress(); } + Optional rpcCall = + matcher.isUserScan() ? 
RpcServer.getCurrentCall() : Optional.empty(); + int count = 0; long totalBytesRead = 0; boolean onlyFromMemstore = matcher.isUserScan(); @@ -612,7 +618,12 @@ public boolean next(List outResult, ScannerContext scannerContext) throws scannerContext.returnImmediately(); } - heap.recordBlockSize(scannerContext::incrementBlockProgress); + heap.recordBlockSize(blockSize -> { + if (rpcCall.isPresent()) { + rpcCall.get().incrementResponseBlockSize(blockSize); + } + scannerContext.incrementBlockProgress(blockSize); + }); prevCell = cell; scannerContext.setLastPeekedCell(cell); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java index c377714853f0..16d373191a3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiRespectsLimits.java @@ -19,11 +19,13 @@ import static junit.framework.TestCase.assertEquals; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.CompatibilityFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -32,10 +34,13 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.logging.Log4jUtils; import org.apache.hadoop.hbase.metrics.BaseSource; +import 
org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -64,7 +69,7 @@ public class TestMultiRespectsLimits { private static final MetricsAssertHelper METRICS_ASSERT = CompatibilityFactory.getInstance(MetricsAssertHelper.class); private final static byte[] FAMILY = Bytes.toBytes("D"); - public static final int MAX_SIZE = 100; + public static final int MAX_SIZE = 90; private static String LOG_LEVEL; @Rule @@ -152,6 +157,10 @@ public void testBlockMultiLimits() throws Exception { Bytes.toBytes("3"), // Get This Bytes.toBytes("4"), // Buffer Bytes.toBytes("5"), // Buffer + Bytes.toBytes("6"), // Buffer + Bytes.toBytes("7"), // Get This + Bytes.toBytes("8"), // Buffer + Bytes.toBytes("9"), // Buffer }; // Set the value size so that one result will be less than the MAX_SIZE @@ -160,7 +169,12 @@ public void testBlockMultiLimits() throws Exception { byte[] value = new byte[1]; Bytes.random(value); - for (byte[] col : cols) { + for (int i = 0; i < cols.length; i++) { + if (i == 6) { + // do a flush here so we end up with 2 blocks, 55 and 45 bytes + flush(regionServer, tableName); + } + byte[] col = cols[i]; Put p = new Put(row); p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(FAMILY) .setQualifier(col).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value) @@ -169,28 +183,43 @@ public void testBlockMultiLimits() throws Exception { } // Make sure that a flush happens - try (final Admin admin = TEST_UTIL.getAdmin()) { - admin.flush(tableName); - TEST_UTIL.waitFor(60000, new Waiter.Predicate() { - @Override - public boolean evaluate() throws Exception { - return regionServer.getRegions(tableName).get(0).getMaxFlushedSeqId() > 3; - } - }); - } - - List gets = new ArrayList<>(2); - Get g0 = new Get(row); - g0.addColumn(FAMILY, cols[0]); + flush(regionServer, 
tableName); + + List gets = new ArrayList<>(4); + // This get returns nothing since the filter doesn't match. Filtered cells still retain + // blocks, and this is a full row scan of both blocks. This equals 100 bytes so we should + // throw a multiResponseTooLarge after this get if we are counting filtered cells correctly. + Get g0 = new Get(row).addFamily(FAMILY).setFilter( + new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("sdf")))); gets.add(g0); + // g1 and g2 each count the first 55 byte block, so we end up with block size of 110 + // after g2 and throw a multiResponseTooLarge before g3 + Get g1 = new Get(row); + g1.addColumn(FAMILY, cols[0]); + gets.add(g1); + Get g2 = new Get(row); g2.addColumn(FAMILY, cols[3]); gets.add(g2); + Get g3 = new Get(row); + g3.addColumn(FAMILY, cols[7]); + gets.add(g3); + Result[] results = t.get(gets); - assertEquals(2, results.length); - METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s); - METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions, s); + assertEquals(4, results.length); + // Expect 2 exceptions (thus 3 rpcs) -- one for g0, then another for g1 + g2, final rpc for g3. + // If we tracked lastBlock we could squeeze g3 into the second rpc because g2 would be "free" + // since it's in the same block as g1. + METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions + 1, s); + METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions + 1, + s); + } + + private void flush(HRegionServer regionServer, TableName tableName) throws IOException { + for (HRegion region : regionServer.getRegions(tableName)) { + region.flush(true); + } } }