Skip to content

Commit

Permalink
Consider user groups in db resource groups manager
Browse files Browse the repository at this point in the history
  • Loading branch information
posulliv authored and hashhar committed Feb 16, 2022
1 parent d2f91e0 commit 53ad891
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private synchronized Map.Entry<ManagerSpec, Map<ResourceGroupIdTemplate, Resourc
.map(selectorRecord ->
new SelectorSpec(
selectorRecord.getUserRegex(),
Optional.empty(),
selectorRecord.getUserGroupRegex(),
selectorRecord.getSourceRegex(),
selectorRecord.getQueryType(),
selectorRecord.getClientTags(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface ResourceGroupsDao
@UseRowMapper(ResourceGroupSpecBuilder.Mapper.class)
List<ResourceGroupSpecBuilder> getResourceGroups(@Bind("environment") String environment);

@SqlQuery("SELECT S.resource_group_id, S.priority, S.user_regex, S.source_regex, S.query_type, S.client_tags, S.selector_resource_estimate\n" +
@SqlQuery("SELECT S.resource_group_id, S.priority, S.user_regex, S.source_regex, S.query_type, S.client_tags, S.selector_resource_estimate, S.user_group_regex\n" +
"FROM selectors S\n" +
"JOIN resource_groups R ON (S.resource_group_id = R.resource_group_id)\n" +
"WHERE R.environment = :environment\n" +
Expand All @@ -72,6 +72,7 @@ public interface ResourceGroupsDao
" resource_group_id BIGINT NOT NULL,\n" +
" priority BIGINT NOT NULL,\n" +
" user_regex VARCHAR(512),\n" +
" user_group_regex VARCHAR(512),\n" +
" source_regex VARCHAR(512),\n" +
" query_type VARCHAR(512),\n" +
" client_tags VARCHAR(512),\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class SelectorRecord
private final long resourceGroupId;
private final long priority;
private final Optional<Pattern> userRegex;
private final Optional<Pattern> userGroupRegex;
private final Optional<Pattern> sourceRegex;
private final Optional<String> queryType;
private final Optional<List<String>> clientTags;
Expand All @@ -43,6 +44,7 @@ public SelectorRecord(
long resourceGroupId,
long priority,
Optional<Pattern> userRegex,
Optional<Pattern> userGroupRegex,
Optional<Pattern> sourceRegex,
Optional<String> queryType,
Optional<List<String>> clientTags,
Expand All @@ -51,6 +53,7 @@ public SelectorRecord(
this.resourceGroupId = resourceGroupId;
this.priority = priority;
this.userRegex = requireNonNull(userRegex, "userRegex is null");
this.userGroupRegex = requireNonNull(userGroupRegex, "userGroupRegex is null");
this.sourceRegex = requireNonNull(sourceRegex, "sourceRegex is null");
this.queryType = requireNonNull(queryType, "queryType is null");
this.clientTags = requireNonNull(clientTags, "clientTags is null").map(ImmutableList::copyOf);
Expand All @@ -72,6 +75,11 @@ public Optional<Pattern> getUserRegex()
return userRegex;
}

public Optional<Pattern> getUserGroupRegex()
{
return userGroupRegex;
}

public Optional<Pattern> getSourceRegex()
{
return sourceRegex;
Expand Down Expand Up @@ -106,6 +114,7 @@ public SelectorRecord map(ResultSet resultSet, StatementContext context)
resultSet.getLong("resource_group_id"),
resultSet.getLong("priority"),
Optional.ofNullable(resultSet.getString("user_regex")).map(Pattern::compile),
Optional.ofNullable(resultSet.getString("user_group_regex")).map(Pattern::compile),
Optional.ofNullable(resultSet.getString("source_regex")).map(Pattern::compile),
Optional.ofNullable(resultSet.getString("query_type")),
Optional.ofNullable(resultSet.getString("client_tags")).map(LIST_STRING_CODEC::fromJson),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE selectors ADD COLUMN user_group_regex VARCHAR(2048);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE selectors ADD user_group_regex VARCHAR(2048);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE selectors ADD COLUMN user_group_regex VARCHAR(2048);
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ void updateResourceGroup(
void deleteResourceGroup(@Bind("resource_group_id") long resourceGroupId);

@SqlUpdate("INSERT INTO selectors\n" +
"(resource_group_id, priority, user_regex, source_regex, query_type, client_tags, selector_resource_estimate)\n" +
"VALUES (:resource_group_id, :priority, :user_regex, :source_regex, :query_type, :client_tags, :selector_resource_estimate)")
"(resource_group_id, priority, user_regex, user_group_regex, source_regex, query_type, client_tags, selector_resource_estimate)\n" +
"VALUES (:resource_group_id, :priority, :user_regex, :user_group_regex, :source_regex, :query_type, :client_tags, :selector_resource_estimate)")
void insertSelector(
@Bind("resource_group_id") long resourceGroupId,
@Bind("priority") long priority,
@Bind("user_regex") String userRegex,
@Bind("user_group_regex") String userGroupRegex,
@Bind("source_regex") String sourceRegex,
@Bind("query_type") String queryType,
@Bind("client_tags") String clientTags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.trino.spi.resourcegroups.SchedulingPolicy.WEIGHTED;
import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -74,8 +75,8 @@ public void testEnvironments()
// two resource groups are the same except the group for the prod environment has a larger softMemoryLimit
dao.insertResourceGroup(1, "prod_global", "10MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, prodEnvironment);
dao.insertResourceGroup(2, "dev_global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, devEnvironment);
dao.insertSelector(1, 1, ".*prod_user.*", null, null, null, null);
dao.insertSelector(2, 2, ".*dev_user.*", null, null, null, null);
dao.insertSelector(1, 1, ".*prod_user.*", null, null, null, null, null);
dao.insertSelector(2, 2, ".*dev_user.*", null, null, null, null, null);

// check the prod configuration
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), prodEnvironment);
Expand Down Expand Up @@ -112,7 +113,7 @@ public void testConfiguration()
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, ENVIRONMENT);
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, 1L, ENVIRONMENT);
dao.insertSelector(2, 1, null, null, null, null, null);
dao.insertSelector(2, 1, null, null, null, null, null, null);
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT);
AtomicBoolean exported = new AtomicBoolean();
InternalResourceGroup global = new InternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor());
Expand All @@ -138,7 +139,7 @@ public void testDuplicateRoots()
assertTrue(ex.getCause() instanceof org.h2.jdbc.JdbcException);
assertTrue(ex.getCause().getMessage().startsWith("Unique index or primary key violation"));
});
dao.insertSelector(1, 1, null, null, null, null, null);
dao.insertSelector(1, 1, null, null, null, null, null, null);
}

@Test
Expand All @@ -156,7 +157,7 @@ public void testDuplicateGroups()
assertTrue(ex.getCause() instanceof org.h2.jdbc.JdbcException);
assertTrue(ex.getCause().getMessage().startsWith("Unique index or primary key violation"));
});
dao.insertSelector(2, 2, null, null, null, null, null);
dao.insertSelector(2, 2, null, null, null, null, null, null);
}

