diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java index 0555202f88b9..260d6e1a9803 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java @@ -92,11 +92,21 @@ public interface RpcCall extends RpcCallContext { Map getConnectionAttributes(); /** - * Returns the map of attributes specified when building the request. + * Returns the map of attributes specified when building the request. This map is lazily evaluated + * so if you only need a single attribute then it may be cheaper to use + * {@link #getRequestAttribute(String)} * @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[]) */ Map getRequestAttributes(); + /** + * Returns a single request attribute value, or null if no value is present. If you need many + * request attributes then you should fetch the lazily evaluated map via + * {@link #getRequestAttributes()} + * @see org.apache.hadoop.hbase.client.TableBuilder#setRequestAttribute(String, byte[]) + */ + byte[] getRequestAttribute(String key); + /** Returns Port of remote address in this call */ int getRemotePort(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index 66a2e44fac19..ed688977b963 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -234,6 +234,19 @@ public Map getRequestAttributes() { return this.requestAttributes; } + @Override + public byte[] getRequestAttribute(String key) { + if (this.requestAttributes == null) { + for (HBaseProtos.NameBytesPair nameBytesPair : header.getAttributeList()) { + if (nameBytesPair.getName().equals(key)) { + return nameBytesPair.getValue().toByteArray(); + } + } + return null; + } + return this.requestAttributes.get(key); + } + @Override public int getPriority() { return this.header.getPriority(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 0f7b5e42e68b..0a57b9fd8f8f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -24,6 +24,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,8 +36,11 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.RegionStatesCount; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.yetus.audience.InterfaceAudience; @@ -57,6 +61,11 @@ public class QuotaCache implements Stoppable { private static final Logger LOG = LoggerFactory.getLogger(QuotaCache.class); public static final String REFRESH_CONF_KEY = "hbase.quota.refresh.period"; + + // defines the request attribute key which, when provided, will override the request's username + // from the perspective of user quotas + public static final String QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY = + "hbase.quota.user.override.key"; private static final int REFRESH_DEFAULT_PERIOD = 5 * 60000; // 5min private static final int EVICT_PERIOD_FACTOR = 5; // N * REFRESH_DEFAULT_PERIOD @@ -74,12 +83,15 @@ public class QuotaCache implements Stoppable { private final ConcurrentHashMap tableMachineQuotaFactors = new ConcurrentHashMap<>(); private final RegionServerServices rsServices; + private final String userOverrideRequestAttributeKey; private QuotaRefresherChore refreshChore; private boolean stopped = true; public QuotaCache(final RegionServerServices rsServices) { this.rsServices = rsServices; + this.userOverrideRequestAttributeKey = + rsServices.getConfiguration().get(QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY); } public void start() throws IOException { @@ -125,7 +137,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa * @return the quota info associated to specified user */ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) { - return computeIfAbsent(userQuotaCache, ugi.getShortUserName(), UserQuotaState::new, + return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new, this::triggerCacheRefresh); } @@ -160,6 +172,28 @@ protected boolean isExceedThrottleQuotaEnabled() { return exceedThrottleQuotaEnabled; } + /** + * Applies a request attribute user override if available, otherwise returns the UGI's short + * username + * @param ugi The request's UserGroupInformation + */ + private String getQuotaUserName(final UserGroupInformation ugi) { + if (userOverrideRequestAttributeKey == null) { + return ugi.getShortUserName(); + } + + Optional rpcCall = RpcServer.getCurrentCall(); + if (!rpcCall.isPresent()) { + return ugi.getShortUserName(); + } + + byte[] override = rpcCall.get().getRequestAttribute(userOverrideRequestAttributeKey); + if (override == null) { + return ugi.getShortUserName(); + } + return Bytes.toString(override); + } + /** * Returns the QuotaState requested. If the quota info is not in cache an empty one will be * returned and the quota request will be enqueued for the next cache refresh. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java index af6c51260fd5..35a1757115c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java @@ -771,6 +771,11 @@ public Map getRequestAttributes() { pair -> pair.getValue().toByteArray())); } + @Override + public byte[] getRequestAttribute(String key) { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 83f788ba1518..305f0e29e952 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -232,6 +232,11 @@ public Map getRequestAttributes() { return null; } + @Override + public byte[] getRequestAttribute(String key) { + return null; + } + @Override public int getRemotePort() { return 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java new file mode 100644 index 000000000000..75b3cc3ca84a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaUserOverride.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestQuotaUserOverride { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestQuotaUserOverride.class); + + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + private static final int NUM_SERVERS = 1; + private static final String CUSTOM_OVERRIDE_KEY = "foo"; + + private static final TableName TABLE_NAME = TableName.valueOf("TestQuotaUserOverride"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 1_000); + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 1); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 500); + TEST_UTIL.getConfiguration().set(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY, + CUSTOM_OVERRIDE_KEY); + TEST_UTIL.startMiniCluster(NUM_SERVERS); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + EnvironmentEdgeManager.reset(); + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testUserGlobalThrottleWithCustomOverride() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userOverrideWithQuota = User.getCurrent().getShortName() + "123"; + + // Add 6req/min limit + admin.setQuota(QuotaSettingsFactory.throttleUser(userOverrideWithQuota, + ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES)); + + Table tableWithThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null) + .setRequestAttribute(CUSTOM_OVERRIDE_KEY, Bytes.toBytes(userOverrideWithQuota)).build(); + Table tableWithoutThrottle = TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null) + .setRequestAttribute(QuotaCache.QUOTA_USER_REQUEST_ATTRIBUTE_OVERRIDE_KEY, + Bytes.toBytes(userOverrideWithQuota)) + .build(); + Table tableWithoutThrottle2 = + TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).build(); + + // warm things up + doPuts(10, FAMILY, QUALIFIER, tableWithThrottle); + doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle); + doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2); + + // should reject some requests + assertTrue(10 > doPuts(10, FAMILY, QUALIFIER, tableWithThrottle)); + // should accept all puts + assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle)); + // should accept all puts + assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2)); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userOverrideWithQuota)); + Thread.sleep(60_000); + assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithThrottle)); + assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle)); + assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tableWithoutThrottle2)); + } + +}