Skip to content

Commit

Permalink
HubSpot Backport: HBASE-27784: support quota user overrides (apache#5424
Browse files Browse the repository at this point in the history
)

Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored and bbeaudreault committed Sep 27, 2023
1 parent 61c109e commit 5d12972
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,21 @@ public interface RpcCall extends RpcCallContext {
Map<String, byte[]> getConnectionAttributes();

/**
* Returns the map of attributes specified when building the request.
* Returns the map of attributes specified when building the request. This map is lazily evaluated
* so if you only need a single attribute then it may be cheaper to use
* {@link #getRequestAttribute(String)}
* @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
*/
Map<String, byte[]> getRequestAttributes();

/**
* Returns a single request attribute value, or null if no value is present. If you need many
* request attributes then you should fetch the lazily evaluated map via
* {@link #getRequestAttributes()}
* @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[])
*/
byte[] getRequestAttribute(String key);

/** Returns Port of remote address in this call */
int getRemotePort();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ public Map<String, byte[]> getRequestAttributes() {
return this.requestAttributes;
}

@Override
public byte[] getRequestAttribute(String key) {
if (this.requestAttributes == null) {
for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) {
if (nameBytesPair.getName().equals(key)) {
return nameBytesPair.getValue().toByteArray();
}
}
return null;
}
return this.requestAttributes.get(key);
}

@Override
public int getPriority() {
return this.header.getPriority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -34,8 +35,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -56,6 +60,11 @@ public class QuotaCache implements Stoppable {
private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class);

public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period";

// defines the request attribute key which, when provided, will override the request's username
// from the perspective of user quotas
public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY =
"hbase.quota.user.override.key";
private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min
private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD

Expand All @@ -76,12 +85,15 @@ public class QuotaCache implements Stoppable {
private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
new ConcurrentHashMap<>();
private final RegionServerServices rsServices;
private final String userOverrideRequestAttributeKey;

private QuotaRefresherChore refreshChore;
private boolean stopped = true;

public QuotaCache(final RegionServerServices rsServices) {
this.rsServices = rsServices;
this.userOverrideRequestAttributeKey =
rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY);
}

public void start() throws IOException {
Expand Down Expand Up @@ -127,7 +139,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
* @return the quota info associated to specified user
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new,
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
this::triggerCacheRefresh);
}

Expand Down Expand Up @@ -162,6 +174,28 @@ protected boolean isExceedThrottleQuotaEnabled() {
return exceedThrottleQuotaEnabled;
}

/**
* Applies a request attribute user override if available, otherwise returns the UGI's short
* username
* @param ugi The request's UserGroupInformation
*/
private String getQuotaUserName(final UserGroupInformation ugi) {
if (userOverrideRequestAttributeKey == null) {
return ugi.getShortUserName();
}

Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
if (!rpcCall.isPresent()) {
return ugi.getShortUserName();
}

byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey);
if (override == null) {
return ugi.getShortUserName();
}
return Bytes.toString(override);
}

/**
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
* returned and the quota request will be enqueued for the next cache refresh.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ public Map<String, byte[]> getRequestAttributes() {
pair -> pair.getValue().toByteArray()));
}

@Override
public byte[] getRequestAttribute(String key) {
return null;
}

@Override
public int getRemotePort() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ public Map<String, byte[]> getRequestAttributes() {
return null;
}

@Override
public byte[] getRequestAttribute(String key) {
return null;
}

@Override
public int getRemotePort() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestQuotaUserOverride {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestQuotaUserOverride.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");
private static final int NUM_SERVERS = 1;
private static final String CUSTOM_OVERRIDE_KEY = "foo";

private static final TableName TABLE_NAME = TableName.valueOf("TestQuotaUserOverride");

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500);
TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
CUSTOM_OVERRIDE_KEY);
TEST_UTIL.startMiniCluster(NUM_SERVERS);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
EnvironmentEdgeManager.reset();
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testUserGlobalThrottleWithCustomOverride() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
final String userOverrideWithQuota = User.getCurrent().getShortName() + "123";

// Add 6req/min limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota,
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));

Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build();
Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null)
.setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY,
Bytes.toBytes(userOverrideWithQuota))
.build();
Table tableWithoutThrottle2 =
TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build();

// warm things up
doPuts(10, FAMILY, QUALIFIER, tableWithThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle);
doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2);

// should reject some requests
assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
// should accept all puts
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));

// Remove all the limits
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota));
Thread.sleep(60_000);
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle));
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2));
}

}

0 comments on commit 5d12972

Please sign in to comment.