Insert cleanups #19147

Merged on Sep 25, 2023 (5 commits)

Changes from all commits touch TaskManagerConfig, SkewedPartitionRebalancer, the local execution planner, TestTaskManagerConfig, and two documentation pages (properties-task.md and properties-writer-scaling.md).

@@ -82,14 +82,14 @@ public class TaskManagerConfig
     // Set the value of default max writer count to the number of processors * 2 and cap it to 64. We can set this value
     // higher because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
     // use this property. Additionally, we have a mechanism to stop scaling if local memory utilization is high.
-    private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
+    private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount() * 2, 64);
     private int writerCount = 1;
     // Default value of partitioned task writer count should be above 1, otherwise it can create a plan
     // with a single gather exchange node on the coordinator due to a single available processor. Whereas,
     // on the worker nodes due to more available processors, the default value could be above 1. Therefore,
     // it can cause error due to config mismatch during execution. Additionally, cap it to 64 in order to
     // avoid small pages produced by local partitioning exchanges.
-    private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2;
+    private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);
     // Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
     // exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
     // more available processors, the default value could be above 1. Therefore, it can cause error due to config

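As a rough sanity check on the new defaults (illustrative only, not part of the change): assuming getAvailablePhysicalProcessorCount() reports the physical core count and nextPowerOfTwo rounds up to the nearest power of two, a hypothetical 24-core worker would get the values below.

```java
import static java.lang.Math.max;
import static java.lang.Math.min;

// Standalone sketch of the new TaskManagerConfig default computations.
// The core count and the local nextPowerOfTwo stand-in are assumptions for illustration.
public class WriterCountDefaults
{
    // Assumed behavior of the helper: round n up to the nearest power of two.
    static int nextPowerOfTwo(int n)
    {
        int highest = Integer.highestOneBit(n);
        return highest == n ? n : highest << 1;
    }

    public static void main(String[] args)
    {
        int cores = 24; // hypothetical physical core count

        // task.scale-writers.max-writer-count default: 2 * cores, capped at 64
        int scaleWritersMaxWriterCount = min(cores * 2, 64);

        // task.partitioned-writer-count default: nextPowerOfTwo(2 * cores), floored at 2, capped at 64
        int partitionedWriterCount = min(max(nextPowerOfTwo(cores * 2), 2), 64);

        System.out.println(scaleWritersMaxWriterCount); // 48
        System.out.println(partitionedWriterCount);     // 64
    }
}
```
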
@@ -153,15 +153,15 @@ public static PartitionFunction createPartitionFunction(
                 IntStream.range(0, bucketCount).toArray());
     }

-    public static int getMaxPartitionWritersBasedOnMemory(Session session)
+    public static int getMaxWritersBasedOnMemory(Session session)
     {
         return (int) ceil((double) getQueryMaxMemoryPerNode(session).toBytes() / getMaxMemoryPerPartitionWriter(session).toBytes());
     }

     public static int getScaleWritersMaxSkewedPartitions(Session session)
     {
         // Set the value of maxSkewedPartitions to scale to 60% of maximum number of writers possible per node.
-        return (int) (getMaxPartitionWritersBasedOnMemory(session) * 0.60);
+        return (int) (getMaxWritersBasedOnMemory(session) * 0.60);
     }

     public static int getTaskCount(PartitioningScheme partitioningScheme)

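To make the memory-based cap concrete, a minimal sketch with made-up sizes (the real code derives both limits from the session via getQueryMaxMemoryPerNode and getMaxMemoryPerPartitionWriter; here they are replaced by literal byte counts):

```java
import static java.lang.Math.ceil;

// Illustrative numbers only: how getMaxWritersBasedOnMemory and
// getScaleWritersMaxSkewedPartitions relate, following the formulas in the diff above.
public class MemoryBasedWriterCap
{
    public static void main(String[] args)
    {
        long queryMaxMemoryPerNodeBytes = 100L * 1024 * 1024 * 1024;     // assume 100 GB per node
        long maxMemoryPerPartitionWriterBytes = 2L * 1024 * 1024 * 1024; // assume 2 GB per writer

        // ceil(queryMaxMemoryPerNode / maxMemoryPerPartitionWriter)
        int maxWritersBasedOnMemory = (int) ceil((double) queryMaxMemoryPerNodeBytes / maxMemoryPerPartitionWriterBytes);

        // maxSkewedPartitions scales to 60% of the maximum writers possible per node
        int scaleWritersMaxSkewedPartitions = (int) (maxWritersBasedOnMemory * 0.60);

        System.out.println(maxWritersBasedOnMemory);         // 50
        System.out.println(scaleWritersMaxSkewedPartitions); // 30
    }
}
```
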
@@ -335,7 +335,7 @@
 import static io.trino.operator.join.NestedLoopJoinOperator.NestedLoopJoinOperatorFactory;
 import static io.trino.operator.output.SkewedPartitionRebalancer.checkCanScalePartitionsRemotely;
 import static io.trino.operator.output.SkewedPartitionRebalancer.createPartitionFunction;
-import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxPartitionWritersBasedOnMemory;
+import static io.trino.operator.output.SkewedPartitionRebalancer.getMaxWritersBasedOnMemory;
 import static io.trino.operator.output.SkewedPartitionRebalancer.getScaleWritersMaxSkewedPartitions;
 import static io.trino.operator.output.SkewedPartitionRebalancer.getTaskCount;
 import static io.trino.operator.window.pattern.PhysicalValuePointer.CLASSIFIER;

@@ -585,10 +585,7 @@ public LocalExecutionPlan plan(
         int taskCount = getTaskCount(partitioningScheme);
         if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), nodePartitioningManager)) {
             partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes);
-            // Consider memory while calculating the number of writers. This is to avoid creating too many task buckets.
-            int partitionedWriterCount = min(
-                    getTaskPartitionedWriterCount(taskContext.getSession()),
-                    previousPowerOfTwo(getMaxPartitionWritersBasedOnMemory(taskContext.getSession())));
+            int partitionedWriterCount = getPartitionedWriterCountBasedOnMemory(taskContext.getSession());
             // Keep the task bucket count to 50% of total local writers
             int taskBucketCount = (int) ceil(0.5 * partitionedWriterCount);
             skewedPartitionRebalancer = Optional.of(new SkewedPartitionRebalancer(

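For a hedged sense of how this branch sizes the rebalancer, a small standalone sketch with made-up inputs (a configured partitioned writer count of 32 and a memory-derived maximum of 50 writers); previousPowerOfTwo is a local stand-in for the helper referenced in the removed lines:

```java
import static java.lang.Math.ceil;
import static java.lang.Math.min;

// Illustrative only: deriving the local writer count and the task bucket count
// used to construct the SkewedPartitionRebalancer.
public class RebalancerSizing
{
    // Assumed behavior: largest power of two that is <= n.
    static int previousPowerOfTwo(int n)
    {
        return Integer.highestOneBit(n);
    }

    public static void main(String[] args)
    {
        int taskPartitionedWriterCount = 32; // hypothetical session/config value
        int maxWritersBasedOnMemory = 50;    // hypothetical memory-derived cap

        // What getPartitionedWriterCountBasedOnMemory boils down to:
        // cap the configured count by the previous power of two of the memory-based maximum.
        int partitionedWriterCount = min(taskPartitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));

        // Keep the task bucket count at 50% of the total local writers.
        int taskBucketCount = (int) ceil(0.5 * partitionedWriterCount);

        System.out.println(partitionedWriterCount); // 32
        System.out.println(taskBucketCount);        // 16
    }
}
```
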
@@ -3505,19 +3502,18 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalingOptions
             return 1;
         }

-        int maxWritersBasedOnMemory = getMaxPartitionWritersBasedOnMemory(session);
         if (partitioningScheme.isPresent()) {
-            // The default value of partitioned writer count is 32 which is high enough to use it
-            // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many
-            // small files since when scaling is disabled only single writer will handle a single partition.
-            int partitionedWriterCount = getTaskWriterCount(session);
+            // The default value of partitioned writer count is 2 * number_of_cores (capped to 64) which is high
+            // enough to use it for cases with or without scaling enabled. Additionally, it doesn't lead
+            // to too many small files when scaling is disabled because single partition will be written by
+            // a single writer only.
+            int partitionedWriterCount = getTaskPartitionedWriterCount(session);

(Inline review comment from a Member on the added line above: "worth testing ?")

             if (isLocalScaledWriterExchange(source)) {
                 partitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
                         .map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
                         .orElse(getTaskPartitionedWriterCount(session));
             }
-            // Consider memory while calculating writer count.
-            return min(partitionedWriterCount, previousPowerOfTwo(maxWritersBasedOnMemory));
+            return getPartitionedWriterCountBasedOnMemory(partitionedWriterCount, session);
         }

         int unpartitionedWriterCount = getTaskWriterCount(session);

@@ -3527,7 +3523,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalingOptions
                     .orElse(getTaskScaleWritersMaxWriterCount(session));
         }
         // Consider memory while calculating writer count.
-        return min(unpartitionedWriterCount, maxWritersBasedOnMemory);
+        return min(unpartitionedWriterCount, getMaxWritersBasedOnMemory(session));
     }

     private boolean isSingleGatheringExchange(PlanNode node)

@@ -4111,6 +4107,16 @@ private OperatorFactory createHashAggregationOperatorFactory(
         }
     }

+    private int getPartitionedWriterCountBasedOnMemory(Session session)
+    {
+        return getPartitionedWriterCountBasedOnMemory(getTaskPartitionedWriterCount(session), session);
+    }
+
+    private int getPartitionedWriterCountBasedOnMemory(int partitionedWriterCount, Session session)
+    {
+        return min(partitionedWriterCount, previousPowerOfTwo(getMaxWritersBasedOnMemory(session)));
+    }
+
     private static Optional<PartialAggregationController> createPartialAggregationController(Optional<DataSize> maxPartialAggregationMemorySize, AggregationNode.Step step, Session session)
     {
         return maxPartialAggregationMemorySize.isPresent() && step.isOutputPartial() && isAdaptivePartialAggregationEnabled(session) ?

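The unpartitioned branch of getWriterCount keeps the plain memory cap, without the power-of-two rounding applied to partitioned writes. A minimal sketch under assumed inputs, a connector-supplied per-task limit of 8, a session-level scale-writers maximum of 48, and a memory-based cap of 50:

```java
import java.util.Optional;

import static java.lang.Math.min;

// Illustrative only: how a connector-provided per-task limit, the session-level
// scale-writers maximum, and the memory-based cap combine for unpartitioned writes.
public class UnpartitionedWriterCount
{
    public static void main(String[] args)
    {
        int taskScaleWritersMaxWriterCount = 48;                        // hypothetical session value
        Optional<Integer> perTaskMaxScaledWriterCount = Optional.of(8); // hypothetical connector option
        int maxWritersBasedOnMemory = 50;                               // hypothetical memory-derived cap

        // The connector limit wins when present, otherwise fall back to the session maximum.
        int unpartitionedWriterCount = perTaskMaxScaledWriterCount
                .map(writerCount -> min(writerCount, taskScaleWritersMaxWriterCount))
                .orElse(taskScaleWritersMaxWriterCount);

        // Memory still caps the result, mirroring min(..., getMaxWritersBasedOnMemory(session)) in the diff.
        System.out.println(min(unpartitionedWriterCount, maxWritersBasedOnMemory)); // 8
    }
}
```
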
@@ -34,7 +34,8 @@
 public class TestTaskManagerConfig
 {
     private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
-    private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32) * 2;
+    private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount() * 2, 64);
+    private static final int DEFAULT_PARTITIONED_WRITER_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);

     @Test
     public void testDefaults()

@@ -66,7 +67,7 @@ public void testDefaults()
                 .setScaleWritersEnabled(true)
                 .setScaleWritersMaxWriterCount(DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT)
                 .setWriterCount(1)
-                .setPartitionedWriterCount(DEFAULT_PROCESSOR_COUNT * 2)
+                .setPartitionedWriterCount(DEFAULT_PARTITIONED_WRITER_COUNT)
                 .setTaskConcurrency(DEFAULT_PROCESSOR_COUNT)
                 .setHttpResponseThreads(100)
                 .setHttpTimeoutThreads(3)

@@ -84,7 +85,8 @@ public void testDefaults()
     public void testExplicitPropertyMappings()
     {
         int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32;
-        int maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32;
+        int scaleWritersMaxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32;
+        int partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT == 32 ? 16 : 32;
         Map<String, String> properties = ImmutableMap.<String, String>builder()
                 .put("experimental.thread-per-driver-scheduler-enabled", "true")
                 .put("task.initial-splits-per-node", "1")

@@ -110,9 +112,9 @@ public void testExplicitPropertyMappings()
                 .put("driver.max-page-partitioning-buffer-size", "40MB")
                 .put("driver.page-partitioning-buffer-pool-size", "0")
                 .put("task.scale-writers.enabled", "false")
-                .put("task.scale-writers.max-writer-count", Integer.toString(maxWriterCount))
+                .put("task.scale-writers.max-writer-count", Integer.toString(scaleWritersMaxWriterCount))
                 .put("task.writer-count", "4")
-                .put("task.partitioned-writer-count", Integer.toString(processorCount))
+                .put("task.partitioned-writer-count", Integer.toString(partitionedWriterCount))
                 .put("task.concurrency", Integer.toString(processorCount))
                 .put("task.http-response-threads", "4")
                 .put("task.http-timeout-threads", "10")

@@ -151,9 +153,9 @@ public void testExplicitPropertyMappings()
                 .setMaxPagePartitioningBufferSize(DataSize.of(40, Unit.MEGABYTE))
                 .setPagePartitioningBufferPoolSize(0)
                 .setScaleWritersEnabled(false)
-                .setScaleWritersMaxWriterCount(maxWriterCount)
+                .setScaleWritersMaxWriterCount(scaleWritersMaxWriterCount)
                 .setWriterCount(4)
-                .setPartitionedWriterCount(processorCount)
+                .setPartitionedWriterCount(partitionedWriterCount)
                 .setTaskConcurrency(processorCount)
                 .setHttpResponseThreads(4)
                 .setHttpTimeoutThreads(10)

docs/src/main/sphinx/admin/properties-task.md (2 changes: 1 addition & 1 deletion)

@@ -131,7 +131,7 @@ allocates a certain amount of memory for buffering.

 - **Type:** {ref}`prop-type-integer`
 - **Restrictions:** Must be a power of two
-- **Default value:** The number of physical CPUs of the node, with a minimum value of 2 and a maximum of 32
+- **Default value:** The number of physical CPUs of the node, with a minimum value of 2 and a maximum of 64
 - **Session property:** `task_partitioned_writer_count`

 The number of concurrent writer threads per worker per query when

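Since `task_partitioned_writer_count` is exposed as a session property, the new default can also be overridden per session. A sketch using the Trino JDBC driver; the host, catalog, user, table names, and the chosen value of 32 are placeholders (and the value must stay a power of two, per the restriction above):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical connection details; only the session property name comes from the docs above.
public class SetPartitionedWriterCount
{
    public static void main(String[] args) throws Exception
    {
        String url = "jdbc:trino://example.com:8080/hive/default"; // placeholder host/catalog/schema
        try (Connection connection = DriverManager.getConnection(url, "trino_user", null);
                Statement statement = connection.createStatement()) {
            // Override the writer count for this session only; must be a power of two.
            statement.execute("SET SESSION task_partitioned_writer_count = 32");
            statement.execute("INSERT INTO target_table SELECT * FROM source_table"); // placeholder tables
        }
    }
}
```
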
docs/src/main/sphinx/admin/properties-writer-scaling.md (2 changes: 1 addition & 1 deletion)

@@ -39,7 +39,7 @@ writing.
 ## `task.scale-writers.max-writer-count`

 - **Type:** {ref}`prop-type-integer`
-- **Default value:** The number of physical CPUs of the node with a maximum of 32
+- **Default value:** The number of physical CPUs of the node with a maximum of 64

 Maximum number of concurrent writers per task up to which the task can be scaled
 when `task.scale-writers.enabled` is set. Increasing this value may improve the