Skip to content

Commit

Permalink
Make number of DynamicFilterService threads configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Sep 30, 2020
1 parent 85205d0 commit e042c5f
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DynamicFilterConfig
{
private boolean enableDynamicFiltering = true;
private boolean enableLargeDynamicFilters;
private int serviceThreadCount = 2;

private int smallBroadcastMaxDistinctValuesPerDriver = 100;
private DataSize smallBroadcastMaxSizePerDriver = DataSize.of(10, KILOBYTE);
Expand Down Expand Up @@ -75,6 +76,19 @@ public DynamicFilterConfig setEnableLargeDynamicFilters(boolean enableLargeDynam
return this;
}

@Min(1)
public int getServiceThreadCount()
{
return serviceThreadCount;
}

@Config("dynamic-filtering.service-thread-count")
public DynamicFilterConfig setServiceThreadCount(int serviceThreadCount)
{
this.serviceThreadCount = serviceThreadCount;
return this;
}

@Min(0)
public int getSmallBroadcastMaxDistinctValuesPerDriver()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import io.prestosql.Session;
import io.prestosql.execution.DynamicFilterConfig;
import io.prestosql.execution.SqlQueryExecution;
import io.prestosql.execution.StageId;
import io.prestosql.execution.TaskId;
Expand Down Expand Up @@ -78,17 +80,18 @@
import static io.prestosql.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.prestosql.util.MorePredicates.isInstanceOfAny;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newFixedThreadPool;

@ThreadSafe
public class DynamicFilterService
{
private final ExecutorService executor;
private final Map<QueryId, DynamicFilterContext> dynamicFilterContexts = new ConcurrentHashMap<>();

public DynamicFilterService()
@Inject
public DynamicFilterService(DynamicFilterConfig dynamicFilterConfig)
{
this(newSingleThreadExecutor(daemonThreadsNamed("DynamicFilterService")));
this(newFixedThreadPool(dynamicFilterConfig.getServiceThreadCount(), daemonThreadsNamed("DynamicFilterService")));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(DynamicFilterConfig.class)
.setEnableDynamicFiltering(true)
.setEnableLargeDynamicFilters(false)
.setServiceThreadCount(2)
.setSmallBroadcastMaxDistinctValuesPerDriver(100)
.setSmallBroadcastMaxSizePerDriver(DataSize.of(10, KILOBYTE))
.setSmallBroadcastRangeRowLimitPerDriver(0)
Expand All @@ -52,6 +53,7 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("enable-dynamic-filtering", "false")
.put("enable-large-dynamic-filters", "true")
.put("dynamic-filtering.service-thread-count", "4")
.put("dynamic-filtering.small-broadcast.max-distinct-values-per-driver", "256")
.put("dynamic-filtering.small-broadcast.max-size-per-driver", "64kB")
.put("dynamic-filtering.small-broadcast.range-row-limit-per-driver", "10000")
Expand All @@ -69,6 +71,7 @@ public void testExplicitPropertyMappings()
DynamicFilterConfig expected = new DynamicFilterConfig()
.setEnableDynamicFiltering(false)
.setEnableLargeDynamicFilters(true)
.setServiceThreadCount(4)
.setSmallBroadcastMaxDistinctValuesPerDriver(256)
.setSmallBroadcastMaxSizePerDriver(DataSize.of(64, KILOBYTE))
.setSmallBroadcastRangeRowLimitPerDriver(10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private void testFinalStageInfoInternal()
nodeTaskMap,
executor,
new NoOpFailureDetector(),
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
new SplitSchedulerStats());
stage.setOutputBuffers(createInitialEmptyOutputBuffers(ARBITRARY));

Expand Down Expand Up @@ -174,7 +174,7 @@ public void testIsAnyTaskBlocked()
nodeTaskMap,
executor,
new NoOpFailureDetector(),
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
new SplitSchedulerStats());
stage.setOutputBuffers(createInitialEmptyOutputBuffers(ARBITRARY));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.prestosql.client.NodeVersion;
import io.prestosql.connector.CatalogName;
import io.prestosql.cost.StatsAndCosts;
import io.prestosql.execution.DynamicFilterConfig;
import io.prestosql.execution.MockRemoteTaskFactory;
import io.prestosql.execution.MockRemoteTaskFactory.MockRemoteTask;
import io.prestosql.execution.NodeTaskMap;
Expand Down Expand Up @@ -330,7 +331,7 @@ public void testNoNodes()
Iterables.getOnlyElement(plan.getSplitSources().values()),
new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks),
2,
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
() -> false);
scheduler.schedule();
}).hasErrorCode(NO_NODES_AVAILABLE);
Expand Down Expand Up @@ -404,7 +405,7 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
Iterables.getOnlyElement(plan.getSplitSources().values()),
new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks),
500,
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
() -> false);

// the queues of 3 running nodes should be full
Expand Down Expand Up @@ -443,7 +444,7 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
Iterables.getOnlyElement(plan.getSplitSources().values()),
new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(Optional.of(CONNECTOR_ID)), stage::getAllTasks),
400,
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
() -> true);

// the queues of 3 running nodes should be full
Expand All @@ -467,7 +468,7 @@ public void testDynamicFiltersUnblockedOnBlockedBuildSource()
NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
SqlStageExecution stage = createSqlStageExecution(plan, nodeTaskMap);
NodeScheduler nodeScheduler = new NodeScheduler(new UniformNodeSelectorFactory(nodeManager, new NodeSchedulerConfig().setIncludeCoordinator(false), nodeTaskMap));
DynamicFilterService dynamicFilterService = new DynamicFilterService();
DynamicFilterService dynamicFilterService = new DynamicFilterService(new DynamicFilterConfig());
dynamicFilterService.registerQuery(
QUERY_ID,
ImmutableSet.of(DYNAMIC_FILTER_ID),
Expand Down Expand Up @@ -542,7 +543,7 @@ private static StageScheduler getSourcePartitionedScheduler(
splitSource,
placementPolicy,
splitBatchSize,
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
() -> false);
}

Expand Down Expand Up @@ -640,7 +641,7 @@ private SqlStageExecution createSqlStageExecution(StageExecutionPlan tableScanPl
nodeTaskMap,
queryExecutor,
new NoOpFailureDetector(),
new DynamicFilterService(),
new DynamicFilterService(new DynamicFilterConfig()),
new SplitSchedulerStats());

stage.setOutputBuffers(createInitialEmptyOutputBuffers(PARTITIONED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.prestosql.block.BlockJsonSerde;
import io.prestosql.client.NodeVersion;
import io.prestosql.connector.CatalogName;
import io.prestosql.execution.DynamicFilterConfig;
import io.prestosql.execution.DynamicFiltersCollector.VersionedDynamicFilterDomains;
import io.prestosql.execution.Lifespan;
import io.prestosql.execution.NodeTaskMap;
Expand Down Expand Up @@ -307,7 +308,7 @@ private RemoteTask createRemoteTask(HttpRemoteTaskFactory httpRemoteTaskFactory)

private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource)
{
return createHttpRemoteTaskFactory(testingTaskResource, new DynamicFilterService());
return createHttpRemoteTaskFactory(testingTaskResource, new DynamicFilterService(new DynamicFilterConfig()));
}

private static HttpRemoteTaskFactory createHttpRemoteTaskFactory(TestingTaskResource testingTaskResource, DynamicFilterService dynamicFilterService)
Expand Down

0 comments on commit e042c5f

Please sign in to comment.