@Test
Expand All @@ -170,7 +171,7 @@ public void testMissing()
dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, ENVIRONMENT);
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, 1L, ENVIRONMENT);
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertSelector(2, 1, null, null, null, null, null);
dao.insertSelector(2, 1, null, null, null, null, null, null);
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT);
InternalResourceGroup missing = new InternalResourceGroup("missing", (group, export) -> {}, directExecutor());

Expand All @@ -190,7 +191,7 @@ public void testReconfig()
dao.createSelectorsTable();
dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, ENVIRONMENT);
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, 1L, ENVIRONMENT);
dao.insertSelector(2, 1, null, null, null, null, null);
dao.insertSelector(2, 1, null, null, null, null, null, null);
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT);
manager.start();
Expand Down Expand Up @@ -228,7 +229,7 @@ public void testExactMatchSelector()
dao.createExactMatchSelectorsTable();
dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, ENVIRONMENT);
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, 1L, ENVIRONMENT);
dao.insertSelector(2, 1, null, null, null, null, null);
dao.insertSelector(2, 1, null, null, null, null, null, null);
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
DbResourceGroupConfig config = new DbResourceGroupConfig();
config.setExactMatchSelectorEnabled(true);
Expand Down Expand Up @@ -267,7 +268,7 @@ public void testSelectorPriority()
for (int i = 0; i < numberOfUsers; i++) {
int priority = randomPriorities[i];
String user = String.valueOf(priority);
dao.insertSelector(1, priority, user, ".*", null, null, null);
dao.insertSelector(1, priority, user, null, ".*", null, null, null);
expectedUsers.add(user);
}

