diff --git a/datafusion/src/physical_plan/file_format/ipc.rs b/datafusion/src/physical_plan/file_format/ipc.rs index 3d9497b51e50..1881a8d8ac0c 100644 --- a/datafusion/src/physical_plan/file_format/ipc.rs +++ b/datafusion/src/physical_plan/file_format/ipc.rs @@ -43,7 +43,7 @@ pub async fn plan_to_ipc( let mut tasks = vec![]; for i in 0..plan.output_partitioning().partition_count() { let plan = plan.clone(); - let filename = format!("part-{}.csv", i); + let filename = format!("part-{}.arrow", i); let path = fs_path.join(&filename); let file = fs::File::create(path)?; let stream = plan.execute(i, runtime.clone()).await?; @@ -76,14 +76,8 @@ pub async fn plan_to_ipc( mod tests { use super::*; use crate::prelude::*; - use crate::test_util::aggr_test_schema_with_missing_col; - use crate::{ - datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem}, - scalar::ScalarValue, - test_util::aggr_test_schema, - }; use arrow::datatypes::*; - use futures::StreamExt; + use arrow::ipc::writer::IpcWriteOptions; use std::fs::File; use std::io::Write; use tempfile::TempDir; @@ -138,7 +132,9 @@ mod tests { // execute a simple query and write the results to CSV let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; let df = ctx.sql("SELECT c1, c2 FROM test").await?; - df.write_csv(&out_dir).await?; + let opts = IpcWriteOptions::default(); + + df.write_ipc(&out_dir, opts).await?; // create a new context and verify that the results were saved to a partitioned csv file let mut ctx = ExecutionContext::new(); @@ -150,8 +146,12 @@ mod tests { // register each partition as well as the top level dir let csv_read_option = CsvReadOptions::new().schema(&schema); - ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option) - .await?; + ctx.register_csv( + "part0", + &format!("{}/part-0.arrow", out_dir), + csv_read_option, + ) + .await?; ctx.register_csv("allparts", &out_dir, csv_read_option) .await?;