From 20216bc6107911f18b6c393a1092240b5c036e34 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 7 Sep 2019 12:14:18 -0600
Subject: [PATCH] realistic test (failing)

---
 rust/datafusion/Cargo.toml               |  2 +-
 rust/datafusion/src/execution/context.rs | 53 +++++++++++++-----------
 2 files changed, 30 insertions(+), 25 deletions(-)

diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 7db5909d013ef..8fff9b3d09674 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -54,7 +54,7 @@ crossbeam = "0.7.1"
 
 [dev-dependencies]
 criterion = "0.2.0"
-
+tempdir = "0.3.7"
 
 [[bench]]
 name = "aggregate_query_sql"
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index f77f4fd33056a..fc1ec1a4057b8 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -510,6 +510,9 @@ impl SchemaProvider for ExecutionContextSchemaProvider {
 mod tests {
 
     use super::*;
+    use std::fs::File;
+    use std::io::prelude::*;
+    use tempdir::TempDir;
 
     #[test]
     fn parallel_projection() -> Result<()> {
@@ -517,41 +520,43 @@ mod tests {
 
         // define schema for data source (csv file)
         let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Utf8, false),
+            Field::new("c1", DataType::UInt32, false),
             Field::new("c2", DataType::UInt32, false),
-            Field::new("c3", DataType::Int8, false),
-            Field::new("c4", DataType::Int16, false),
-            Field::new("c5", DataType::Int32, false),
-            Field::new("c6", DataType::Int64, false),
-            Field::new("c7", DataType::UInt8, false),
-            Field::new("c8", DataType::UInt16, false),
-            Field::new("c9", DataType::UInt32, false),
-            Field::new("c10", DataType::UInt64, false),
-            Field::new("c11", DataType::Float32, false),
-            Field::new("c12", DataType::Float64, false),
-            Field::new("c13", DataType::Utf8, false),
         ]));
 
-        let testdata =
-            ::std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let tmp_dir = TempDir::new("parallel_projection")?;
+
+        // generate a partitioned file
+        let partition_count = 4;
+        for partition in 0..partition_count {
+            let filename = format!("partition-{}.csv", partition);
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..10 {
+                let data = format!("{},{}\n", partition, i);
+                file.write_all(data.as_bytes())?;
+            }
+        }
 
         // register csv file with the execution context
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{}/csv/aggregate_test_100.csv", testdata),
-            &schema,
-            true,
-        );
+        ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true);
 
-        let logical_plan =
-            ctx.create_logical_plan("SELECT c1, c13 FROM aggregate_test_100")?;
+        let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?;
 
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
 
         let results = ctx.collect(physical_plan.as_ref())?;
 
-        assert_eq!(1, results.len());
-        assert_eq!(2, results[0].num_columns());
+        // there should be one batch per partition
+        assert_eq!(partition_count, results.len());
+
+        // each batch should contain 2 columns and 10 rows
+        for batch in &results {
+            assert_eq!(2, batch.num_columns());
+            assert_eq!(10, batch.num_rows());
+        }
 
         Ok(())
     }