Expand Down Expand Up @@ -335,6 +336,53 @@ public void testRefreshInterval()
manager.destroy();
}

@Test
public void testMatchByUserGroups()
{
H2DaoProvider daoProvider = setup("selectors");
H2ResourceGroupsDao dao = daoProvider.get();
dao.createResourceGroupsGlobalPropertiesTable();
dao.createResourceGroupsTable();
dao.createSelectorsTable();
dao.insertResourceGroup(1, "group", "100%", 100, 100, 100, null, null, null, null, null, null, ENVIRONMENT);
dao.insertSelector(1, 1, null, "first matching|second matching", null, null, null, null);

DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager(
(poolId, listener) -> {},
new DbResourceGroupConfig().setMaxRefreshInterval(new io.airlift.units.Duration(1, MILLISECONDS)),
daoProvider.get(),
ENVIRONMENT);

assertThat(manager.match(userGroupsSelectionCriteria("not matching"))).isEmpty();
assertThat(manager.match(userGroupsSelectionCriteria("first matching")))
.map(SelectionContext::getContext)
.isEqualTo(Optional.of(new ResourceGroupIdTemplate("group")));
}

@Test
public void testMatchByUsersAndGroups()
{
H2DaoProvider daoProvider = setup("selectors");
H2ResourceGroupsDao dao = daoProvider.get();
dao.createResourceGroupsGlobalPropertiesTable();
dao.createResourceGroupsTable();
dao.createSelectorsTable();
dao.insertResourceGroup(1, "group", "100%", 100, 100, 100, null, null, null, null, null, null, ENVIRONMENT);
dao.insertSelector(1, 1, "Matching user", "Matching group", null, null, null, null);

DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager(
(poolId, listener) -> {},
new DbResourceGroupConfig().setMaxRefreshInterval(new io.airlift.units.Duration(1, MILLISECONDS)),
daoProvider.get(),
ENVIRONMENT);

assertThat(manager.match(userAndUserGroupsSelectionCriteria("Matching user", "Not matching group"))).isEmpty();
assertThat(manager.match(userAndUserGroupsSelectionCriteria("Not matching user", "Matching group"))).isEmpty();
assertThat(manager.match(userAndUserGroupsSelectionCriteria("Matching user", "Matching group")))
.map(SelectionContext::getContext)
.isEqualTo(Optional.of(new ResourceGroupIdTemplate("group")));
}

