Skip to content

Commit

Permalink
Update test
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewmturner committed Mar 15, 2022
1 parent c25a1e3 commit c9930e7
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions datafusion/src/physical_plan/file_format/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn plan_to_ipc(
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.csv", i);
let filename = format!("part-{}.arrow", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let stream = plan.execute(i, runtime.clone()).await?;
Expand Down Expand Up @@ -76,14 +76,8 @@ pub async fn plan_to_ipc(
mod tests {
use super::*;
use crate::prelude::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
test_util::aggr_test_schema,
};
use arrow::datatypes::*;
use futures::StreamExt;
use arrow::ipc::writer::IpcWriteOptions;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
Expand Down Expand Up @@ -138,7 +132,9 @@ mod tests {
// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_csv(&out_dir).await?;
let opts = IpcWriteOptions::default();

df.write_ipc(&out_dir, opts).await?;

// create a new context and verify that the results were saved to a partitioned csv file
let mut ctx = ExecutionContext::new();
Expand All @@ -150,8 +146,12 @@ mod tests {

// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
.await?;
ctx.register_csv(
"part0",
&format!("{}/part-0.arrow", out_dir),
csv_read_option,
)
.await?;
ctx.register_csv("allparts", &out_dir, csv_read_option)
.await?;

Expand Down

0 comments on commit c9930e7

Please sign in to comment.