From 33f4ddf9d43370a19f295199a35fa041d8c3dd55 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Tue, 9 Jul 2024 14:37:46 -0400 Subject: [PATCH 1/6] Expose checkQuota to Coprocessor Endpoints --- .../RegionCoprocessorEnvironment.java | 25 ++++++ .../hbase/quotas/DefaultOperationQuota.java | 6 ++ .../hbase/quotas/NoopOperationQuota.java | 6 ++ .../hadoop/hbase/quotas/OperationQuota.java | 7 ++ .../apache/hadoop/hbase/quotas/QuotaUtil.java | 8 ++ .../quotas/RegionServerRpcQuotaManager.java | 41 +--------- .../hadoop/hbase/quotas/RpcQuotaManager.java | 78 +++++++++++++++++++ .../regionserver/RegionCoprocessorHost.java | 25 ++++++ 8 files changed, 159 insertions(+), 37 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index e3232db909c2..40012fc9c230 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -26,7 +26,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.yetus.audience.InterfaceAudience; @@ -120,4 +124,25 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment results) { operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); } + @Override + public void addScanResultCells(final List cells) { + operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateCellsSize(cells); + } + @Override public void addMutation(final Mutation mutation) { operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 736560e6fd17..63cf97188d86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; @@ -81,4 +82,9 @@ public long getReadAvailable() { public long getReadConsumed() { return 0L; } + + @Override + public void addScanResultCells(List cells) { + // no-op + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index ef0a35fa5892..0d9b48b6074b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.List; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; @@ -88,6 +89,12 @@ void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultS */ void addScanResult(List results); + /** + * Add a scan result in the form of cells. This will be used to calculate the exact quota and have + * a better long-read average size for the next time. + */ + void addScanResultCells(List cells); + /** * Add a mutation result. This will be used to calculate the exact quota and have a better * mutation average size for the next time. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 0da1aa661658..8e267d4e8bf6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -591,6 +591,14 @@ public static long calculateResultSize(final List results) { return size; } + public static long calculateCellsSize(final List cells) { + long size = 0; + for (Cell cell : cells) { + size += cell.getSerializedSize(); + } + return size; + } + /** * Method to enable a table, if not already enabled. This method suppresses * {@link TableNotDisabledException} and {@link TableNotFoundException}, if thrown while enabling diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 92a0cfd5c135..4c84fe1ccd5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -43,7 +43,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class RegionServerRpcQuotaManager { +public class RegionServerRpcQuotaManager implements RpcQuotaManager { private static final Logger LOG = LoggerFactory.getLogger(RegionServerRpcQuotaManager.class); private final RegionServerServices rsServices; @@ -154,21 +154,7 @@ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName t return NoopOperationQuota.get(); } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method is specific to scans - * because estimating a scan's workload is more complicated than estimating the workload of a - * get/put. - * @param region the region where the operation will be performed - * @param scanRequest the scan to be estimated against the quota - * @param maxScannerResultSize the maximum bytes to be returned by the scanner - * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the - * scanner - * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next - * calls - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) @@ -195,16 +181,7 @@ public OperationQuota checkScanQuota(final Region region, return quota; } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method does not support - * scans because estimating a scan's workload is more complicated than estimating the workload of - * a get/put. - * @param region the region where the operation will be performed - * @param type the operation type - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { @@ -218,17 +195,7 @@ public OperationQuota checkBatchQuota(final Region region, throw new RuntimeException("Invalid operation type: " + type); } - /** - * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. This method does not support - * scans because estimating a scan's workload is more complicated than estimating the workload of - * a get/put. - * @param region the region where the operation will be performed - * @param actions the "multi" actions to perform - * @param hasCondition whether the RegionAction has a condition - * @return the OperationQuota - * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. - */ + @Override public OperationQuota checkBatchQuota(final Region region, final List actions, boolean hasCondition) throws IOException, RpcThrottlingException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java new file mode 100644 index 000000000000..afddd5341eec --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface RpcQuotaManager { + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method is specific to scans + * because estimating a scan's workload is more complicated than estimating the workload of a + * get/put. + * @param region the region where the operation will be performed + * @param scanRequest the scan to be estimated against the quota + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkScanQuota(final Region region, final ClientProtos.ScanRequest scanRequest, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) + throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param type the operation type + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, final OperationQuota.OperationType type) + throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param actions the "multi" actions to perform + * @param hasCondition whether the RegionAction has a condition + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, final List actions, + boolean hasCondition) throws IOException, RpcThrottlingException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 398c596b63f8..2c3f8a91bbf3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -65,6 +65,9 @@ import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.metrics.MetricRegistry; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcQuotaManager; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -83,6 +86,9 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.AbstractReferenceMap; import org.apache.hbase.thirdparty.org.apache.commons.collections4.map.ReferenceMap; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Implements the coprocessor environment and runtime support for coprocessors loaded within a * {@link Region}. @@ -116,6 +122,7 @@ private static class RegionEnvironment extends BaseEnvironment sharedData; private final MetricRegistry metricRegistry; private final RegionServerServices services; + private final RpcQuotaManager rpcQuotaManager; /** * Constructor @@ -131,6 +138,7 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final this.services = services; this.metricRegistry = MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); + this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); } /** Returns the region */ @@ -186,6 +194,23 @@ public RawCellBuilder getCellBuilder() { // We always do a DEEP_COPY only return RawCellBuilderFactory.create(); } + + @Override + public RpcQuotaManager getRpcQuotaManager() { + return rpcQuotaManager; + } + + @Override + public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned, + long prevBlockBytesScannedDifference) throws IOException, RpcThrottlingException { + ClientProtos.ScanRequest scanRequest = RequestConverter + .buildScanRequest(region.getRegionInfo().getRegionName(), scan, scan.getCaching(), false); + long maxScannerResultSize = + services.getConfiguration().getLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE); + return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); + } } /** From e24e79b86c8fe291c22f5e01f2e3606300a65def Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Wed, 10 Jul 2024 18:27:42 -0400 Subject: [PATCH 2/6] Test new behavior, fix existing test --- .../RegionCoprocessorEnvironment.java | 29 +++++ .../regionserver/RegionCoprocessorHost.java | 12 ++ .../TestRegionCoprocessorQuotaUsage.java | 107 ++++++++++++++++++ .../TestRegionObserverScannerOpenHook.java | 4 +- 4 files changed, 151 insertions(+), 1 deletion(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index 40012fc9c230..4fc4dc8ea26e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import java.util.List; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -145,4 +147,31 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment actions, + boolean hasCondition) throws IOException, RpcThrottlingException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 2c3f8a91bbf3..4ffd1825bcd6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -211,6 +211,18 @@ public OperationQuota checkScanQuota(Scan scan, long maxBlockBytesScanned, return rpcQuotaManager.checkScanQuota(region, scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); } + + @Override + public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationType type) + throws IOException, RpcThrottlingException { + return rpcQuotaManager.checkBatchQuota(region, type); + } + + @Override + public OperationQuota checkBatchQuota(Region region, List actions, + boolean hasCondition) throws IOException, RpcThrottlingException { + return rpcQuotaManager.checkBatchQuota(region, actions, hasCondition); + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java new file mode 100644 index 000000000000..1049a6f114a7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java @@ -0,0 +1,107 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.quotas.OperationQuota; +import org.apache.hadoop.hbase.quotas.RpcThrottlingException; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import static org.junit.Assert.assertTrue; + +@Category({ MediumTests.class, CoprocessorTests.class }) +public class TestRegionCoprocessorQuotaUsage { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionCoprocessorQuotaUsage.class); + + private static HBaseTestingUtil UTIL = new HBaseTestingUtil(); + private static TableName TABLE_NAME = TableName.valueOf("TestRegionCoprocessorQuotaUsage"); + private static byte[] CF = Bytes.toBytes("CF"); + private static byte[] CQ = Bytes.toBytes("CQ"); + private static Connection CONN; + private static Table TABLE; + private static AtomicBoolean THROTTLING_OCCURRED = new AtomicBoolean(false); + + public static class MyRegionObserver implements RegionObserver { + @Override public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + + // For the purposes of this test, we only need to catch a throttle happening once, then + // let future requests pass through so we don't make this test take any longer than necessary + if (!THROTTLING_OCCURRED.get()) { + try { + c.getEnvironment() + .checkBatchQuota(c.getEnvironment().getRegion(), OperationQuota.OperationType.GET); + } catch (RpcThrottlingException e) { + THROTTLING_OCCURRED.set(true); + throw e; + } + } + } + } + + public static class MyCoprocessor implements RegionCoprocessor { + private RegionCoprocessorEnvironment env; + + RegionObserver observer = new MyRegionObserver(); + + @Override public Optional getRegionObserver() { + return Optional.of(observer); + } + } + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = UTIL.getConfiguration(); + conf.setBoolean("hbase.quota.enabled", true); + conf.setInt("hbase.quota.default.user.machine.read.num", 2); + conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter"); + conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000"); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MyCoprocessor.class.getName()); + UTIL.startMiniCluster(3); + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + UTIL.createTable(TABLE_NAME, CF, splitKeys); + CONN = UTIL.getConnection(); + TABLE = CONN.getTable(TABLE_NAME); + TABLE.put(new Put(Bytes.toBytes(String.format("%d", 0))).addColumn(CF, CQ, Bytes.toBytes(0L))); + } + + @AfterClass + public static void tearDown() throws Exception { + CONN.close(); + UTIL.shutdownMiniCluster(); + } + + @Test + public void testGet() throws InterruptedException, ExecutionException, IOException { + // Hit the table 5 times which ought to be enough to make a throttle happen + for (int i = 0; i < 5; i++) { + TABLE.get(new Get(Bytes.toBytes("000"))); + } + assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get()); + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index a1ab261cb6b0..193c63b5d4f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.Collections; @@ -198,7 +199,8 @@ HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf, RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); Path path = new Path(DIR + callingMethod); WAL wal = HBaseTestingUtil.createWal(conf, path, info); - HRegion r = HRegion.createHRegion(info, path, conf, tableDescriptor, wal); + RegionServerServices rsServices = mock(RegionServerServices.class); + HRegion r = HRegion.createHRegion(info, path, conf, tableDescriptor, wal, true, rsServices); // this following piece is a hack. currently a coprocessorHost // is secretly loaded at OpenRegionHandler. we don't really // start a region server here, so just manually create cphost From 3ca3c1367c09a4a6aca4db57ce18c16dcfe5473c Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Wed, 10 Jul 2024 18:28:46 -0400 Subject: [PATCH 3/6] spotless --- .../RegionCoprocessorEnvironment.java | 5 ++- .../TestRegionCoprocessorQuotaUsage.java | 44 +++++++++++++------ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index 4fc4dc8ea26e..77e0285151f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.quotas.RpcThrottlingException; import org.apache.hadoop.hbase.regionserver.OnlineRegions; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { @@ -137,7 +138,7 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment c, Get get, + @Override + public void preGetOp(ObserverContext c, Get get, List result) throws IOException { // For the purposes of this test, we only need to catch a throttle happening once, then // let future requests pass through so we don't make this test take any longer than necessary if (!THROTTLING_OCCURRED.get()) { try { - c.getEnvironment() - .checkBatchQuota(c.getEnvironment().getRegion(), OperationQuota.OperationType.GET); + c.getEnvironment().checkBatchQuota(c.getEnvironment().getRegion(), + OperationQuota.OperationType.GET); } catch (RpcThrottlingException e) { THROTTLING_OCCURRED.set(true); throw e; @@ -64,7 +83,8 @@ public static class MyCoprocessor implements RegionCoprocessor { RegionObserver observer = new MyRegionObserver(); - @Override public Optional getRegionObserver() { + @Override + public Optional getRegionObserver() { return Optional.of(observer); } } @@ -76,8 +96,7 @@ public static void setUp() throws Exception { conf.setInt("hbase.quota.default.user.machine.read.num", 2); conf.set("hbase.quota.rate.limiter", "org.apache.hadoop.hbase.quotas.FixedIntervalRateLimiter"); conf.set("hbase.quota.rate.limiter.refill.interval.ms", "300000"); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - MyCoprocessor.class.getName()); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCoprocessor.class.getName()); UTIL.startMiniCluster(3); byte[][] splitKeys = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -104,4 +123,3 @@ public void testGet() throws InterruptedException, ExecutionException, IOExcepti assertTrue("Throttling did not happen as expected", THROTTLING_OCCURRED.get()); } } - From bfbae1c3db7e38bd4ab731af75b5a3d9230d3c37 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Mon, 15 Jul 2024 14:31:17 -0400 Subject: [PATCH 4/6] different strategy for unit tests --- .../hadoop/hbase/regionserver/RegionCoprocessorHost.java | 7 ++++++- .../coprocessor/TestRegionObserverScannerOpenHook.java | 4 +--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 4ffd1825bcd6..e113557e4709 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -138,7 +138,12 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final this.services = services; this.metricRegistry = MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); - this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); + // lets unit tests through + if (services != null) { + this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); + } else { + this.rpcQuotaManager = null; + } } /** Returns the region */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 193c63b5d4f1..a1ab261cb6b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.Collections; @@ -199,8 +198,7 @@ HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf, RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(); Path path = new Path(DIR + callingMethod); WAL wal = HBaseTestingUtil.createWal(conf, path, info); - RegionServerServices rsServices = mock(RegionServerServices.class); - HRegion r = HRegion.createHRegion(info, path, conf, tableDescriptor, wal, true, rsServices); + HRegion r = HRegion.createHRegion(info, path, conf, tableDescriptor, wal); // this following piece is a hack. currently a coprocessorHost // is secretly loaded at OpenRegionHandler. we don't really // start a region server here, so just manually create cphost From cbc708ba38aa0ab232ef12b0b28b4f2c3c69c4d5 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Wed, 17 Jul 2024 10:59:00 -0400 Subject: [PATCH 5/6] small cleanup --- .../hadoop/hbase/regionserver/RegionCoprocessorHost.java | 3 ++- .../hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index e113557e4709..22b6ea4ac366 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -138,7 +138,8 @@ public RegionEnvironment(final RegionCoprocessor impl, final int priority, final this.services = services; this.metricRegistry = MetricsCoprocessor.createRegistryForRegionCoprocessor(impl.getClass().getName()); - // lets unit tests through + // Some unit tests reach this line with services == null, and are okay with rpcQuotaManager + // being null. Let these unit tests succeed. This should not happen in real usage. if (services != null) { this.rpcQuotaManager = services.getRegionServerRpcQuotaManager(); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java index d4d1856566f2..eeb3eb8bb0f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionCoprocessorQuotaUsage.java @@ -79,8 +79,6 @@ public void preGetOp(ObserverContext c, Get get, } public static class MyCoprocessor implements RegionCoprocessor { - private RegionCoprocessorEnvironment env; - RegionObserver observer = new MyRegionObserver(); @Override @@ -110,7 +108,6 @@ public static void setUp() throws Exception { @AfterClass public static void tearDown() throws Exception { - CONN.close(); UTIL.shutdownMiniCluster(); } From 2825e301735cbd7047d870e9b2ebd0e5832efe7d Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Thu, 25 Jul 2024 07:57:17 -0400 Subject: [PATCH 6/6] Remove references to ClientProtos --- .../coprocessor/RegionCoprocessorEnvironment.java | 13 +++++-------- .../hbase/quotas/RegionServerRpcQuotaManager.java | 3 ++- .../hadoop/hbase/quotas/RpcQuotaManager.java | 14 ++++++++++++++ .../hbase/regionserver/RegionCoprocessorHost.java | 6 +++--- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java index 77e0285151f2..1bac7a068bf9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; -import java.util.List; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -37,8 +36,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; - @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment { @@ -167,12 +164,12 @@ OperationQuota checkBatchQuota(final Region region, final OperationQuota.Operati * available quota and to report the data/usage of the operation. This method does not support * scans because estimating a scan's workload is more complicated than estimating the workload of * a get/put. - * @param region the region where the operation will be performed - * @param actions the "multi" actions to perform - * @param hasCondition whether the RegionAction has a condition + * @param region the region where the operation will be performed + * @param numWrites number of writes to count against quota + * @param numReads number of reads to count against quota * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - OperationQuota checkBatchQuota(final Region region, final List actions, - boolean hasCondition) throws IOException, RpcThrottlingException; + OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 4c84fe1ccd5e..f9a7ccba401b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -225,7 +225,8 @@ public OperationQuota checkBatchQuota(final Region region, * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - private OperationQuota checkBatchQuota(final Region region, final int numWrites, + @Override + public OperationQuota checkBatchQuota(final Region region, final int numWrites, final int numReads) throws IOException, RpcThrottlingException { Optional user = RpcServer.getRequestUser(); UserGroupInformation ugi; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java index afddd5341eec..60392ca3b3f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java @@ -75,4 +75,18 @@ OperationQuota checkBatchQuota(final Region region, final OperationQuota.Operati */ OperationQuota checkBatchQuota(final Region region, final List actions, boolean hasCondition) throws IOException, RpcThrottlingException; + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. + * @param region the region where the operation will be performed + * @param numWrites number of writes to count against quota + * @param numReads number of reads to count against quota + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 22b6ea4ac366..c4e68c234077 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -225,9 +225,9 @@ public OperationQuota checkBatchQuota(Region region, OperationQuota.OperationTyp } @Override - public OperationQuota checkBatchQuota(Region region, List actions, - boolean hasCondition) throws IOException, RpcThrottlingException { - return rpcQuotaManager.checkBatchQuota(region, actions, hasCondition); + public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + throws IOException, RpcThrottlingException { + return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads); } }