Skip to content

Commit

Permalink
HBASE-28346: Expose checkQuota to Coprocessor Endpoints (#6066)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Dimiduk <[email protected]>
  • Loading branch information
charlesconnell authored Aug 6, 2024
1 parent edbb145 commit 158b6d0
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -120,4 +124,52 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
* @return the RawCellBuilder
*/
RawCellBuilder getCellBuilder();

/**
* Returns an RpcQuotaManager that can be used to apply quota checks against the workloads
* generated by the coprocessor.
* @return the RpcQuotaManager
*/
RpcQuotaManager getRpcQuotaManager();

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param scan the scan to be estimated against the quota
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param numWrites number of writes to count against quota
* @param numReads number of reads to count against quota
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
Expand Down Expand Up @@ -181,6 +182,11 @@ public void addScanResult(final List<Result> results) {
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
}

@Override
public void addScanResultCells(final List<Cell> cells) {
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
}

@Override
public void addMutation(final Mutation mutation) {
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -81,4 +82,9 @@ public long getReadAvailable() {
public long getReadConsumed() {
return 0L;
}

@Override
public void addScanResultCells(List<Cell> cells) {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS
*/
void addScanResult(List<Result> results);

/**
* Add a scan result in the form of cells. This will be used to calculate the exact quota and have
* a better long-read average size for the next time.
*/
void addScanResultCells(List<Cell> cells);

/**
* Add a mutation result. This will be used to calculate the exact quota and have a better
* mutation average size for the next time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ public static long calculateResultSize(final List<Result> results) {
return size;
}

public static long calculateCellsSize(final List<Cell> cells) {
long size = 0;
for (Cell cell : cells) {
size += cell.getSerializedSize();
}
return size;
}

/**
* Method to enable a table, if not already enabled. This method suppresses
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionServerRpcQuotaManager {
public class RegionServerRpcQuotaManager implements RpcQuotaManager {
private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);

private final RegionServerServices rsServices;
Expand Down Expand Up @@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
return NoopOperationQuota.get();
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method is specific to scans
* because estimating a scan's workload is more complicated than estimating the workload of a
* get/put.
* @param region the region where the operation will be performed
* @param scanRequest the scan to be estimated against the quota
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkScanQuota(final Region region,
final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
Expand All @@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region,
return quota;
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkBatchQuota(final Region region,
final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
switch (type) {
Expand All @@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region,
throw new RuntimeException("Invalid operation type: " + type);
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkBatchQuota(final Region region,
final List<ClientProtos.Action> actions, boolean hasCondition)
throws IOException, RpcThrottlingException {
Expand Down Expand Up @@ -258,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region,
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
private OperationQuota checkBatchQuota(final Region region, final int numWrites,
@Override
public OperationQuota checkBatchQuota(final Region region, final int numWrites,
final int numReads) throws IOException, RpcThrottlingException {
Optional<User> user = RpcServer.getRequestUser();
UserGroupInformation ugi;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface RpcQuotaManager {

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method is specific to scans
* because estimating a scan's workload is more complicated than estimating the workload of a
* get/put.
* @param region the region where the operation will be performed
* @param scanRequest the scan to be estimated against the quota
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest,
long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final List<ClientProtos.Action> actions,
boolean hasCondition) throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param numWrites number of writes to count against quota
* @param numReads number of reads to count against quota
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
Expand All @@ -83,6 +86,9 @@
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Implements the coprocessor environment and runtime support for coprocessors loaded within a
* {@link Region}.
Expand Down Expand Up @@ -116,6 +122,7 @@ private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor
ConcurrentMap<String, Object> sharedData;
private final MetricRegistry metricRegistry;
private final RegionServerServices services;
private final RpcQuotaManager rpcQuotaManager;

/**
* Constructor
Expand All @@ -131,6 +138,13 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final
this.services = services;
this.metricRegistry =
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
// Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
// being null. Let these unit tests succeed. This should not happen in real usage.
if (services != null) {
this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
} else {
this.rpcQuotaManager = null;
}
}

/** Returns the region */
Expand Down Expand Up @@ -186,6 +200,35 @@ public RawCellBuilder getCellBuilder() {
// We always do a DEEP_COPY only
return RawCellBuilderFactory.create();
}

@Override
public RpcQuotaManager getRpcQuotaManager() {
return rpcQuotaManager;
}

@Override
public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
ClientProtos.ScanRequest scanRequest = RequestConverter
.buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
long maxScannerResultSize =
services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
maxBlockBytesScanned, prevBlockBytesScannedDifference);
}

@Override
public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
throws IOException, RpcThrottlingException {
return rpcQuotaManager.checkBatchQuota(region, type);
}

@Override
public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException {
return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
}
}

/**
Expand Down
Loading

0 comments on commit 158b6d0

Please sign in to comment.