Improve bucketed table write parallelism for Presto on Spark #15934
Conversation
```java
@@ -598,17 +612,50 @@ else if (redistributeWrites) {
        !source.getProperties().isCompatibleTablePartitioningWith(shufflePartitioningScheme.get().getPartitioning(), false, metadata, session) &&
        !(source.getProperties().isRefinedPartitioningOver(shufflePartitioningScheme.get().getPartitioning(), false, metadata, session) &&
                canPushdownPartialMerge(source.getNode(), partialMergePushdownStrategy))) {
    PartitioningScheme exchangePartitioningScheme = shufflePartitioningScheme.get();
    if (node.getTablePartitioningScheme().isPresent() && isPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(session)) {
        int writerThreadsPerNode = getTaskPartitionedWriterCount(session);
```
In Presto on Spark we always create 1 task per bucket. This is generally fine, as we can further partition data using local partitioning. However, when writing to a bucketed table the data cannot be further partitioned, as all the data for a single bucket has to be written to a single file. Thus, for a fragment that contains only a partitioned table writer operator, we have to make sure we assign at least as many buckets per task as there are threads available (`getTaskPartitionedWriterCount`).
What makes this PR special is that the `bucketToPartition` is assigned during the planning phase, while everywhere else it is assigned during the scheduling phase. This creates a precedent: the plan now contains information about the physical partition assignment, such as the number of partitions and the mapping to buckets, before it gets to the scheduler (or the RDD translator in the case of Presto on Spark).
Another approach would be to "reverse engineer" the plan during the scheduling phase (which in the case of Presto on Spark is a translation to RDDs). By "reverse engineer" I mean traversing the distributed plan, deducing which fragment is a "partitioned table writer only" fragment, and then assigning the `bucketToPartition` accordingly.
I don't really like either of the two solutions. I decided to go with the current approach as it is less likely to break, although on the other hand it violates some assumptions that we currently have.
I would really love to hear your thoughts.
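To make the requirement above concrete, here is a minimal standalone sketch (a hypothetical helper, not the actual Presto code) of assigning each group of `writerThreadsPerNode` consecutive buckets to the same output partition, so that every writer task receives enough buckets to keep all of its local writer threads busy:

```java
import java.util.Arrays;

public class BucketToPartitionSketch
{
    // Assign consecutive runs of writerThreadsPerNode buckets to one partition.
    // With 8 buckets and 4 writer threads per node this yields 2 partitions,
    // each holding 4 buckets — one bucket per local writer thread.
    static int[] assignBucketToPartition(int bucketCount, int writerThreadsPerNode)
    {
        int[] bucketToPartition = new int[bucketCount];
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            bucketToPartition[bucket] = bucket / writerThreadsPerNode;
        }
        return bucketToPartition;
    }

    public static void main(String[] args)
    {
        // 8 buckets, 4 writer threads per node
        System.out.println(Arrays.toString(assignBucketToPartition(8, 4)));
        // prints [0, 0, 0, 0, 1, 1, 1, 1]
    }
}
```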
I don't think I fully understand the problem we are trying to solve, so I might ask some dumb questions. Is it fair to say that because we need to "assign at least as many buckets per task as there are threads available", we would need to assign `bucketToPartition` at the planning phase so that we could do `addExchanges` accordingly?
Why do we need to assign at least as many buckets per task as there are threads available? My impression is that when creating writers, each writer knows the bucket it is trying to write, so even if we have more threads than buckets, the extra threads should simply be idle. What am I missing here?
> Is it fair to say that because we need to "assign at least as many buckets per task as there are threads available", we would need to assign `bucketToPartition` at the planning phase so that we could do `addExchanges` accordingly?

Yeah, this is exactly what is happening.
> Why do we need to assign at least as many buckets per task as there are threads available? My impression is that when creating writers, each writer knows the bucket it is trying to write, so even if we have more threads than buckets, the extra threads should simply be idle. What am I missing here?

It is usually not a problem in classic Presto, as it is multi-tenant: if one query doesn't use the available CPUs, other queries will. Resource allocation in Spark works differently. In Spark we are allocated a fixed number of CPUs; if we are not utilizing them, we are effectively wasting them.
The solution makes sense to me. It is an improvement compared with the previous `bucketToPartition` allocation.
```java
int bucket = 0;
int partition = 0;
while (bucket < bucketCount) {
    for (int i = 0; i < writerThreadsPerNode && bucket < bucketCount; i++) {
        bucketToPartition[bucket] = partition;
        bucket++;
    }
    partition++;
}
```
`bucketToPartition[bucket] = bucket / writerThreadsPerNode`
This will create a skew. Buckets assigned must be non co-divisible.
Let me give you an example. Let's say we have 4 buckets and 2 writer threads. That means we will end up with 2 partitions. Let's assume the buckets are assigned using `bucketToPartition[bucket] = bucket / writerThreadsPerNode`. Partition 0 will get Bucket 0 and Bucket 2, and Partition 1 will get Bucket 1 and Bucket 3. Then locally we also take the modulo when assigning buckets to threads: `Bucket 0 % 2 = Thread 0`, `Bucket 2 % 2 = Thread 0`. So we will end up with a single thread writing 2 buckets. What we want is for each thread to have a single bucket to write. So we want to assign Bucket 0 and Bucket 1 to Partition 0, and Bucket 2 and Bucket 3 to Partition 1, so that when buckets are later assigned to threads, each thread has a bucket to write.
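To make the local thread-assignment mechanics above concrete, here is a small standalone sketch (assuming, as described in the comment, that a bucket is mapped to a writer thread by `bucket % writerThreadsPerNode` — the helper names are hypothetical):

```java
public class LocalThreadSkew
{
    // Hypothetical model of the local step: a bucket is mapped to a writer
    // thread by taking the bucket number modulo the thread count.
    static int threadFor(int bucket, int writerThreadsPerNode)
    {
        return bucket % writerThreadsPerNode;
    }

    public static void main(String[] args)
    {
        int threads = 2;
        // Strided partition contents {0, 2}: both buckets collapse onto
        // thread 0 and thread 1 idles.
        System.out.println(threadFor(0, threads) + " " + threadFor(2, threads)); // 0 0
        // Contiguous partition contents {0, 1}: each thread gets one bucket.
        System.out.println(threadFor(0, threads) + " " + threadFor(1, threads)); // 0 1
    }
}
```

This illustrates why the desired outcome is for each partition to hold a contiguous run of buckets: only then does the local modulo step spread one bucket onto each writer thread.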
Please check again: it is a range assignment, not a modulo-based distribution.
Discussed offline. I'm being slow. `bucketToPartition[bucket] = bucket % writerThreadsPerNode` doesn't work; `bucketToPartition[bucket] = bucket / writerThreadsPerNode` does exactly what I'm doing, with extra cycles. Let me fix that.
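A quick sketch confirming the point settled above: the explicit while-loop from the diff and the closed-form `bucket / writerThreadsPerNode` produce the same `bucketToPartition` array (class and method names here are illustrative, not the actual Presto code):

```java
import java.util.Arrays;

public class AssignmentEquivalence
{
    // The loop from the diff: hand out writerThreadsPerNode consecutive
    // buckets per partition, then move to the next partition.
    static int[] loopAssign(int bucketCount, int writerThreadsPerNode)
    {
        int[] bucketToPartition = new int[bucketCount];
        int bucket = 0;
        int partition = 0;
        while (bucket < bucketCount) {
            for (int i = 0; i < writerThreadsPerNode && bucket < bucketCount; i++) {
                bucketToPartition[bucket] = partition;
                bucket++;
            }
            partition++;
        }
        return bucketToPartition;
    }

    // The closed form suggested in the review.
    static int[] divisionAssign(int bucketCount, int writerThreadsPerNode)
    {
        int[] bucketToPartition = new int[bucketCount];
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            bucketToPartition[bucket] = bucket / writerThreadsPerNode;
        }
        return bucketToPartition;
    }

    public static void main(String[] args)
    {
        // Exhaustively compare the two over a small grid of inputs.
        for (int buckets = 1; buckets <= 16; buckets++) {
            for (int threads = 1; threads <= 8; threads++) {
                if (!Arrays.equals(loopAssign(buckets, threads), divisionAssign(buckets, threads))) {
                    throw new AssertionError("mismatch for " + buckets + " buckets, " + threads + " threads");
                }
            }
        }
        System.out.println("loop and division assignments match");
    }
}
```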
presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
Move partitioning assignment to PrestoSparkQueryExecutionFactory. This allows simply following the number of partitions set in the bucketToPartition when creating a Spark partitioner, instead of running the logic of assigning the number of partitions twice.
When writing to a partitioned (bucketed) table, ensure that each writer node has enough buckets to write to efficiently utilize all available concurrent threads.
Force-pushed from f8f78bb to 68689dc.
Make sure each partitioned (bucketed) table writer task is assigned `task_partitioned_writer_count` buckets to keep all available writer threads busy. Currently only 1 bucket per task is assigned, which results in thread starvation.