Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27800: Add support for default user quotas (a…
Browse files Browse the repository at this point in the history
…pache#5666)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Feb 7, 2024
1 parent 7283a80 commit 53f2137
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
* @return the quota info associated to specified user
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()),
this::triggerCacheRefresh);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand All @@ -48,7 +49,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
Expand All @@ -72,6 +75,26 @@ public class QuotaUtil extends QuotaTableUtil {
// the default one write capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

/*
* The below defaults, if configured, will be applied to otherwise unthrottled users. For example,
* set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
* that any given user may not query more than 1mb per second from any given machine, unless
* explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
* QuotaScope.MACHINE.
*/
public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
"hbase.quota.default.user.machine.read.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
"hbase.quota.default.user.machine.read.size";
public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
"hbase.quota.default.user.machine.request.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
"hbase.quota.default.user.machine.request.size";
public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
"hbase.quota.default.user.machine.write.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
"hbase.quota.default.user.machine.write.size";

/** Table descriptor for Quota internal table */
public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME);
static {
Expand Down Expand Up @@ -283,10 +306,14 @@ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection conne
assert isUserRowKey(key);
String user = getUserFromRowKey(key);

if (results[i].isEmpty()) {
userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration()));
continue;
}

final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
userQuotas.put(user, quotaInfo);

if (results[i].isEmpty()) continue;
assert Bytes.equals(key, results[i].getRow());

try {
Expand Down Expand Up @@ -320,6 +347,38 @@ public void visitUserQuotas(String userName, Quotas quotas) {
return userQuotas;
}

protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();

buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
.ifPresent(throttleBuilder::setReadNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
.ifPresent(throttleBuilder::setReadSize);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
.ifPresent(throttleBuilder::setReqNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
.ifPresent(throttleBuilder::setReqSize);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
.ifPresent(throttleBuilder::setWriteNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
.ifPresent(throttleBuilder::setWriteSize);

UserQuotaState state = new UserQuotaState();
QuotaProtos.Quotas defaultQuotas =
QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
state.setQuotas(defaultQuotas);
return state;
}

private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
int defaultSoftLimit = conf.getInt(key, -1);
if (defaultSoftLimit == -1) {
return Optional.empty();
}
return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
}

public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestDefaultQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultQuota.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
private static final int REFRESH_TIME = 5000;
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");

@After
public void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
// quotas enabled, using block bytes scanned
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1);

// don't cache blocks to make IO predictable
TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);

TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;

try (Admin admin = TEST_UTIL.getAdmin()) {
ThrottleQuotaTestUtil.doPuts(1_000, FAMILY, QUALIFIER,
admin.getConnection().getTable(TABLE_NAME));
}
TEST_UTIL.flush(TABLE_NAME);
}

@Test
public void testDefaultUserReadNum() throws Exception {
// Should have a strict throttle by default
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) < 100);

// Add big quota and should be effectively unlimited
configureLenientThrottle();
refreshQuotas();
// Should run without error
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) == 100);

// Remove all the limits, and should revert to strict default
unsetQuota();
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) < 100);
}

private void configureLenientThrottle() throws IOException {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(), ThrottleType.READ_NUMBER,
100_000, TimeUnit.SECONDS));
}
}

private static String getUserName() throws IOException {
return User.getCurrent().getShortName();
}

private void refreshQuotas() throws Exception {
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();
}

private void unsetQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName()));
}
refreshQuotas();
}

private long runGetsTest(int attempts) throws Exception {
refreshQuotas();
try (Table table = getTable()) {
return ThrottleQuotaTestUtil.doGets(attempts, FAMILY, QUALIFIER, table);
}
}

private Table getTable() throws IOException {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
.build();
}

}

0 comments on commit 53f2137

Please sign in to comment.