From e042c5f6928f5724db8eac05833394e13f560478 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Tue, 29 Sep 2020 13:02:03 +0200 Subject: [PATCH] Make number of DynamicFilterService threads configurable --- .../prestosql/execution/DynamicFilterConfig.java | 14 ++++++++++++++ .../io/prestosql/server/DynamicFilterService.java | 9 ++++++--- .../execution/TestDynamicFilterConfig.java | 3 +++ .../prestosql/execution/TestSqlStageExecution.java | 4 ++-- .../scheduler/TestSourcePartitionedScheduler.java | 13 +++++++------ .../server/remotetask/TestHttpRemoteTask.java | 3 ++- 6 files changed, 34 insertions(+), 12 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/execution/DynamicFilterConfig.java b/presto-main/src/main/java/io/prestosql/execution/DynamicFilterConfig.java index 2d29b8d43d54f..1e412959034d4 100644 --- a/presto-main/src/main/java/io/prestosql/execution/DynamicFilterConfig.java +++ b/presto-main/src/main/java/io/prestosql/execution/DynamicFilterConfig.java @@ -35,6 +35,7 @@ public class DynamicFilterConfig { private boolean enableDynamicFiltering = true; private boolean enableLargeDynamicFilters; + private int serviceThreadCount = 2; private int smallBroadcastMaxDistinctValuesPerDriver = 100; private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(10, KILOBYTE); @@ -75,6 +76,19 @@ public DynamicFilterConfig setEnableLargeDynamicFilters(boolean enableLargeDynam return this; } + @Min(1) + public int getServiceThreadCount() + { + return serviceThreadCount; + } + + @Config("dynamic-filtering.service-thread-count") + public DynamicFilterConfig setServiceThreadCount(int serviceThreadCount) + { + this.serviceThreadCount = serviceThreadCount; + return this; + } + @Min(0) public int getSmallBroadcastMaxDistinctValuesPerDriver() { diff --git a/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java b/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java index 11bae54625278..fdb7218c572bf 100644 --- a/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java +++ b/presto-main/src/main/java/io/prestosql/server/DynamicFilterService.java @@ -21,7 +21,9 @@ import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.inject.Inject; import io.prestosql.Session; +import io.prestosql.execution.DynamicFilterConfig; import io.prestosql.execution.SqlQueryExecution; import io.prestosql.execution.StageId; import io.prestosql.execution.TaskId; @@ -78,7 +80,7 @@ import static io.prestosql.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static io.prestosql.util.MorePredicates.isInstanceOfAny; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.Executors.newFixedThreadPool; @ThreadSafe public class DynamicFilterService @@ -86,9 +88,10 @@ public class DynamicFilterService private final ExecutorService executor; private final Map dynamicFilterContexts = new ConcurrentHashMap<>(); - public DynamicFilterService() + @Inject + public DynamicFilterService(DynamicFilterConfig dynamicFilterConfig) { - this(newSingleThreadExecutor(daemonThreadsNamed("DynamicFilterService"))); + this(newFixedThreadPool(dynamicFilterConfig.getServiceThreadCount(), daemonThreadsNamed("DynamicFilterService"))); } @VisibleForTesting diff --git a/presto-main/src/test/java/io/prestosql/execution/TestDynamicFilterConfig.java b/presto-main/src/test/java/io/prestosql/execution/TestDynamicFilterConfig.java index 0b279a6dff668..66cdef16de173 100644 --- a/presto-main/src/test/java/io/prestosql/execution/TestDynamicFilterConfig.java +++ b/presto-main/src/test/java/io/prestosql/execution/TestDynamicFilterConfig.java @@ -32,6 +32,7 @@ public void testDefaults() assertRecordedDefaults(recordDefaults(DynamicFilterConfig.class) .setEnableDynamicFiltering(true) .setEnableLargeDynamicFilters(false) + .setServiceThreadCount(2) .setSmallBroadcastMaxDistinctValuesPerDriver(100) .setSmallBroadcastMaxSizePerDriver(DataSize.of(10, KILOBYTE)) .setSmallBroadcastRangeRowLimitPerDriver(0) @@ -52,6 +53,7 @@ public void testExplicitPropertyMappings() Map properties = new ImmutableMap.Builder() .put("enable-dynamic-filtering", "false") .put("enable-large-dynamic-filters", "true") + .put("dynamic-filtering.service-thread-count", "4") .put("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "256") .put("dynamic-filtering.small-broadcast.max-size-per-driver", "64kB") .put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "10000") @@ -69,6 +71,7 @@ public void testExplicitPropertyMappings() DynamicFilterConfig expected = new DynamicFilterConfig() .setEnableDynamicFiltering(false) .setEnableLargeDynamicFilters(true) + .setServiceThreadCount(4) .setSmallBroadcastMaxDistinctValuesPerDriver(256) .setSmallBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE)) .setSmallBroadcastRangeRowLimitPerDriver(10000) diff --git a/presto-main/src/test/java/io/prestosql/execution/TestSqlStageExecution.java b/presto-main/src/test/java/io/prestosql/execution/TestSqlStageExecution.java index 135524eb2f2ca..b3eda32ae72c2 100644 --- a/presto-main/src/test/java/io/prestosql/execution/TestSqlStageExecution.java +++ b/presto-main/src/test/java/io/prestosql/execution/TestSqlStageExecution.java @@ -111,7 +111,7 @@ private void testFinalStageInfoInternal() nodeTaskMap, executor, new NoOpFailureDetector(), - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), new SplitSchedulerStats()); stage.setOutputBuffers(createInitialEmptyOutputBuffers(ARBITRARY)); @@ -174,7 +174,7 @@ public void testIsAnyTaskBlocked() nodeTaskMap, executor, new NoOpFailureDetector(), - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), new SplitSchedulerStats()); stage.setOutputBuffers(createInitialEmptyOutputBuffers(ARBITRARY)); diff --git a/presto-main/src/test/java/io/prestosql/execution/scheduler/TestSourcePartitionedScheduler.java b/presto-main/src/test/java/io/prestosql/execution/scheduler/TestSourcePartitionedScheduler.java index 176befac5304a..ae621ce936450 100644 --- a/presto-main/src/test/java/io/prestosql/execution/scheduler/TestSourcePartitionedScheduler.java +++ b/presto-main/src/test/java/io/prestosql/execution/scheduler/TestSourcePartitionedScheduler.java @@ -21,6 +21,7 @@ import io.prestosql.client.NodeVersion; import io.prestosql.connector.CatalogName; import io.prestosql.cost.StatsAndCosts; +import io.prestosql.execution.DynamicFilterConfig; import io.prestosql.execution.MockRemoteTaskFactory; import io.prestosql.execution.MockRemoteTaskFactory.MockRemoteTask; import io.prestosql.execution.NodeTaskMap; @@ -330,7 +331,7 @@ public void testNoNodes() Iterables.getOnlyElement(plan.getSplitSources().values()), new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks), 2, - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), () -> false); scheduler.schedule(); }).hasErrorCode(NO_NODES_AVAILABLE); @@ -404,7 +405,7 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized() Iterables.getOnlyElement(plan.getSplitSources().values()), new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks), 500, - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), () -> false); // the queues of 3 running nodes should be full @@ -443,7 +444,7 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized() Iterables.getOnlyElement(plan.getSplitSources().values()), new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks), 400, - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), () -> true); // the queues of 3 running nodes should be full @@ -467,7 +468,7 @@ public void testDynamicFiltersUnblockedOnBlockedBuildSource() NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService); SqlStageExecution stage = createSqlStageExecution(plan, nodeTaskMap); NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap)); - DynamicFilterService dynamicFilterService = new DynamicFilterService(); + DynamicFilterService dynamicFilterService = new DynamicFilterService(new DynamicFilterConfig()); dynamicFilterService.registerQuery( QUERY_ID, ImmutableSet.of(DYNAMIC_FILTER_ID), @@ -542,7 +543,7 @@ private static StageScheduler getSourcePartitionedScheduler( splitSource, placementPolicy, splitBatchSize, - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), () -> false); } @@ -640,7 +641,7 @@ private SqlStageExecution createSqlStageExecution(StageExecutionPlan tableScanPl nodeTaskMap, queryExecutor, new NoOpFailureDetector(), - new DynamicFilterService(), + new DynamicFilterService(new DynamicFilterConfig()), new SplitSchedulerStats()); stage.setOutputBuffers(createInitialEmptyOutputBuffers(PARTITIONED) diff --git a/presto-main/src/test/java/io/prestosql/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/io/prestosql/server/remotetask/TestHttpRemoteTask.java index 999eb3e2450fd..dea11620597d0 100644 --- a/presto-main/src/test/java/io/prestosql/server/remotetask/TestHttpRemoteTask.java +++ b/presto-main/src/test/java/io/prestosql/server/remotetask/TestHttpRemoteTask.java @@ -31,6 +31,7 @@ import io.prestosql.block.BlockJsonSerde; import io.prestosql.client.NodeVersion; import io.prestosql.connector.CatalogName; +import io.prestosql.execution.DynamicFilterConfig; import io.prestosql.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains; import io.prestosql.execution.Lifespan; import io.prestosql.execution.NodeTaskMap; @@ -307,7 +308,7 @@ private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory) private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource) { - return createHttpRemoteTaskFactory(testingTaskResource, new DynamicFilterService()); + return createHttpRemoteTaskFactory(testingTaskResource, new DynamicFilterService(new DynamicFilterConfig())); } private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource, DynamicFilterService dynamicFilterService)