Skip to content

Commit

Permalink
HBASE-28349 Count atomic operations against read quotas (#5668)
Browse files Browse the repository at this point in the history
Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored Feb 8, 2024
1 parent e85557a commit 98eb3e0
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public interface OperationQuota {
public enum OperationType {
MUTATE,
GET,
SCAN
SCAN,
CHECK_AND_MUTATE
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
Expand Down Expand Up @@ -177,6 +178,31 @@ public static void deleteRegionServerQuota(final Connection connection, final St
deleteQuotas(connection, getRegionServerRowKey(regionServer));
}

public static OperationQuota.OperationType getQuotaOperationType(ClientProtos.Action action,
boolean hasCondition) {
if (action.hasMutation()) {
return getQuotaOperationType(action.getMutation(), hasCondition);
}
return OperationQuota.OperationType.GET;
}

public static OperationQuota.OperationType
getQuotaOperationType(ClientProtos.MutateRequest mutateRequest) {
return getQuotaOperationType(mutateRequest.getMutation(), mutateRequest.hasCondition());
}

private static OperationQuota.OperationType
getQuotaOperationType(ClientProtos.MutationProto mutationProto, boolean hasCondition) {
ClientProtos.MutationProto.MutationType mutationType = mutationProto.getMutateType();
if (
hasCondition || mutationType == ClientProtos.MutationProto.MutationType.APPEND
|| mutationType == ClientProtos.MutationProto.MutationType.INCREMENT
) {
return OperationQuota.OperationType.CHECK_AND_MUTATE;
}
return OperationQuota.OperationType.MUTATE;
}

protected static void switchExceedThrottleQuota(final Connection connection,
boolean exceedThrottleQuotaEnabled) throws IOException {
if (exceedThrottleQuotaEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,25 +171,33 @@ public OperationQuota checkQuota(final Region region, final OperationQuota.Opera
return checkQuota(region, 0, 1, 0);
case MUTATE:
return checkQuota(region, 1, 0, 0);
case CHECK_AND_MUTATE:
return checkQuota(region, 1, 1, 0);
}
throw new RuntimeException("Invalid operation type: " + type);
}

/**
* Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the
* available quota and to report the data/usage of the operation.
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param region the region where the operation will be performed
* @param actions the "multi" actions to perform
* @param hasCondition whether the RegionAction has a condition
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions)
throws IOException, RpcThrottlingException {
public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions,
boolean hasCondition) throws IOException, RpcThrottlingException {
int numWrites = 0;
int numReads = 0;
for (final ClientProtos.Action action : actions) {
if (action.hasMutation()) {
numWrites++;
OperationQuota.OperationType operationType =
QuotaUtil.getQuotaOperationType(action, hasCondition);
if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) {
numReads++;
}
} else if (action.hasGet()) {
numReads++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2679,7 +2679,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
return responseBuilder.build();
Expand Down Expand Up @@ -2741,7 +2742,8 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)

try {
region = getRegion(regionSpecifier);
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(),
regionAction.hasCondition());
} catch (IOException e) {
failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
continue; // For this region it's a failure.
Expand Down Expand Up @@ -2924,7 +2926,8 @@ public MutateResponse mutate(final RpcController rpcc, final MutateRequest reque
server.getMemStoreFlusher().reclaimMemStoreMemory();
}
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request);
quota = getRpcQuotaManager().checkQuota(region, operationType);
ActivePolicyEnforcement spaceQuotaEnforcement =
getSpaceQuotaManager().getActiveEnforcements();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestAtomicReadQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAtomicReadQuota.class);
private static final Logger LOG = LoggerFactory.getLogger(TestAtomicReadQuota.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");

@AfterClass
public static void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1000);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
}

@Test
public void testIncrementCountedAgainstReadCapacity() throws Exception {
setupQuota();

Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
inc.addColumn(FAMILY, QUALIFIER, 1);
testThrottle(table -> table.increment(inc));
}

@Test
public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Increment inc = new Increment(row);
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

RowMutations rowMutations = new RowMutations(row);
rowMutations.add(inc);
rowMutations.add(put);
testThrottle(table -> table.mutateRow(rowMutations));
}

@Test
public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

RowMutations rowMutations = new RowMutations(row);
rowMutations.add(put);
try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.mutateRow(rowMutations);
}
}
}

@Test
public void testNonAtomicPutOmittedFromReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.put(put);
}
}
}

@Test
public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception {
setupQuota();

Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));
Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v"));

Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString()));
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

List<Put> puts = new ArrayList<>(2);
puts.add(put1);
puts.add(put2);

try (Table table = getTable()) {
for (int i = 0; i < 100; i++) {
table.put(puts);
}
}
}

@Test
public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
byte[] value = Bytes.toBytes("v");
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("doot"), value);
CheckAndMutate checkAndMutate =
CheckAndMutate.newBuilder(row).ifEquals(FAMILY, QUALIFIER, value).build(put);

testThrottle(table -> table.checkAndMutate(checkAndMutate));
}

@Test
public void testAtomicBatchCountedAgainstReadCapacity() throws Exception {
setupQuota();

byte[] row = Bytes.toBytes(UUID.randomUUID().toString());
Increment inc = new Increment(row);
inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1);

List<Increment> incs = new ArrayList<>(2);
incs.add(inc);
incs.add(inc);

testThrottle(table -> {
Object[] results = new Object[] {};
table.batch(incs, results);
return results;
});
}

private void setupQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(),
ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES));
}
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
}

private void cleanupQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName()));
}
ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
}

private void testThrottle(ThrowingFunction<Table, ?> request) throws Exception {
try (Table table = getTable()) {
// we have a read quota configured, so this should fail
TEST_UTIL.waitFor(60_000, () -> {
try {
request.run(table);
return false;
} catch (Exception e) {
boolean success = e.getCause() instanceof RpcThrottlingException;
if (!success) {
LOG.error("Unexpected exception", e);
}
return success;
}
});
} finally {
cleanupQuota();
}
}

private Table getTable() throws IOException {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
.build();
}

@FunctionalInterface
private interface ThrowingFunction<I, O> {
O run(I input) throws Exception;
}

}

0 comments on commit 98eb3e0

Please sign in to comment.