Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27570 Unify tracking of block IO across all read request types #5004

Merged
merged 2 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -45,7 +44,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -710,7 +708,6 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
List<ClientProtos.Action> mutations = null;
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
IOException sizeIOE = null;
Object lastBlock = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
boolean hasResultOrException = false;
Expand Down Expand Up @@ -835,7 +832,7 @@ private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
} else {
pbResult = ProtobufUtil.toResult(r);
}
lastBlock = addSize(context, r, lastBlock);
addSize(context, r);
hasResultOrException = true;
resultOrExceptionBuilder.setResult(pbResult);
}
Expand Down Expand Up @@ -1293,44 +1290,17 @@ long getScannerVirtualTime(long scannerId) {
}

/**
* Method to account for the size of retained cells and retained data blocks.
* @param context rpc call context
* @param r result to add size.
* @param lastBlock last block to check whether we need to add the block size in context.
* Method to account for the size of retained cells.
* @param context rpc call context
* @param r result to add size.
* @return an object that represents the last referenced block from this response.
*/
Object addSize(RpcCallContext context, Result r, Object lastBlock) {
void addSize(RpcCallContext context, Result r) {
if (context != null && r != null && !r.isEmpty()) {
for (Cell c : r.rawCells()) {
context.incrementResponseCellSize(PrivateCellUtil.estimatedSerializedSizeOf(c));

// Since byte buffers can point all kinds of crazy places it's harder to keep track
// of which blocks are kept alive by what byte buffer.
// So we make a guess.
if (c instanceof ByteBufferExtendedCell) {
ByteBufferExtendedCell bbCell = (ByteBufferExtendedCell) c;
ByteBuffer bb = bbCell.getValueByteBuffer();
if (bb != lastBlock) {
context.incrementResponseBlockSize(bb.capacity());
lastBlock = bb;
}
} else {
// We're using the last block being the same as the current block as
// a proxy for pointing to a new block. This won't be exact.
// If there are multiple gets that bounce back and forth
// Then it's possible that this will over count the size of
// referenced blocks. However it's better to over count and
// use two rpcs than to OOME the regionserver.
byte[] valueArray = c.getValueArray();
if (valueArray != lastBlock) {
context.incrementResponseBlockSize(valueArray.length);
lastBlock = valueArray;
}
}

}
}
return lastBlock;
}

/** Returns Remote client's ip and port else null if can't be determined. */
Expand Down Expand Up @@ -2515,7 +2485,7 @@ public GetResponse get(final RpcController controller, final GetRequest request)
pbr = ProtobufUtil.toResultNoData(r);
((HBaseRpcController) controller)
.setCellScanner(CellUtil.createCellScanner(r.rawCells()));
addSize(context, r, null);
addSize(context, r);
} else {
pbr = ProtobufUtil.toResult(r);
}
Expand Down Expand Up @@ -2957,7 +2927,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, result.getResult(), controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, result.getResult(), null);
addSize(context, result.getResult());
}
} else {
Result r = null;
Expand Down Expand Up @@ -2989,7 +2959,7 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
boolean clientCellBlockSupported = isClientCellBlockSupport(context);
addResult(builder, r, controller, clientCellBlockSupported);
if (clientCellBlockSupported) {
addSize(context, r, null);
addSize(context, r);
}
}
return builder.build();
Expand Down Expand Up @@ -3343,9 +3313,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
// of heap size occupied by cells(being read). Cell data means its key and value parts.
// maxQuotaResultSize - max results just from server side configuration and quotas, without
// user's specified max. We use this for evaluating limits based on blocks (not cells).
// We may have accumulated some results in coprocessor preScannerNext call. We estimate
// block and cell size of those using call to addSize. Update our maximums for scanner
// context so we can account for them in the real scan.
// We may have accumulated some results in coprocessor preScannerNext call. Subtract any
// cell or block size from maximum here so we adhere to total limits of request.
// Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
// have accumulated block bytes. If not, this will be 0 for block size.
long maxCellSize = maxResultSize;
long maxBlockSize = maxQuotaResultSize;
if (rpcCall != null) {
Expand Down Expand Up @@ -3461,7 +3432,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
values.clear();
}
if (rpcCall != null) {
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
}
builder.setMoreResultsInRegion(moreRows);
Expand Down Expand Up @@ -3634,18 +3604,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
if (region.getCoprocessorHost() != null) {
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
if (!results.isEmpty()) {
// If scanner CP added results to list, we want to account for cell and block size of
// that work. We estimate this using addSize, since CP does not get ScannerContext. If
// !done, the actual scan call below will use more accurate ScannerContext block and
// cell size tracking for the rest of the request. The two result sets will be added
// together in the RpcCall accounting.
// This here is just an estimate (see addSize for more details on estimation). We don't
// pass lastBlock to the scan call below because the real scan uses ScannerContext,
// which does not use lastBlock tracking. This may result in over counting by 1 block,
// but that is unlikely since addSize is already a rough estimate.
Object lastBlock = null;
for (Result r : results) {
lastBlock = addSize(rpcCall, r, lastBlock);
// add cell size from CP results so we can track response size and update limits
// when calling scan below if !done. We'll also have tracked block size if the CP
// got results from hbase, since StoreScanner tracks that for all calls automatically.
addSize(rpcCall, r);
}
}
if (bypass != null && bypass.booleanValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.Cell;
Expand All @@ -36,6 +37,8 @@
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
Expand Down Expand Up @@ -573,6 +576,9 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.clearProgress();
}

Optional<RpcCall> rpcCall =
matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();

int count = 0;
long totalBytesRead = 0;
// track the cells for metrics only if it is a user read request.
Expand Down Expand Up @@ -613,7 +619,12 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
scannerContext.returnImmediately();
}

