Skip to content

Commit

Permalink
HBASE-28385 Improve scan quota estimates when using block bytes scann…
Browse files Browse the repository at this point in the history
…ed (apache#5713)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Mar 13, 2024
1 parent 56870b5 commit 8e98301
Show file tree
Hide file tree
Showing 12 changed files with 427 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DefaultOperationQuota implements OperationQuota {

// A single scan estimate may consume no more than this proportion of the limiter's limit.
// This prevents a long-running scan from being estimated at, say, 100MB of IO against
// a throttle of less than 100MB of IO (an estimate that large could never succeed).
private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9;

protected final List<QuotaLimiter> limiters;
private final long writeCapacityUnit;
private final long readCapacityUnit;
Expand All @@ -53,13 +60,17 @@ public class DefaultOperationQuota implements OperationQuota {
protected long readCapacityUnitDiff = 0;
private boolean useResultSizeBytes;
private long blockSizeBytes;
private long maxScanEstimate;

/**
 * Builds an operation quota over the given limiters and derives the settings used for read
 * estimates from the configuration, the block size and the limiters' read limits.
 * @param conf           configuration consulted for {@link OperationQuota#USE_RESULT_SIZE_BYTES}
 * @param blockSizeBytes block size, in bytes, used for block-based read estimates
 * @param limiters       the limiters this quota is checked against
 */
public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes,
  final QuotaLimiter... limiters) {
  this(conf, Arrays.asList(limiters));
  this.blockSizeBytes = blockSizeBytes;
  this.useResultSizeBytes =
    conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT);

  // The tightest read limit across all limiters bounds how large a scan estimate may get;
  // when there are no limiters the bound stays at Long.MAX_VALUE.
  long smallestReadLimit = Long.MAX_VALUE;
  for (final QuotaLimiter limiter : limiters) {
    smallestReadLimit = Math.min(smallestReadLimit, limiter.getReadLimit());
  }
  this.maxScanEstimate =
    Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * smallestReadLimit);
}

/**
Expand All @@ -80,21 +91,34 @@ public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter>
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
updateEstimateConsumeQuota(numWrites, numReads, numScans);
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
// Refresh the estimated read/write consumption for this batch before consulting the limiters.
updateEstimateConsumeBatchQuota(numWrites, numReads);
// Verify the estimate against the limiters and reserve it from them.
checkQuota(numWrites, numReads);
}

@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
// Estimate the read size of this scan call from the request and the scanner's history.
updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned,
prevBlockBytesScannedDifference);
// A scan call is accounted as zero write requests and one read request.
checkQuota(0, 1);
}

private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException {
readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) {
if (limiter.isBypass()) continue;
if (limiter.isBypass()) {
continue;
}

limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
}

for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed,
readCapacityUnitConsumed);
}
}

Expand Down Expand Up @@ -158,24 +182,69 @@ public void addMutation(final Mutation mutation) {
* Update estimate quota(read/write size/capacityUnits) which will be consumed
* @param numWrites the number of write requests
* @param numReads the number of read requests
* @param numScans the number of scan requests
*/
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);

if (useResultSizeBytes) {
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
} else {
// assume 1 block required for reads. this is probably a low estimate, which is okay
readConsumed = numReads > 0 ? blockSizeBytes : 0;
readConsumed += numScans > 0 ? blockSizeBytes : 0;
}

writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
}

/**
 * Updates the estimated read size and read capacity units that the given scan call is expected
 * to consume.
 * @param scanRequest                     the scan to be executed
 * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
 * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
 *                                        scanner
 * @param prevBlockBytesScannedDifference the difference between the block bytes scanned of the
 *                                        previous two next calls
 */
protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest,
  long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
  final long estimatedReadBytes;
  if (!useResultSizeBytes) {
    // Block-bytes-scanned accounting: derive the estimate from the scanner's history, but
    // never let a single scan estimate exceed the precomputed cap.
    final long blockBytesEstimate =
      getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(),
        maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference);
    estimatedReadBytes = Math.min(maxScanEstimate, blockBytesEstimate);
  } else {
    // Result-size accounting: estimate a single scan request with an average size of 1000.
    estimatedReadBytes = estimateConsume(OperationType.SCAN, 1, 1000);
  }

  readConsumed = estimatedReadBytes;
  readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
}

/**
 * Estimates how many bytes the next call of a scan will read.
 * <p>
 * Estimating scan workload is more complicated than for gets, and if we severely underestimate
 * workloads then throttled clients will exhaust retries too quickly, and could saturate the RPC
 * layer.
 * @param blockSizeBytes                  the block size in bytes
 * @param nextCallSeq                     the sequence number of this next call; 0 for the first
 *                                        call of a scanner
 * @param maxScannerResultSize            the maximum bytes to be returned by the scanner
 * @param maxBlockBytesScanned            the maximum bytes scanned in a single RPC call by the
 *                                        scanner
 * @param prevBlockBytesScannedDifference the difference between the block bytes scanned of the
 *                                        previous two next calls
 * @return the estimated number of bytes the call will read
 */
protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq,
  long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) {
  if (nextCallSeq == 0) {
    // start scanners with an optimistic 1 block IO estimate
    // it is better to underestimate a large scan in the beginning
    // than to overestimate, and block, a small scan
    return blockSizeBytes;
  }

  boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes;
  if (!isWorkloadGrowing) {
    // if nextCallSeq > 0 and the workload is shrinking or flat
    // then our workload has likely plateaued. We can just rely on the existing
    // maxBlockBytesScanned as our estimate in this case.
    return maxBlockBytesScanned;
  }

  // if nextCallSeq > 0 and the workload is growing then our estimate
  // should consider that the workload may continue to increase
  final long projectedBytes;
  if (maxBlockBytesScanned > 0 && nextCallSeq > Long.MAX_VALUE / maxBlockBytesScanned) {
    // The product would overflow a long and wrap to a meaningless (possibly negative) value,
    // which Math.min would then pick as the estimate. Saturate instead so the result size
    // cap below applies.
    projectedBytes = Long.MAX_VALUE;
  } else {
    projectedBytes = nextCallSeq * maxBlockBytesScanned;
  }
  return Math.min(maxScannerResultSize, projectedBytes);
}

private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
if (numReqs > 0) {
return avgSize * numReqs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/*
* Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed
* throttle quota means, user can over consume user/namespace/table quota if region server has
Expand All @@ -47,27 +49,44 @@ public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
// Deferred step that only refreshes the consumption estimate for this batch.
Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads);
// Deferred step that runs the parent class's full batch check.
CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads);
// A batch carries no scans, hence the trailing 0 for numScans.
checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0);
}

@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
// Deferred step that only refreshes the consumption estimate for this scan call.
Runnable estimateQuota = () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize,
maxBlockBytesScanned, prevBlockBytesScannedDifference);
// Deferred step that runs the parent class's full scan check.
CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize,
maxBlockBytesScanned, prevBlockBytesScannedDifference);
// A scan call is passed on as zero writes, zero reads and one scan.
checkQuota(estimateQuota, checkQuota, 0, 0, 1);
}

private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites,
int numReads, int numScans) throws RpcThrottlingException {
if (regionServerLimiter.isBypass()) {
// If region server limiter is bypass, which means no region server quota is set, check and
// throttle by all other quotas. In this condition, exceed throttle quota will not work.
LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
super.checkQuota(numWrites, numReads, numScans);
checkQuota.run();
} else {
// 1. Update estimate quota which will be consumed
updateEstimateConsumeQuota(numWrites, numReads, numScans);
estimateQuota.run();
// 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
// 3. Check if other limiters are enough. If not, exceed other limiters because region server
// limiter is enough.
boolean exceed = false;
try {
super.checkQuota(numWrites, numReads, numScans);
checkQuota.run();
} catch (RpcThrottlingException e) {
exceed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, "
LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{}, scans:{}, "
+ "try use region server quota", numWrites, numReads, numScans);
}
}
Expand Down Expand Up @@ -96,4 +115,8 @@ public void close() {
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff);
}
}

/**
 * A {@link Runnable}-like action whose {@code run} may throw {@link RpcThrottlingException},
 * so that a quota check can be handed around as a lambda.
 */
@FunctionalInterface
private interface CheckQuotaRunnable {
  /**
   * Runs the quota check.
   * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
   *                                exceeded.
   */
  void run() throws RpcThrottlingException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Noop operation quota returned when no quota is associated to the user/table
*/
Expand All @@ -40,7 +42,13 @@ public static OperationQuota get() {
}

@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException {
// no-op: this quota is used when no quota is associated, so there is nothing to check
}

@Override
public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException {
// no-op: this quota is used when no quota is associated, so none of the arguments are consulted
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public long getReadAvailable() {
throw new UnsupportedOperationException();
}

@Override
public long getReadLimit() {
// Report the largest possible value, so this limiter can never lower a minimum taken
// across several limiters' read limits.
return Long.MAX_VALUE;
}

@Override
public String toString() {
return "NoopQuotaLimiter";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Interface that allows to check the quota available for an operation.
*/
Expand Down Expand Up @@ -51,11 +53,25 @@ public enum OperationType {
* on the number of operations to perform and the average size accumulated during time.
* @param numWrites number of write operation that will be performed
* @param numReads number of small-read operation that will be performed
* @param numScans number of long-read operation that will be performed
* @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
* exceeded.
*/
void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException;
void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException;

/**
 * Checks if it is possible to execute the scan. The quota will be estimated based on the
 * composition of the scan.
 * @param scanRequest the given scan operation
 * @param maxScannerResultSize the maximum bytes to be returned by the scanner
 * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
 * scanner
 * @param prevBlockBytesScannedDifference the difference in block bytes scanned (BBS) between
 * the scanner's previous two next calls
 * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is
 * exceeded.
 */
void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException;

/** Cleanup method on operation completion */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();

/** Returns the maximum number of bytes ever available to read */
long getReadLimit();

/** Returns the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();
}
Loading

0 comments on commit 8e98301

Please sign in to comment.