private static void assertEqualsResourceGroup(
InternalResourceGroup group,
String softMemoryLimit,
Expand All @@ -357,4 +405,23 @@ private static void assertEqualsResourceGroup(
assertEquals(group.getSoftCpuLimit(), softCpuLimit);
assertEquals(group.getHardCpuLimit(), hardCpuLimit);
}

private static SelectionCriteria userGroupsSelectionCriteria(String... groups)
{
return new SelectionCriteria(true, "test_user", ImmutableSet.copyOf(groups), Optional.empty(), ImmutableSet.of(), EMPTY_RESOURCE_ESTIMATES, Optional.empty());
}

private static SelectionCriteria userAndUserGroupsSelectionCriteria(String user, String group, String... groups)
{
return new SelectionCriteria(
true,
user,
ImmutableSet.<String>builder()
.add(group)
.add(groups).build(),
Optional.empty(),
ImmutableSet.of(),
EMPTY_RESOURCE_ESTIMATES,
Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private static void testSelectorInsert(H2ResourceGroupsDao dao, Map<Long, Select
2L,
1L,
Optional.of(Pattern.compile("ping_user")),
Optional.of(Pattern.compile("ping_user_group")),
Optional.of(Pattern.compile(".*")),
Optional.empty(),
Optional.empty(),
Expand All @@ -135,6 +136,7 @@ private static void testSelectorInsert(H2ResourceGroupsDao dao, Map<Long, Select
3L,
2L,
Optional.of(Pattern.compile("admin_user")),
Optional.of(Pattern.compile("admin_group")),
Optional.of(Pattern.compile(".*")),
Optional.of(EXPLAIN.name()),
Optional.of(ImmutableList.of("tag1", "tag2")),
Expand All @@ -147,16 +149,17 @@ private static void testSelectorInsert(H2ResourceGroupsDao dao, Map<Long, Select
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(SELECTOR_RESOURCE_ESTIMATE)));

dao.insertResourceGroup(1, "admin", "100%", 100, 100, 100, null, null, null, null, null, null, ENVIRONMENT);
dao.insertResourceGroup(2, "ping_query", "50%", 50, 50, 50, null, null, null, null, null, 1L, ENVIRONMENT);
dao.insertResourceGroup(3, "config", "50%", 50, 50, 50, null, null, null, null, null, 1L, ENVIRONMENT);
dao.insertResourceGroup(4, "config", "50%", 50, 50, 50, null, null, null, null, null, 1L, ENVIRONMENT);

dao.insertSelector(2, 1, "ping_user", ".*", null, null, null);
dao.insertSelector(3, 2, "admin_user", ".*", EXPLAIN.name(), LIST_STRING_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null);
dao.insertSelector(4, 0, null, null, null, null, SELECTOR_RESOURCE_ESTIMATE_JSON_CODEC.toJson(SELECTOR_RESOURCE_ESTIMATE));
dao.insertSelector(2, 1, "ping_user", null, ".*", null, null, null);
dao.insertSelector(3, 2, "admin_user", null, ".*", EXPLAIN.name(), LIST_STRING_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null);
dao.insertSelector(4, 0, null, null, null, null, null, SELECTOR_RESOURCE_ESTIMATE_JSON_CODEC.toJson(SELECTOR_RESOURCE_ESTIMATE));
List<SelectorRecord> records = dao.getSelectors(ENVIRONMENT);
compareSelectors(map, records);
}
Expand All @@ -168,6 +171,7 @@ private static void testSelectorUpdate(H2ResourceGroupsDao dao, Map<Long, Select
2,
1L,
Optional.of(Pattern.compile("ping.*")),
Optional.empty(),
Optional.of(Pattern.compile("ping_source")),
Optional.empty(),
Optional.of(ImmutableList.of("tag1")),
Expand All @@ -178,14 +182,15 @@ private static void testSelectorUpdate(H2ResourceGroupsDao dao, Map<Long, Select

private static void testSelectorUpdateNull(H2ResourceGroupsDao dao, Map<Long, SelectorRecord> map)
{
SelectorRecord updated = new SelectorRecord(2, 3L, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
SelectorRecord updated = new SelectorRecord(2, 3L, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
map.put(2L, updated);
dao.updateSelector(2, null, null, null, "ping.*", "ping_source", LIST_STRING_CODEC.toJson(ImmutableList.of("tag1")));
compareSelectors(map, dao.getSelectors(ENVIRONMENT));
updated = new SelectorRecord(
2,
2L,
Optional.of(Pattern.compile("ping.*")),
Optional.empty(),
Optional.of(Pattern.compile("ping_source")),
Optional.of(EXPLAIN.name()),
Optional.of(ImmutableList.of("tag1", "tag2")),
Expand All @@ -205,7 +210,7 @@ private static void testSelectorDelete(H2ResourceGroupsDao dao, Map<Long, Select
private static void testSelectorDeleteNull(H2ResourceGroupsDao dao, Map<Long, SelectorRecord> map)
{
dao.updateSelector(3, null, null, null, "admin_user", ".*", LIST_STRING_CODEC.toJson(ImmutableList.of("tag1", "tag2")));
SelectorRecord nullRegexes = new SelectorRecord(3L, 2L, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
SelectorRecord nullRegexes = new SelectorRecord(3L, 2L, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
map.put(3L, nullRegexes);
compareSelectors(map, dao.getSelectors(ENVIRONMENT));
dao.deleteSelector(3, null, null, null);
Expand All @@ -219,11 +224,12 @@ private static void testSelectorMultiDelete(H2ResourceGroupsDao dao, Map<Long, S
return;
}

dao.insertSelector(3, 3L, "user1", "pipeline", null, null, null);
dao.insertSelector(3, 3L, "user1", null, "pipeline", null, null, null);
map.put(3L, new SelectorRecord(
3L,
3L,
Optional.of(Pattern.compile("user1")),
Optional.empty(),
Optional.of(Pattern.compile("pipeline")),
Optional.empty(),
Optional.empty(),
Expand Down
Loading

0 comments on commit 53ad891

Please sign in to comment.