diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index 8967f596379f..923a719264b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -26,7 +26,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.security.User; @@ -127,4 +131,52 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment results) { operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); } + @Override + public void addScanResultCells(final List cells) { + operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells); + } + @Override public void addMutation(final Mutation mutation) { operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 736560e6fd17..63cf97188d86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; @@ -81,4 +82,9 @@ public long getReadAvailable() { public long getReadConsumed() { return 0L; } + + @Override + public void addScanResultCells(List cells) { + // no-op + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index ef0a35fa5892..0d9b48b6074b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; @@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS */ void addScanResult(List results); + /** + * Add a scan result in the form of cells. This will be used to calculate the exact quota and have + * a better long-read average size for the next time. + */ + void addScanResultCells(List cells); + /** * Add a mutation result. This will be used to calculate the exact quota and have a better * mutation average size for the next time. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 8ced76e39632..b4887392196d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -590,6 +590,14 @@ public static long calculateResultSize(final List results) { return size; } + public static long calculateCellsSize(final List cells) { + long size = 0; + for (Cell cell : cells) { + size += cell.getSerializedSize(); + } + return size; + } + /** * Method to enable a table, if not already enabled. This method suppresses * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 92a0cfd5c135..f9a7ccba401b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -43,7 +43,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class RegionServerRpcQuotaManager { +public class RegionServerRpcQuotaManager implements RpcQuotaManager { private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); private final RegionServerServices rsServices; @@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t return NoopOperationQuota.get(); } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method is specific to scans - * because estimating a scan's workload is more complicated than estimating the workload of a - * get/put. - * @param region the region where the operation will be performed - * @param scanRequest the scan to be estimated against the quota - * @param maxScannerResultSize the maximum bytes to be returned by the scanner - * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the - * scanner - * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next - * calls - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) @@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region, return quota; } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method does not support - * scans because estimating a scan's workload is more complicated than estimating the workload of - * a get/put. - * @param region the region where the operation will be performed - * @param type the operation type - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { @@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region, throw new RuntimeException("Invalid operation type: " + type); } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method does not support - * scans because estimating a scan's workload is more complicated than estimating the workload of - * a get/put. - * @param region the region where the operation will be performed - * @param actions the "multi" actions to perform - * @param hasCondition whether the RegionAction has a condition - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkBatchQuota(final Region region, final List actions, boolean hasCondition) throws IOException, RpcThrottlingException { @@ -258,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region, * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - private OperationQuota checkBatchQuota(final Region region, final int numWrites, + @Override + public OperationQuota checkBatchQuota(final Region region, final int numWrites, final int numReads) throws IOException, RpcThrottlingException { Optional user = RpcServer.getRequestUser(); UserGroupInformation ugi; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java new file mode 100644 index 000000000000..60392ca3b3f6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface RpcQuotaManager { + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method is specific to scans + * because estimating a scan's workload is more complicated than estimating the workload of a + * get/put. + * @param region the region where the operation will be performed + * @param scanRequest the scan to be estimated against the quota + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) + throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param type the operation type + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type) + throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param actions the "multi" actions to perform + * @param hasCondition whether the RegionAction has a condition + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, final List actions, + boolean hasCondition) throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param numWrites number of writes to count against quota + * @param numReads number of reads to count against quota + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 0e608fc8dcd2..929b24e521a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -69,6 +69,9 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -85,6 +88,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Implements the coprocessor environment and runtime support for coprocessors loaded within a * {@link Region}. @@ -118,6 +124,7 @@ private static class RegionEnvironment extends BaseEnvironment sharedData; private final MetricRegistry metricRegistry; private final RegionServerServices services; + private final RpcQuotaManager rpcQuotaManager; /** * Constructor @@ -133,6 +140,13 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final this.services = services; this.metricRegistry = MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); + // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager + // being null. Let these unit tests succeed. This should not happen in real usage. + if (services != null) { + this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); + } else { + this.rpcQuotaManager = null; + } } /** Returns the region */ @@ -188,6 +202,35 @@ public RawCellBuilder getCellBuilder() { // We always do a DEEP_COPY only return RawCellBuilderFactory.create(); } + + @Override + public RpcQuotaManager getRpcQuotaManager() { + return rpcQuotaManager; + } + + @Override + public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned, + long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException { + ClientProtos.ScanRequest scanRequest = RequestConverter + .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false); + long maxScannerResultSize = + services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); + } + + @Override + public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type) + throws IOException, RpcThrottlingException { + return rpcQuotaManager.checkBatchQuota(region, type); + } + + @Override + public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException { + return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java new file mode 100644 index 000000000000..4a638d965b38 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, CoprocessorTests.class }) +public class TestRegionCoprocessorQuotaUsage { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class); + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage"); + private static byte[] CF = Bytes.toBytes("CF"); + private static byte[] CQ = Bytes.toBytes("CQ"); + private static Connection CONN; + private static Table TABLE; + private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false); + + public static class MyRegionObserver implements RegionObserver { + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + + // For the purposes of this test, we only need to catch a throttle happening once, then + // let future requests pass through so we don't make this test take any longer than necessary + if (!THROTTLING_OCCURRED.get()) { + try { + c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(), + OperationQuota.OperationType.GET); + } catch (RpcThrottlingException e) { + THROTTLING_OCCURRED.set(true); + throw e; + } + } + } + } + + public static class MyCoprocessor implements RegionCoprocessor { + RegionObserver observer = new MyRegionObserver(); + + @Override + public Optional getRegionObserver() { + return Optional.of(observer); + } + } + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean("hbase.quota.enabled", true); + conf.setInt("hbase.quota.default.user.machine.read.num", 2); + conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter"); + conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000"); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName()); + UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + UTIL.createTable(TABLE_NAME, CF, splitKeys); + CONN = UTIL.getConnection(); + TABLE = CONN.getTable(TABLE_NAME); + TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L))); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testGet() throws InterruptedException, ExecutionException, IOException { + // Hit the table 5 times which ought to be enough to make a throttle happen + for (int i = 0; i < 5; i++) { + TABLE.get(new Get(Bytes.toBytes("000"))); + } + assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get()); + } +}