Skip to content

Commit

Permalink
HBASE-27570 Unify tracking of block IO across all read request types (#…
Browse files Browse the repository at this point in the history
…5004)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
bbeaudreault committed Feb 4, 2023
1 parent f619001 commit 8809c88
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -48,7 +47,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -767,7 +765,6 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
Object lastBlock = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
boolean hasResultOrException = false;
Expand Down Expand Up @@ -892,7 +889,7 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
} else {
pbResult = ProtobufUtil.toResult(r);
}
lastBlock = addSize(context, r, lastBlock);
addSize(context, r);
hasResultOrException = true;
resultOrExceptionBuilder.setResult(pbResult);
}
Expand Down Expand Up @@ -1378,44 +1375,17 @@ long getScannerVirtualTime(long scannerId) {
}

/**
* Method to account for the size of retained cells and retained data blocks.
* @param context rpc call context
* @param r result to add size.
* @param lastBlock last block to check whether we need to add the block size in context.
* Method to account for the size of retained cells.
* @param context rpc call context
* @param r result to add size.
* @return an object that represents the last referenced block from this response.
*/
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
void addSize(RpcCallContext context, Result r) {
if (context != null && r != null && !r.isEmpty()) {
for (Cell c : r.rawCells()) {
context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

// Since byte buffers can point all kinds of crazy places it's harder to keep track
// of which blocks are kept alive by what byte buffer.
// So we make a guess.
if (c instanceof ByteBufferExtendedCell) {
ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
ByteBuffer bb = bbCell.getValueByteBuffer();
if (bb != lastBlock) {
context.incrementResponseBlockSize(bb.capacity());
lastBlock = bb;
}
} else {
// We're using the last block being the same as the current block as
// a proxy for pointing to a new block. This won't be exact.
// If there are multiple gets that bounce back and forth
// Then it's possible that this will over count the size of
// referenced blocks. However it's better to over count and
// use two rpcs than to OOME the regionserver.
byte[] valueArray = c.getValueArray();
if (valueArray != lastBlock) {
context.incrementResponseBlockSize(valueArray.length);
lastBlock = valueArray;
}
}

}
}
return lastBlock;
}

/** Returns Remote client's ip and port else null if can't be determined. */
Expand Down Expand Up @@ -2602,7 +2572,7 @@ public GetResponse get(final RpcController controller, final GetRequest request)
pbr = ProtobufUtil.toResultNoData(r);
((HBaseRpcController) controller)
.setCellScanner(CellUtil.createCellScanner(r.rawCells()));
addSize(context, r, null);
addSize(context, r);
} else {
pbr = ProtobufUtil.toResult(r);
}
Expand Down Expand Up @@ -2989,7 +2959,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, result.getResult(), null);
addSize(context, result.getResult());
}
} else {
Result r = null;
Expand Down Expand Up @@ -3021,7 +2991,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, r, controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, r, null);
addSize(context, r);
}
}
return builder.build();
Expand Down Expand Up @@ -3373,9 +3343,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// of heap size occupied by cells(being read). Cell data means its key and value parts.
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
// We may have accumulated some results in coprocessor preScannerNext call. We estimate
// block and cell size of those using call to addSize. Update our maximums for scanner
// context so we can account for them in the real scan.
// We may have accumulated some results in coprocessor preScannerNext call. Subtract any
// cell or block size from maximum here so we adhere to total limits of request.
// Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
// have accumulated block bytes. If not, this will be 0 for block size.
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
Expand Down Expand Up @@ -3491,7 +3462,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
Expand Down Expand Up @@ -3664,18 +3634,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
// If scanner CP added results to list, we want to account for cell and block size of
// that work. We estimate this using addSize, since CP does not get ScannerContext. If
// !done, the actual scan call below will use more accurate ScannerContext block and
// cell size tracking for the rest of the request. The two result sets will be added
// together in the RpcCall accounting.
// This here is just an estimate (see addSize for more details on estimation). We don't
// pass lastBlock to the scan call below because the real scan uses ScannerContext,
// which does not use lastBlock tracking. This may result in over counting by 1 block,
// but that is unlikely since addSize is already a rough estimate.
Object lastBlock = null;
for (Result r : results) {
lastBlock = addSize(rpcCall, r, lastBlock);
// add cell size from CP results so we can track response size and update limits
// when calling scan below if !done. We'll also have tracked block size if the CP
// got results from hbase, since StoreScanner tracks that for all calls automatically.
addSize(rpcCall, r);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -36,6 +37,8 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
Expand Down Expand Up @@ -573,6 +576,9 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.clearProgress();
}

