Skip to content

Commit

Permalink
HBASE-27570 Unify tracking of block IO across all read request types
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault committed Jan 31, 2023
1 parent 8e72cc6 commit 786f2b4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -45,7 +44,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -710,7 +708,6 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
Object lastBlock = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
boolean hasResultOrException = false;
Expand Down Expand Up @@ -835,7 +832,7 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
} else {
pbResult = ProtobufUtil.toResult(r);
}
lastBlock = addSize(context, r, lastBlock);
addSize(context, r);
hasResultOrException = true;
resultOrExceptionBuilder.setResult(pbResult);
}
Expand Down Expand Up @@ -1293,44 +1290,17 @@ long getScannerVirtualTime(long scannerId) {
}

/**
* Method to account for the size of retained cells and retained data blocks.
* @param context rpc call context
* @param r result to add size.
* @param lastBlock last block to check whether we need to add the block size in context.
* Method to account for the size of retained cells.
* @param context rpc call context
* @param r result to add size.
* @return an object that represents the last referenced block from this response.
*/
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
void addSize(RpcCallContext context, Result r) {
if (context != null && r != null && !r.isEmpty()) {
for (Cell c : r.rawCells()) {
context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

// Since byte buffers can point to all kinds of crazy places, it's harder to keep track
// of which blocks are kept alive by which byte buffer.
// So we make a guess.
if (c instanceof ByteBufferExtendedCell) {
ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
ByteBuffer bb = bbCell.getValueByteBuffer();
if (bb != lastBlock) {
context.incrementResponseBlockSize(bb.capacity());
lastBlock = bb;
}
} else {
// We use whether the current block differs from the last block as
// a proxy for pointing to a new block. This won't be exact.
// If there are multiple gets that bounce back and forth,
// then it's possible that this will over count the size of
// referenced blocks. However, it's better to over count and
// use two rpcs than to OOME the regionserver.
byte[] valueArray = c.getValueArray();
if (valueArray != lastBlock) {
context.incrementResponseBlockSize(valueArray.length);
lastBlock = valueArray;
}
}

}
}
return lastBlock;
}

/** Returns Remote client's ip and port else null if can't be determined. */
Expand Down Expand Up @@ -2515,7 +2485,7 @@ public GetResponse get(final RpcController controller, final GetRequest request)
pbr = ProtobufUtil.toResultNoData(r);
((HBaseRpcController) controller)
.setCellScanner(CellUtil.createCellScanner(r.rawCells()));
addSize(context, r, null);
addSize(context, r);
} else {
pbr = ProtobufUtil.toResult(r);
}
Expand Down Expand Up @@ -2957,7 +2927,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, result.getResult(), null);
addSize(context, result.getResult());
}
} else {
Result r = null;
Expand Down Expand Up @@ -2989,7 +2959,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, r, controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, r, null);
addSize(context, r);
}
}
return builder.build();
Expand Down Expand Up @@ -3343,9 +3313,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// of heap size occupied by cells(being read). Cell data means its key and value parts.
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
// We may have accumulated some results in coprocessor preScannerNext call. We estimate
// block and cell size of those using call to addSize. Update our maximums for scanner
// context so we can account for them in the real scan.
// We may have accumulated some results in coprocessor preScannerNext call. Subtract any
// cell or block size from maximum here so we adhere to total limits of request.
// Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
// have accumulated block bytes. If not, this will be 0 for block size.
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
Expand Down Expand Up @@ -3461,7 +3432,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
Expand Down Expand Up @@ -3634,18 +3604,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
// If scanner CP added results to list, we want to account for cell and block size of
// that work. We estimate this using addSize, since CP does not get ScannerContext. If
// !done, the actual scan call below will use more accurate ScannerContext block and
// cell size tracking for the rest of the request. The two result sets will be added
// together in the RpcCall accounting.
// This here is just an estimate (see addSize for more details on estimation). We don't
// pass lastBlock to the scan call below because the real scan uses ScannerContext,
// which does not use lastBlock tracking. This may result in over counting by 1 block,
// but that is unlikely since addSize is already a rough estimate.
Object lastBlock = null;
for (Result r : results) {
lastBlock = addSize(rpcCall, r, lastBlock);
// add cell size from CP results so we can track response size and update limits
// when calling scan below if !done. We'll also have tracked block size if the CP
// got results from hbase, since StoreScanner tracks that for all calls automatically.
addSize(rpcCall, r);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -36,6 +37,8 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
Expand Down Expand Up @@ -573,6 +576,9 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.clearProgress();
}

Optional<RpcCall> rpcCall =
matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();

int count = 0;
long totalBytesRead = 0;
// track the cells for metrics only if it is a user read request.
Expand Down Expand Up @@ -613,7 +619,12 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.returnImmediately();
}

heap.recordBlockSize(scannerContext::incrementBlockProgress);
heap.recordBlockSize(blockSize -> {
if (rpcCall.isPresent()) {
rpcCall.get().incrementResponseBlockSize(blockSize);
}
scannerContext.incrementBlockProgress(blockSize);
});

prevCell = cell;
scannerContext.setLastPeekedCell(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class TestMultiRespectsLimits {
private static final MetricsAssertHelper METRICS_ASSERT =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private final static byte[] FAMILY = Bytes.toBytes("D");
public static final int MAX_SIZE = 100;
public static final int MAX_SIZE = 50;
private static String LOG_LEVEL;

@Rule
Expand Down

0 comments on commit 786f2b4

Please sign in to comment.