heap.recordBlockSize(scannerContext::incrementBlockProgress);
heap.recordBlockSize(blockSize -> {
if (rpcCall.isPresent()) {
rpcCall.get().incrementResponseBlockSize(blockSize);
}
scannerContext.incrementBlockProgress(blockSize);
});

prevCell = cell;
scannerContext.setLastPeekedCell(cell);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,26 @@

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
Expand Down Expand Up @@ -62,7 +67,7 @@ public class TestMultiRespectsLimits {
private static final MetricsAssertHelper METRICS_ASSERT =
CompatibilityFactory.getInstance(MetricsAssertHelper.class);
private final static byte[] FAMILY = Bytes.toBytes("D");
public static final int MAX_SIZE = 100;
public static final int MAX_SIZE = 90;
private static String LOG_LEVEL;

@Rule
Expand Down Expand Up @@ -148,6 +153,10 @@ public void testBlockMultiLimits() throws Exception {
Bytes.toBytes("3"), // Get This
Bytes.toBytes("4"), // Buffer
Bytes.toBytes("5"), // Buffer
Bytes.toBytes("6"), // Buffer
Bytes.toBytes("7"), // Get This
Bytes.toBytes("8"), // Buffer
Bytes.toBytes("9"), // Buffer
};

// Set the value size so that one result will be less than the MAX_SIZE
Expand All @@ -156,7 +165,12 @@ public void testBlockMultiLimits() throws Exception {
byte[] value = new byte[1];
Bytes.random(value);

for (byte[] col : cols) {
for (int i = 0; i < cols.length; i++) {
if (i == 6) {
// do a flush here so we end up with 2 blocks, 55 and 45 bytes
flush(regionServer, tableName);
}
byte[] col = cols[i];
Put p = new Put(row);
p.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row).setFamily(FAMILY)
.setQualifier(col).setTimestamp(p.getTimestamp()).setType(Cell.Type.Put).setValue(value)
Expand All @@ -165,28 +179,43 @@ public void testBlockMultiLimits() throws Exception {
}

// Make sure that a flush happens
try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.flush(tableName);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return regionServer.getRegions(tableName).get(0).getMaxFlushedSeqId() > 3;
}
});
}

List<Get> gets = new ArrayList<>(2);
Get g0 = new Get(row);
g0.addColumn(FAMILY, cols[0]);
flush(regionServer, tableName);

List<Get> gets = new ArrayList<>(4);
// This get returns nothing since the filter doesn't match. Filtered cells still retain
// blocks, and this is a full row scan of both blocks. This equals 100 bytes so we should
// throw a multiResponseTooLarge after this get if we are counting filtered cells correctly.
Get g0 = new Get(row).addFamily(FAMILY).setFilter(
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("sdf"))));
Comment on lines +185 to +189
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old block-size estimation that was removed from addSize would not count any block IO for this get.

gets.add(g0);

// g1 and g2 each count the first 55 byte block, so we end up with block size of 110
// after g2 and throw a multiResponseTooLarge before g3
Get g1 = new Get(row);
g1.addColumn(FAMILY, cols[0]);
gets.add(g1);

Get g2 = new Get(row);
g2.addColumn(FAMILY, cols[3]);
gets.add(g2);
Comment on lines +192 to 200
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old method would have returned these 2 gets in the same RPC because of lastBlock tracking (both columns are in the same block). We could add that functionality back with a change like this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this only works because of the order of the gets: g1, g2, g3. If we instead ordered them g1, g3, g2, lastBlock would not help.


Get g3 = new Get(row);
g3.addColumn(FAMILY, cols[7]);
gets.add(g3);

Result[] results = t.get(gets);
assertEquals(2, results.length);
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions, s);
assertEquals(4, results.length);
// Expect 2 exceptions (thus 3 rpcs) -- one for g0, then another for g1 + g2, final rpc for g3.
// If we tracked lastBlock we could squeeze g3 into the second rpc because g2 would be "free"
// since it's in the same block as g1.
METRICS_ASSERT.assertCounterGt("exceptions", startingExceptions + 1, s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge", startingMultiExceptions + 1,
s);
}

/**
 * Flushes, one after another, every region that {@code regionServer} returns from
 * {@code getRegions(tableName)}.
 * <p>
 * Each region is flushed with {@code flush(true)}; the flag presumably requests a flush of all
 * stores -- confirm against {@code HRegion}.
 * @param regionServer region server that is asked for the regions of the table
 * @param tableName    table whose regions should be flushed
 * @throws IOException propagated unchanged from the calls made in this helper
 */
private void flush(HRegionServer regionServer, TableName tableName) throws IOException {
  List<? extends HRegion> regionsOfTable = regionServer.getRegions(tableName);
  for (HRegion current : regionsOfTable) {
    current.flush(true);
  }
}
}