Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28346: Expose checkQuota to Coprocessor Endpoints #6066

Merged
merged 6 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -120,4 +124,52 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment<Reg
* @return the RawCellBuilder
*/
RawCellBuilder getCellBuilder();

/**
* Returns an RpcQuotaManager that can be used to apply quota checks against the workloads
* generated by the coprocessor.
* @return the RpcQuotaManager
*/
RpcQuotaManager getRpcQuotaManager();

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param scan the scan to be estimated against the quota
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException;
Comment on lines +146 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably want to add support for checking batch quotas as well. I guess maybe that can be done trivially via the RpcQuotaManager, but could be nice for API consistency maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param numWrites number of writes to count against quota
* @param numReads number of reads to count against quota
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.RpcCall;
Expand Down Expand Up @@ -181,6 +182,11 @@ public void addScanResult(final List<Result> results) {
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
}

@Override
public void addScanResultCells(final List<Cell> cells) {
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells);
}

@Override
public void addMutation(final Mutation mutation) {
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -81,4 +82,9 @@ public long getReadAvailable() {
public long getReadConsumed() {
return 0L;
}

@Override
public void addScanResultCells(List<Cell> cells) {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.quotas;

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS
*/
void addScanResult(List<Result> results);

/**
* Add a scan result in the form of cells. This will be used to calculate the exact quota and have
* a better long-read average size for the next time.
*/
void addScanResultCells(List<Cell> cells);

/**
* Add a mutation result. This will be used to calculate the exact quota and have a better
* mutation average size for the next time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ public static long calculateResultSize(final List<Result> results) {
return size;
}

public static long calculateCellsSize(final List<Cell> cells) {
long size = 0;
for (Cell cell : cells) {
size += cell.getSerializedSize();
}
return size;
}

/**
* Method to enable a table, if not already enabled. This method suppresses
* {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionServerRpcQuotaManager {
public class RegionServerRpcQuotaManager implements RpcQuotaManager {
private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class);

private final RegionServerServices rsServices;
Expand Down Expand Up @@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t
return NoopOperationQuota.get();
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method is specific to scans
* because estimating a scan's workload is more complicated than estimating the workload of a
* get/put.
* @param region the region where the operation will be performed
* @param scanRequest the scan to be estimated against the quota
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkScanQuota(final Region region,
final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize,
long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
Expand All @@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region,
return quota;
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkBatchQuota(final Region region,
final OperationQuota.OperationType type) throws IOException, RpcThrottlingException {
switch (type) {
Expand All @@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region,
throw new RuntimeException("Invalid operation type: " + type);
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
@Override
public OperationQuota checkBatchQuota(final Region region,
final List<ClientProtos.Action> actions, boolean hasCondition)
throws IOException, RpcThrottlingException {
Expand Down Expand Up @@ -258,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region,
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
private OperationQuota checkBatchQuota(final Region region, final int numWrites,
@Override
public OperationQuota checkBatchQuota(final Region region, final int numWrites,
final int numReads) throws IOException, RpcThrottlingException {
Optional<User> user = RpcServer.getRequestUser();
UserGroupInformation ugi;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface RpcQuotaManager {

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method is specific to scans
* because estimating a scan's workload is more complicated than estimating the workload of a
* get/put.
* @param region the region where the operation will be performed
* @param scanRequest the scan to be estimated against the quota
* @param maxScannerResultSize the maximum bytes to be returned by the scanner
* @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the
* scanner
* @param prevBlockBytesScannedDifference the difference between BBS of the previous two next
* calls
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest,
long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param type the operation type
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type)
throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, final List<ClientProtos.Action> actions,
boolean hasCondition) throws IOException, RpcThrottlingException;

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation. This method does not support
* scans because estimating a scan's workload is more complicated than estimating the workload of
* a get/put.
* @param region the region where the operation will be performed
* @param numWrites number of writes to count against quota
* @param numReads number of reads to count against quota
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
Expand All @@ -83,6 +86,9 @@
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* Implements the coprocessor environment and runtime support for coprocessors loaded within a
* {@link Region}.
Expand Down Expand Up @@ -116,6 +122,7 @@ private static class RegionEnvironment extends BaseEnvironment<RegionCoprocessor
ConcurrentMap<String, Object> sharedData;
private final MetricRegistry metricRegistry;
private final RegionServerServices services;
private final RpcQuotaManager rpcQuotaManager;

/**
* Constructor
Expand All @@ -131,6 +138,13 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final
this.services = services;
this.metricRegistry =
MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName());
// Some unit tests reach this line with services == null, and are okay with rpcQuotaManager
// being null. Let these unit tests succeed. This should not happen in real usage.
if (services != null) {
this.rpcQuotaManager = services.getRegionServerRpcQuotaManager();
} else {
this.rpcQuotaManager = null;
}
}

/** Returns the region */
Expand Down Expand Up @@ -186,6 +200,35 @@ public RawCellBuilder getCellBuilder() {
// We always do a DEEP_COPY only
return RawCellBuilderFactory.create();
}

@Override
public RpcQuotaManager getRpcQuotaManager() {
return rpcQuotaManager;
}

@Override
public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned,
long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException {
ClientProtos.ScanRequest scanRequest = RequestConverter
.buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false);
long maxScannerResultSize =
services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize,
maxBlockBytesScanned, prevBlockBytesScannedDifference);
}

@Override
public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type)
throws IOException, RpcThrottlingException {
return rpcQuotaManager.checkBatchQuota(region, type);
}

@Override
public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads)
throws IOException, RpcThrottlingException {
return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads);
}
}

/**
Expand Down
Loading