-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INSERT INTO support for MemTable #5520
INSERT INTO support for MemTable #5520
Conversation
I prepare to review this PR tomorrow, thank you @metesynnada |
_state: &SessionState, | ||
_input: &LogicalPlan, | ||
) -> Result<()> { | ||
let msg = "Insertion not implemented for this table".to_owned(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the table has name, its likely better to return it to the user in error message instead of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think TableProvider
has a way to fetch its own name yet 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Imho TableProvider
likely should have its own name.... I can investigate the purpose of having name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the name for a particular table provider is currently managed by its containing schema - this allows, for example, the same provider to be registered as different table names.
// Create a physical plan from the logical plan. | ||
let plan = state.create_physical_plan(input).await?; | ||
|
||
// Check that the schema of the plan matches the schema of this table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please elaborate why exactly this check is needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would the alternate behavior be? Pad missing columns with nulls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check was done with a defensive approach in mind. I am not sure how the two schemas would be different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think whatever is creating the plan should ensure that the incoming rows match the record batch (as in I prefer this error approach).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @metesynnada -- this is very exciting!
Can you please update the sqllogictest runner https://github.com/apache/arrow-datafusion/blob/bd645279874df8e70f1cd891ecea89577733b788/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs#L34-L88 to use this new API (I think maybe all that is needed is to remove the special case code)
Not only will that reduce code duplication, it will increase the test coverage (with existing tests that use the sqllogictest runner)
I am very excited to see this@
let table_scan = Arc::new( | ||
LogicalPlanBuilder::scan("source", multi_partition_provider, None)? | ||
.build()?, | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly I think you can refactor this into a function, it would make the tests easier to foloow;
let table_scan = create_table_scan(multi_partition_provider);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed 👍🏻
@@ -2714,6 +2728,46 @@ mod tests { | |||
Ok(()) | |||
} | |||
|
|||
#[tokio::test] | |||
async fn sql_table_insert() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to write this as a sqllogictest https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests (after updating it to use the new codepath)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed 👍🏻
/// Convert a logical plan into a builder with a [DmlStatement] | ||
pub fn insert_into( | ||
input: LogicalPlan, | ||
table_name: impl Into<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have complete knowledge about PR at the moment, but I'll look into it. Nevertheless, it's not a critical inclusion, so it can be readily eliminated if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would just involve changing from:
table_name: impl Into<String>
to:
table_name: impl Into<OwnedTableReference>
and use that directly, if #5343 gets merged first. Since there will be a From<String>
implementation for OwnedTableReference
anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed 👍🏻
// Create a physical plan from the logical plan. | ||
let plan = state.create_physical_plan(input).await?; | ||
|
||
// Check that the schema of the plan matches the schema of this table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would the alternate behavior be? Pad missing columns with nulls?
|
||
// Execute the plan and collect the results into batches. | ||
let mut tasks = vec![]; | ||
for idx in 0..table_partition_count { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will try and run all the partitions in parallel, which is probably fine (though maybe we want to limit it to the target_partitions 🤔 )
I don't think this is important to fix now
I made changes to the code based on the feedback received. While testing the code, I discovered a problem with how a table was being created using a DDL query. The issue was that the table was made with non-nullable columns by default, which contradicts PostgreSQL 15. CREATE TABLE table_without_values(field1 BIGINT, field2 BIGINT); DDL query. It defaults to non-nullable columns while creating a table, which is not the case in PostgreSQL 15.
To fix this issue, I changed the DDL queries to explicitly specify that the columns can contain null values, like this: PS: The code piece that results non-nullable default: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me -- thank you @metesynnada
@@ -1,93 +0,0 @@ | |||
// Licensed to the Apache Software Foundation (ASF) under one |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
I agree -- I have been hit by this too -- filed #5575 to track |
Benchmark runs are scheduled for baseline = 3df1cb3 and contender = ff55eff. ff55eff is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Improves #5130.
Rationale for this change
"INSERT INTO abc SELECT * FROM xyz"
support forMemTable
.What changes are included in this PR?
insert_into
API for theTableProvider
trait.insert_into
implementation for theMemTable
If we have different partition counts with created plan, we handle it as
Are these changes tested?
Yes
Are there any user-facing changes?
Query is supported.