Optional<RpcCall> rpcCall =
matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();

int count = 0;
long totalBytesRead = 0;
boolean onlyFromMemstore = matcher.isUserScan();
Expand Down Expand Up @@ -612,7 +618,12 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.returnImmediately();
}

heap.recordBlockSize(scannerContext::incrementBlockProgress);
heap.recordBlockSize(blockSize -> {
if (rpcCall.isPresent()) {
rpcCall.get().incrementResponseBlockSize(blockSize);
}
scannerContext.incrementBlockProgress(blockSize);
});

prevCell = cell;
scannerContext.setLastPeekedCell(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import static junit.framework.TestCase.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand All @@ -32,10 +34,13 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
Expand Down Expand Up @@ -64,7 +69,7 @@ public class TestMultiRespectsLimits {
private static final MetricsAssertHelper METRICS_ASSERT =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private final static byte[] FAMILY = Bytes.toBytes("D");
public static final int MAX_SIZE = 100;
public static final int MAX_SIZE = 90;
private static String LOG_LEVEL;

@Rule
Expand Down Expand Up @@ -152,6 +157,10 @@ public void testBlockMultiLimits() throws Exception {
Bytes.toBytes("3"), // Get This
Bytes.toBytes("4"), // Buffer
Bytes.toBytes("5"), // Buffer
Bytes.toBytes("6"), // Buffer
Bytes.toBytes("7"), // Get This
Bytes.toBytes("8"), // Buffer
Bytes.toBytes("9"), // Buffer
};

// Set the value size so that one result will be less than the MAX_SIZE
Expand All @@ -160,7 +169,12 @@ public void testBlockMultiLimits() throws Exception {
byte[] value = new byte[1];
Bytes.random(value);

for (byte[] col : cols) {
for (int i = 0; i < cols.length; i++) {
if (i == 6) {
// do a flush here so we end up with 2 blocks, 55 and 45 bytes
flush(regionServer, tableName);
}
byte[] col = cols[i];
Put p = new Put(row);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(FAMILY)
.setQualifier(col).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value)
Expand All @@ -169,28 +183,43 @@ public void testBlockMultiLimits() throws Exception {
}

// Make sure that a flush happens
try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.flush(tableName);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionServer.getRegions(tableName).get(0).getMaxFlushedSeqId() > 3;
}
});
}

List<Get> gets = new ArrayList<>(2);
Get g0 = new Get(row);
g0.addColumn(FAMILY, cols[0]);
flush(regionServer, tableName);

List<Get> gets = new ArrayList<>(4);
// This get returns nothing since the filter doesn't match. Filtered cells still retain
// blocks, and this is a full row scan of both blocks. This equals 100 bytes so we should
// throw a multiResponseTooLarge after this get if we are counting filtered cells correctly.
Get g0 = new Get(row).addFamily(FAMILY).setFilter(
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("sdf"))));
gets.add(g0);

// g1 and g2 each count the first 55 byte block, so we end up with block size of 110
// after g2 and throw a multiResponseTooLarge before g3
Get g1 = new Get(row);
g1.addColumn(FAMILY, cols[0]);
gets.add(g1);

Get g2 = new Get(row);
g2.addColumn(FAMILY, cols[3]);
gets.add(g2);

Get g3 = new Get(row);
g3.addColumn(FAMILY, cols[7]);
gets.add(g3);

Result[] results = t.get(gets);
assertEquals(2, results.length);
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions, s);
assertEquals(4, results.length);
// Expect 2 exceptions (thus 3 rpcs) -- one for g0, then another for g1 + g2, final rpc for g3.
// If we tracked lastBlock we could squeeze g3 into the second rpc because g2 would be "free"
// since it's in the same block as g1.
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions + 1, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions + 1,
s);
}

private void flush(HRegionServer regionServer, TableName tableName) throws IOException {
for (HRegion region : regionServer.getRegions(tableName)) {
region.flush(true);
}
}
}

0 comments on commit 8809c88

Please sign in to comment.