
refactor: merge to use logical plans #1720

Merged 9 commits into delta-io:main on Nov 19, 2023

Conversation

Blajda
Collaborator

@Blajda Blajda commented Oct 13, 2023

Description

This refactors the merge operation to use DataFusion's DataFrame and LogicalPlan APIs.

The nested-loop join (NLJ) is eliminated, and the query planner can pick the optimal join operator. This also enables the operation to use multiple threads and should result in a significant speed-up.
Merge is still limited to a single thread in some areas. When collecting benchmarks, I encountered multiple OoM issues with DataFusion's hash join implementation; there are multiple upstream tickets open regarding this. For now, I've limited the number of partitions to just 1 to prevent this.

Predicates passed as SQL are also easier to use now. Previously, manual casting was required to ensure data types were aligned; now the logical plan performs type coercion when it is optimized.
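As a rough sketch of what this direction looks like (not the merge implementation itself): expressing the join through DataFusion's DataFrame API lets the planner choose the physical join operator and apply type coercion, and the partition workaround mentioned above is a single SessionConfig call. The table names and CSV inputs below are purely hypothetical.

use datafusion::error::Result;
use datafusion::logical_expr::JoinType;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

async fn join_via_logical_plan() -> Result<()> {
    // Workaround used by this PR: cap target partitions at 1 until the
    // upstream hash-join memory issues are resolved.
    let config = SessionConfig::new().with_target_partitions(1);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical inputs; in merge these come from the Delta table scan and
    // the user-supplied source DataFrame.
    ctx.register_csv("target", "target.csv", CsvReadOptions::new()).await?;
    ctx.register_csv("source", "source.csv", CsvReadOptions::new()).await?;

    // A logical full outer join: the planner picks the physical join operator
    // (e.g. a hash join) instead of a hand-built nested-loop join, and type
    // coercion on the join keys happens during optimization.
    let joined = ctx
        .table("target")
        .await?
        .join(ctx.table("source").await?, JoinType::Full, &["id"], &["id"], None)?;
    joined.show().await?;
    Ok(())
}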

Related Issues

- enhances #850
- closes #1790
- closes #1753

github-actions bot added the binding/rust and rust labels on Oct 13, 2023
@Blajda
Collaborator Author

Blajda commented Oct 13, 2023

Can be merged after #1639
It also requires a new release of DataFusion that contains the fix for apache/datafusion#7790.

@ion-elgreco
Collaborator

ion-elgreco commented Nov 2, 2023

@Blajda is this ready to be merged now, or are you still working on creating some benchmarks? I see that the issue and PR you linked have been merged/resolved now.

@Blajda
Collaborator Author

Blajda commented Nov 2, 2023

@ion-elgreco It depends on a new version of DataFusion being released and then having #1775 updated and merged.
I also need to make one small change to prevent predicate pushdowns. The benchmark code is done; I'll raise a PR for that soon.

@ion-elgreco
Collaborator

@Blajda oh nice! Really curious to see how fast the Rust merge is versus the Scala merge 👀

@Blajda
Collaborator Author

Blajda commented Nov 19, 2023

Below are the benchmark results. These were executed with the standard merge benchmarks compiled with release flags, using the TPC-DS dataset at scale factor 1.
These are the specs of my desktop for some context:

CPU: AMD Ryzen 7 5800X 8-Core Processor
Memory: 32 GB @ 2133 MHz

Writes were done to an HDD.

+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| name                                                                                | before_duration_avg | after_duration_avg | before_duration_avg / after_duration_avg |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05                        | 16429.0             | 6298.0             | 2.6086059066370275                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05            | 14731.0             | 5436.0             | 2.709896983075791                        |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50            | 71219.0             | 5260.0             | 13.539733840304182                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0             | 116833.0            | 5336.0             | 21.89523988005997                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1   | 22406.0             | 5003.0             | 4.478512892264641                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1    | 34476.0             | 4496.0             | 7.668149466192171                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0    | 22723.0             | 4841.0             | 4.693864903945466                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01   | 21223.0             | 4897.0             | 4.333877884419032                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001  | 68764.0             | 4673.0             | 14.715172266210143                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001 | 113265.0            | 4676.0             | 24.22262617621899                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001  | 91681.0             | 4883.0             | 18.77554781896375                        |
| upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 12294.0             | 4685.0             | 2.624119530416222                        |
| upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 14956.0             | 4868.0             | 3.0723089564502875                       |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+

@ion-elgreco
Collaborator

@Blajda I assume after_duration is after logical plans.

Looks like quite a speed bump, and it's also more consistent across the different queries, so something else seems to be the limiting factor there? Maybe your HDD? 😛

What's the unit btw, milliseconds?

@Blajda
Collaborator Author

Blajda commented Nov 19, 2023

Yeah, the units are milliseconds.
I can only speculate on the limiting factor. The merge benchmark has a lot of partitions with small files, which I think the writer has some challenges with. Another factor is that an entire rewrite of the table is still performed.
I should finally get around to formatting the SSD I have and rerun there.

@Blajda Blajda marked this pull request as ready for review November 19, 2023 18:01
@ion-elgreco
Collaborator

@Blajda it will also close this one then: #1753

@Blajda
Collaborator Author

Blajda commented Nov 19, 2023

Before: logical merge writing to HDD
After: logical merge writing to SSD

Currently the size of the write is not the limiting factor.

+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| name                                                                                | before_duration_avg | after_duration_avg | before_duration_avg / after_duration_avg |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+
| delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05                        | 6298.0              | 6089.0             | 1.0343241911643948                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05            | 5436.0              | 5175.0             | 1.0504347826086957                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50            | 5260.0              | 5458.0             | 0.9637229754488824                       |
| multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0             | 5336.0              | 5343.0             | 0.9986898746022833                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1   | 5003.0              | 5035.0             | 0.9936444885799404                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1    | 4496.0              | 4165.0             | 1.079471788715486                        |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0    | 4841.0              | 4932.0             | 0.9815490673154906                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01   | 4897.0              | 4610.0             | 1.0622559652928416                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001  | 4673.0              | 4843.0             | 0.9648977906256453                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001 | 4676.0              | 4481.0             | 1.0435170720821245                       |
| upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001  | 4883.0              | 4865.0             | 1.003699897225077                        |
| upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 4685.0              | 4393.0             | 1.0664693831094925                       |
| upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001 | 4868.0              | 4627.0             | 1.0520855846120596                       |
+-------------------------------------------------------------------------------------+---------------------+--------------------+------------------------------------------+

wjones127 previously approved these changes Nov 19, 2023
Collaborator

@wjones127 wjones127 left a comment


This is great work. I feel like I'm learning a lot about DataFusion just by reviewing your PRs :)

Comment on lines +1115 to +1116
// TODO: DataFusion's hash join has some memory issues. Running with all cores results in an OoM. Can be removed when upstream improvements are made.
let config = SessionConfig::new().with_target_partitions(1);
Collaborator


@Blajda Why don't you configure a Greedy memory pool in the RuntimeConfig? Right now the SessionContext seems to be configured without a memory limit, so it's no surprise that it can OOM, right?

You can pass that in with SessionContext::new_with_config_rt.
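For reference, a minimal sketch of that suggestion, assuming DataFusion's GreedyMemoryPool and RuntimeConfig APIs; the byte limit is a caller-supplied number (e.g. some fraction of system memory) and is illustrative rather than what this PR ships:

use std::sync::Arc;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_memory_limit(limit_bytes: usize) -> datafusion::error::Result<SessionContext> {
    // Bound DataFusion's tracked allocations (e.g. the hash join build side)
    // with a greedy pool; allocations made outside the pool are not covered.
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_pool(Arc::new(GreedyMemoryPool::new(limit_bytes))),
    )?;
    Ok(SessionContext::new_with_config_rt(
        SessionConfig::new(),
        Arc::new(runtime),
    ))
}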

Collaborator Author


I'll give this a try. Issues on the DF tracker suggest that even with a memory pool configured, there are allocations made outside of the pool. As a start, I think limiting memory to 80% of the user's system memory would be reasonable.

Collaborator Author


Enabled the memory pool and removed the CPU limit. It still causes problems on low-memory systems. I think this will require a follow-up on how best to tune this.

Collaborator

@roeap roeap left a comment


Looking great, and I'm really excited to see us moving towards more proper planning!

As this is quite complex to reason about, should we maybe add some tests to validate generated plans via their string representation, much like DataFusion does?

That would also help me understand how the metrics observers impact pushdown. I guess since we optimize the input plans beforehand, file skipping etc. is preserved, but it would help to see it :).
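A rough sketch of what such a test could look like, assuming we snapshot the plan via LogicalPlan::display_indent; the helper name and pinned string are made up purely to show the shape of the assertion:

use datafusion::logical_expr::LogicalPlan;

// Hypothetical helper: render the merge's logical plan and compare it to a
// pinned snapshot string, similar to DataFusion's own plan tests.
fn assert_plan_eq(plan: &LogicalPlan, expected: &str) {
    let actual = format!("{}", plan.display_indent());
    assert_eq!(actual.trim(), expected.trim(), "unexpected logical plan:\n{actual}");
}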

Comment on lines +1086 to +1103
struct MergePlanner {}

#[async_trait]
impl QueryPlanner for MergePlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
            vec![Arc::new(MergeMetricExtensionPlanner {})],
        )));
        planner
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}

Collaborator


Eventually we should consolidate this into a DeltaPlanner; some early experiments are in the deltalake-sql crate, but that is not helpful right now 😆 - maybe add a comment?
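For context, wiring a custom QueryPlanner like the MergePlanner above into a session would presumably look roughly like the following sketch, assuming the stock SessionState/SessionContext constructors; this is not necessarily how the PR wires it up:

use std::sync::Arc;
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;

fn context_with_merge_planner() -> datafusion::error::Result<SessionContext> {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new())?);
    // Attach the extension-aware planner so MergeMetricExtensionPlanner runs
    // when the logical plan is lowered to a physical plan.
    let state = SessionState::new_with_config_rt(SessionConfig::new(), runtime)
        .with_query_planner(Arc::new(MergePlanner {}));
    Ok(SessionContext::new_with_state(state))
}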

roeap previously approved these changes Nov 19, 2023
@Blajda
Collaborator Author

Blajda commented Nov 19, 2023

That would also help me understand how the metrics observers impact pushdown. I guess since we optimize the input plans beforehand, file skipping etc. is preserved, but it would help to see it :).

Merge doesn't currently support file skipping. A new operator that tracks which files have modifications needs to be built, plus another operator that enables pushdowns depending on which clauses are used (i.e. upserts can support pushdown, but anything with a delete would require additional work).

As this is quite complex to reason about, should we maybe add some tests to validate generated plans via their string representation, much like DataFusion does?

I think this would be possible, but it would require additional refactoring. Currently the string repr is just spaghetti: a bunch of case statements and Parquet read nodes.

@roeap
Collaborator

roeap commented Nov 19, 2023

Yes, sounds like a follow-up 😀

We could use the same heuristics as for conflict resolution, where we assume all read files are rewritten regardless of whether they have matches. But again, something to dive into later.

@Blajda Blajda dismissed stale reviews from roeap and wjones127 via 4070b76 November 19, 2023 23:43
@Blajda Blajda enabled auto-merge (squash) November 19, 2023 23:45
@Blajda Blajda merged commit 8a66343 into delta-io:main Nov 19, 2023
21 checks passed
ion-elgreco pushed a commit to ion-elgreco/delta-rs that referenced this pull request Nov 20, 2023
# Description
This refactors the merge operation to use DataFusion's DataFrame and
LogicalPlan APIs

The NLJ is eliminated and the query planner can pick the optimal join
operator. This also enables the operation to use multiple threads and
should result in significant speed up.
Merge is still limited to using a single thread in some area. When
collecting benchmarks, I encountered multiple OoM issues with
Datafusion's hash join implementation. There are multiple tickets
upstream open regarding this. For now, I've limited the number of
partitions to just 1 to prevent this.

Predicates passed as SQL are also easier to use now. Manual casting was
required to ensure data types were aligned. Now the logical plan will
perform type coercion when optimizing the plan.

# Related Issues
- enhances delta-io#850
- closes delta-io#1790 
- closes delta-io#1753
Labels: binding/rust, crate/core, rust
Successfully merging this pull request may close these issues:

- rust merge error - datafusion panics
- Writing with large arrow types in MERGE
4 participants