Skip to content

Commit

Permalink
fix test and refactor to remove duplicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 13, 2019
1 parent 8f28b3c commit 704ec89
Showing 1 changed file with 28 additions and 45 deletions.
73 changes: 28 additions & 45 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,32 +545,9 @@ mod tests {

#[test]
fn parallel_projection() -> Result<()> {
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
]));

let tmp_dir = TempDir::new("parallel_projection")?;

// generate a partitioned file
let partition_count = 4;
for partition in 0..partition_count {
let filename = format!("partition-{}.csv", partition);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;

// generate some data
for i in 0..=10 {
let data = format!("{},{}\n", partition, i);
file.write_all(data.as_bytes())?;
}
}

// register csv file with the execution context
ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true);
let mut ctx = create_ctx(&tmp_dir, partition_count)?;

let logical_plan = ctx.create_logical_plan("SELECT c1, c2 FROM test")?;
let logical_plan = ctx.optimize(&logical_plan)?;
Expand All @@ -580,19 +557,42 @@ mod tests {
let results = ctx.collect(physical_plan.as_ref())?;

// there should be one batch per partition
assert_eq!(partition_count, results.len());
assert_eq!(results.len(), partition_count);

// each batch should contain 2 columns and 10 rows
for batch in &results {
assert_eq!(2, batch.num_columns());
assert_eq!(10, batch.num_rows());
assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 10);
}

Ok(())
}

#[test]
fn parallel_selection() -> Result<()> {
    // Build an execution context over partitioned CSV data in a scratch directory.
    let partition_count = 4;
    let tmp_dir = TempDir::new("parallel_selection")?;
    let mut ctx = create_ctx(&tmp_dir, partition_count)?;

    // Plan the filtered query: logical plan -> optimized plan -> physical plan.
    let sql = "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3";
    let logical_plan = ctx.create_logical_plan(sql)?;
    let logical_plan = ctx.optimize(&logical_plan)?;
    let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

    // Execute the plan and gather every resulting batch.
    let results = ctx.collect(physical_plan.as_ref())?;

    // The query is expected to yield exactly one batch per partition.
    assert_eq!(results.len(), partition_count);

    // Total the rows that survived the filter across all batches.
    let mut row_count: usize = 0;
    for batch in &results {
        row_count += batch.num_rows();
    }
    assert_eq!(row_count, 20);

    Ok(())
}

/// Generate a partitioned CSV data set (one file per partition) in `tmp_dir` and register it as table `test` with a new execution context
fn create_ctx(tmp_dir: &TempDir, partition_count: usize) -> Result<ExecutionContext> {
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
Expand All @@ -601,10 +601,7 @@ mod tests {
Field::new("c2", DataType::UInt64, false),
]));

let tmp_dir = TempDir::new("parallel_selection")?;

// generate a partitioned file
let partition_count = 4;
for partition in 0..partition_count {
let filename = format!("partition-{}.csv", partition);
let file_path = tmp_dir.path().join(&filename);
Expand All @@ -620,20 +617,6 @@ mod tests {
// register csv file with the execution context
ctx.register_csv("test", tmp_dir.path().to_str().unwrap(), &schema, true);

let logical_plan =
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;

let physical_plan = ctx.create_physical_plan(&logical_plan, 1024)?;

let results = ctx.collect(physical_plan.as_ref())?;

// there should be one batch per partition
assert_eq!(partition_count, results.len());

let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum();
assert_eq!(row_count, 20);

Ok(())
Ok(ctx)
}
}

0 comments on commit 704ec89

Please sign in to comment.