Skip to content

Commit

Permalink
Support quota user overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Sep 20, 2023
1 parent 94e4055 commit 7c99721
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -35,8 +36,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -57,6 +61,11 @@ public class QuotaCache implements Stoppable {
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";

// defines the request attribute key which, when provided, will override the request's username
// from the perspective of user quotas. This is also the default request attribute key
public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
"hbase.quota.user.override.key";
private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD

Expand All @@ -74,12 +83,15 @@ public class QuotaCache implements Stoppable {
private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
new ConcurrentHashMap<>();
private final RegionServerServices rsServices;
private final String userOverrideRequestAttributeKey;

private QuotaRefresherChore refreshChore;
private boolean stopped = true;

public QuotaCache(final RegionServerServices rsServices) {
this.rsServices = rsServices;
this.userOverrideRequestAttributeKey = rsServices.getConfiguration()
.get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY, QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
}

public void start() throws IOException {
Expand Down Expand Up @@ -125,7 +137,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
* @return the quota info associated to specified user
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
this::triggerCacheRefresh);
}

Expand Down Expand Up @@ -160,6 +172,23 @@ protected boolean isExceedThrottleQuotaEnabled() {
return exceedThrottleQuotaEnabled;
}

/**
* Applies a request attribute user override if available, otherwise returns the UGI's short
* username
* @param ugi The request's UserGroupInformation
*/
private String getQuotaUserName(final UserGroupInformation ugi) {
Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
if (
rpcCall.isPresent()
&& rpcCall.get().getRequestAttributes().containsKey(userOverrideRequestAttributeKey)
) {
return Bytes
.toString(rpcCall.get().getRequestAttributes().get(userOverrideRequestAttributeKey));
}
return ugi.getShortUserName();
}

/**
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
* returned and the quota request will be enqueued for the next cache refresh.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestQuotaUserOverride {

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final int NUM_SERVERS = 1;

private static final TableName TABLE_NAME = TableName.valueOf("TestQuotaUserOverride");

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
TEST_UTIL.startMiniCluster(NUM_SERVERS);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
EnvironmentEdgeManager.reset();
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testUserGlobalThrottleWithOverride() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";
final String userOverrideWithoutQuota = User.getCurrent().getShortName() + "456";

// Add 6req/min limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
Thread.sleep(60_000);

Table tableWithThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
Bytes.toBytes(userOverrideWithQuota))
.build();
Table tableWithoutThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
Bytes.toBytes(userOverrideWithoutQuota))
.build();

// warm things up
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);

// should reject some requests
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));

// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
Thread.sleep(60_000);
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestQuotaUserOverrideConfiguration {

private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final int NUM_SERVERS = 1;
private static final String CUSTOM_OVERRIDE_KEY = "foo";

private static final TableName TABLE_NAME =
TableName.valueOf("TestQuotaUserOverrideConfiguration");

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
CUSTOM_OVERRIDE_KEY);
TEST_UTIL.startMiniCluster(NUM_SERVERS);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
EnvironmentEdgeManager.reset();
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";

// Add 6req/min limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
Thread.sleep(60_000);

Table tableWithThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
Table tableWithoutThrottle = admin.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
Bytes.toBytes(userOverrideWithQuota))
.build();

// warm things up
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);

// should reject some requests
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));

// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
Thread.sleep(60_000);
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
}

}

0 comments on commit 7c99721

Please sign in to comment.