[DataFrame] Parallel Load into dataframe #6983
I made a POC on #6984 which demonstrates that the fix is indeed to use more cores to do the write. However, the implementation of repartitioning there is probably not right -- I think the better approach would be to set the target partitions when writing into the memory table. Perhaps this could be done by creating a https://docs.rs/datafusion/latest/datafusion/logical_expr/logical_plan/struct.DmlStatement.html

Marking this as a good first issue as I think the approach will work well and should be able to follow existing patterns, there is a reproducer, and it was asked for by a customer. |
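As a rough illustration of the memory-table idea (a sketch under assumptions, not the POC from #6984; `cache_partitioned` is a hypothetical name), one could collect each partition of the plan separately and build a `MemTable` with the same partitioning, so later scans of the cached table stay parallel:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical partition-preserving cache: collect each output
// partition of the plan separately, then build a MemTable with one
// memory partition per plan partition.
async fn cache_partitioned(ctx: &SessionContext, df: DataFrame) -> Result<DataFrame> {
    let schema = Arc::new(Schema::from(df.schema()));
    // collect_partitioned() returns Vec<Vec<RecordBatch>>, one inner
    // Vec per output partition of the physical plan.
    let partitions = df.collect_partitioned().await?;
    let table = MemTable::try_new(schema, partitions)?;
    ctx.read_table(Arc::new(table))
}
```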
@alamb Hello, I'm new to DataFusion, but can I give this issue a try? |
Thank you @gobraves -- that would be great. Once you have looked around, let me know if you have any questions.

Basically I would suggest first verifying that running the equivalent SQL is properly parallelized:

```sql
create table t;
INSERT INTO t from SELECT * from `data.parquet`
```

Then look at the plan that comes out of

```sql
INSERT INTO t from SELECT * from `data.parquet`
```

And try to update DataFrame::cache() to use the same |
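A minimal sketch of that verification through the Rust API, assuming a placeholder file `data.parquet` and hypothetical table name `t` (neither taken from the issue):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the parquet file under a table name so SQL can see it.
    ctx.register_parquet("data", "data.parquet", ParquetReadOptions::default())
        .await?;
    // Create an empty target table with the same schema.
    ctx.sql("CREATE TABLE t AS SELECT * FROM data LIMIT 0").await?;
    // Inspect the plan: a parallelized write should show multiple
    // partitions feeding the insert.
    let explain = ctx.sql("EXPLAIN INSERT INTO t SELECT * FROM data").await?;
    explain.show().await?;
    Ok(())
}
```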
hi @alamb, I apologize for the delayed response. Based on your tips, I executed the following commands in the CLI and also ran the code you provided to reproduce the issue. I noticed that executing the commands in the CLI was almost 8 times faster than running the code mentioned above, which is consistent with my CPU core count. Here are the commands I executed in the CLI:
In the logical_plan of the explain output, I observed

I have one more question: do we need to create a new DmlStatement to address this issue, or improve the existing one? I'm not entirely clear about this statement, and I believe that might be because I haven't fully grasped the problem described above. |
@gobraves Thank you for trying! I also took a look at this issue (and found it pretty difficult to solve 😨); hope the following info might be helpful:

```rust
let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
let _cached = _df.cache().await;
```

My reproducer:
The 2nd one is parallelized. This reproducer should have the same root cause as the original one; for the original reproducer, adding a filter:

```rust
let _df = _ctx
    .read_parquet(FILENAME, _read_options)
    .await
    .unwrap()
    .filter(col("l_orderkey").gt(lit(0)))
    .unwrap();
// Then can be parallelized
```
|
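To see the difference without timing a full run, one could compare the two physical plans directly. A sketch, assuming a DataFusion version contemporary with this thread (where `DisplayableExecutionPlan::indent` takes a verbose flag) and a placeholder file path:

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet("part-0.parquet", ParquetReadOptions::default())
        .await?;

    // Plan without a filter: expect a single output partition here.
    let plain = df.clone().create_physical_plan().await?;
    // Plan with a filter: the optimizer inserts a RepartitionExec
    // below the filter, so the scan fans out across cores.
    let filtered = df
        .filter(col("l_orderkey").gt(lit(0)))?
        .create_physical_plan()
        .await?;

    println!("{}", displayable(plain.as_ref()).indent(true));
    println!("{}", displayable(filtered.as_ref()).indent(true));
    println!(
        "partitions: {} vs {}",
        plain.output_partitioning().partition_count(),
        filtered.output_partitioning().partition_count()
    );
    Ok(())
}
```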
This POC and adding a predicate both seem to suppress the physical optimizer bug in the repartition rule by adding another execution node on top of |
@2010YOUY01 Thank you!

```rust
use chrono;
use datafusion::common::DataFusionError;
use datafusion::prelude::*;
use object_store::local::LocalFileSystem;
use std::{sync::Arc, time::Instant};
use url::Url;
const FILENAME: &str =
"/home/neo/project_learning/arrow-datafusion/benchmarks/data/tpch_sf10/lineitem/part-0.parquet";
#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
let _ctx = SessionContext::new();
let config = _ctx.copied_config();
for item in config.options().entries().iter() {
let key = &item.key;
let value = &item.value;
println!("{key} {value:?}")
}
let local = Arc::new(LocalFileSystem::new());
let local_url = Url::parse("file://local").unwrap();
_ctx.runtime_env().register_object_store(&local_url, local);
let _read_options = ParquetReadOptions {
file_extension: ".parquet",
table_partition_cols: vec![],
parquet_pruning: None,
skip_metadata: None,
};
let _df = _ctx
.read_parquet(FILENAME, _read_options)
.await
.unwrap();
let start = Instant::now();
let _cached = _df.cache().await;
let elapsed = Instant::now() - start;
println!(
"datafusion end -> {:?} {elapsed:?}",
chrono::offset::Local::now()
);
Ok(())
}
```

without filter: 114.913562535s

If the code is modified with a filter:

```rust
let _df = _ctx
    .read_parquet(FILENAME, _read_options)
    .await
    .unwrap()
    .filter(col("l_linenumber").gt(lit(0)))
    .unwrap();
```

with filter: 15.583268924s
I will need to continue examining the code to understand the specific reason behind this performance difference. |
I think there is something in the physical planning that assumes the result of the final plan should be in a single partition (or at least it won't expand it by adding additional partitioning), because when connecting to a client this is what makes the most sense. I believe this is controlled by

So the reason the filter case goes faster is that the filter will return

I wonder if we could somehow add a flag to

Alternately, I was thinking the `ExecutionPlan` that does the writing could say "I want the input partitioned" and the optimizer would do the right thing. But given the DataFrame API doesn't use an `ExecutionPlan` for writing, it might not work.

Thank you both for pushing on this -- it is going to be awesome to get this working correctly |
BTW @devinjdangelo has been looking at using ExecutionPlan for dataframes here: #7141 |
This might well be done; I think all that remains is for someone to test / verify that the reproducer now runs in parallel |
@alamb Edit: ...

I did some debugging on this issue:

When running the query without a filter, we get a plan

When running the query with a filter, we get a plan

Possible Solution: |
Thank you for the follow up @marvinlanhenke
I think this is likely a great thing to try. @devinjdangelo perhaps you have some more input or ideas to try |
I ran the reproducer #6983 (comment) and didn't see this issue.
@alamb this looks ok to me (unless I've missed something). |
I agree -- thank you for checking @pmcgleenon . Let's close this issue and we can open new issues for future improvements if warranted |
Is your feature request related to a problem or challenge?
When loading data into a DataFusion DataFrame via SessionContext::read_parquet, only a single core is used even when there are many cores available.
This leads to slower performance, as reported by @mispp on #6908
Reproducer
Create data using
```shell
cd datafusion/benchmarks
./bench.sh data tpch10
```

Then load the

Cargo.toml
Describe the solution you'd like
I would like DataFusion to read the parquet file in parallel, using the target_partitions config parameter:
https://docs.rs/datafusion/latest/datafusion/config/struct.ExecutionOptions.html#structfield.target_partitions
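A minimal sketch of setting that parameter through the Rust API (illustrative only: the value 16 and the file path are placeholders, and `new_with_config` assumes a recent DataFusion version):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // target_partitions controls how many partitions (and thus cores)
    // the optimizer plans for; it defaults to the number of CPU cores.
    let config = SessionConfig::new().with_target_partitions(16);
    let ctx = SessionContext::new_with_config(config);
    let df = ctx
        .read_parquet("data.parquet", ParquetReadOptions::default())
        .await?;
    df.show_limit(10).await?;
    Ok(())
}
```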
Describe alternatives you've considered
No response
Additional context
No response