Skip to content

Commit

Permalink
HubSpot Backport: HBASE-28370 Default user quotas are refreshing too …
Browse files Browse the repository at this point in the history
…frequently (apache#5686) (#83)

Signed-off-by: Bryan Beaudreault <[email protected]>
  • Loading branch information
rmdmattingly authored Mar 25, 2024
1 parent bfa5f56 commit ca4f4bf
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class QuotaCache implements Stoppable {

// for testing purpose only, enforce the cache to be always refreshed
static boolean TEST_FORCE_REFRESH = false;
// for testing purpose only, block cache refreshes to reliably verify state
static boolean TEST_BLOCK_REFRESH = false;

private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache =
new ConcurrentHashMap<>();
Expand Down Expand Up @@ -140,7 +142,7 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()),
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration(), 0L),
this::triggerCacheRefresh);
}

Expand Down Expand Up @@ -242,6 +244,14 @@ public QuotaRefresherChore(final int period, final Stoppable stoppable) {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "GC_UNRELATED_TYPES",
justification = "I do not understand why the complaints, it looks good to me -- FIX")
protected void chore() {
while (TEST_BLOCK_REFRESH) {
LOG.info("TEST_BLOCK_REFRESH=true, so blocking QuotaCache refresh until it is false");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Prefetch online tables/namespaces
for (TableName table : ((HRegionServer) QuotaCache.this.rsServices).getOnlineTables()) {
if (table.isSystemTable()) continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection conne
String user = getUserFromRowKey(key);

if (results[i].isEmpty()) {
userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration()));
userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration(), nowTs));
continue;
}

Expand Down Expand Up @@ -373,7 +373,7 @@ public void visitUserQuotas(String userName, Quotas quotas) {
return userQuotas;
}

protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf, long nowTs) {
QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();

buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
Expand All @@ -389,7 +389,7 @@ protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
.ifPresent(throttleBuilder::setWriteSize);

UserQuotaState state = new UserQuotaState();
UserQuotaState state = new UserQuotaState(nowTs);
QuotaProtos.Quotas defaultQuotas =
QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
state.setQuotas(defaultQuotas);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestQuotaCache {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestQuotaCache.class);

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final int REFRESH_TIME = 30_000;

@After
public void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1000);

TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
}

@Test
public void testDefaultUserRefreshFrequency() throws Exception {
QuotaCache.TEST_BLOCK_REFRESH = true;

QuotaCache quotaCache =
ThrottleQuotaTestUtil.getQuotaCaches(TEST_UTIL).stream().findAny().get();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
assertEquals(userQuotaState.getLastUpdate(), 0);

QuotaCache.TEST_BLOCK_REFRESH = false;
// new user should have refreshed immediately
TEST_UTIL.waitFor(5_000, () -> userQuotaState.getLastUpdate() != 0);
long lastUpdate = userQuotaState.getLastUpdate();

// refresh should not apply to recently refreshed quota
quotaCache.triggerCacheRefresh();
Thread.sleep(250);
long newLastUpdate = userQuotaState.getLastUpdate();
assertEquals(lastUpdate, newLastUpdate);

quotaCache.triggerCacheRefresh();
waitMinuteQuota();
// should refresh after time has passed
TEST_UTIL.waitFor(5_000, () -> lastUpdate != userQuotaState.getLastUpdate());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
Expand Down Expand Up @@ -282,6 +284,16 @@ public String explainFailure() throws Exception {
}
}

static Set<QuotaCache> getQuotaCaches(HBaseTestingUtility testUtil) {
Set<QuotaCache> quotaCaches = new HashSet<>();
for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
RegionServerRpcQuotaManager quotaManager =
rst.getRegionServer().getRegionServerRpcQuotaManager();
quotaCaches.add(quotaManager.getQuotaCache());
}
return quotaCaches;
}

static void waitMinuteQuota() {
envEdge.incValue(70000);
}
Expand Down

0 comments on commit ca4f4bf

Please sign in to comment.