Skip to content

Commit

Permalink
save
Browse files: browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 6, 2019
1 parent 6db609f commit 8f11c81
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 28 deletions.
80 changes: 55 additions & 25 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -31,8 +31,14 @@ use arrow::record_batch::RecordBatch;
pub struct CsvExec {
/// Path to directory containing partitioned CSV files with the same schema
path: String,
/// Schema representing the CSV files
/// Schema representing the CSV files after the optional projection is applied
schema: Arc<Schema>,
/// Does the CSV file have a header?
has_header: bool,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Batch size
batch_size: usize,
}

impl ExecutionPlan for CsvExec {
Expand All @@ -48,8 +54,13 @@ impl ExecutionPlan for CsvExec {
let partitions = filenames
.iter()
.map(|filename| {
Arc::new(CsvPartition::new(&filename, self.schema.clone()))
as Arc<Partition>
Arc::new(CsvPartition::new(
&filename,
self.schema.clone(),
self.has_header,
self.projection.clone(),
self.batch_size,
)) as Arc<Partition>
})
.collect();
Ok(partitions)
Expand All @@ -58,10 +69,29 @@ impl ExecutionPlan for CsvExec {

impl CsvExec {
/// Create a new execution plan for reading a set of CSV files
pub fn try_new(path: &str, schema: Arc<Schema>) -> Result<Self> {
pub fn try_new(
path: &str,
schema: Arc<Schema>,
has_header: bool,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
let projected_schema = match &projection {
Some(p) => {
let projected_fields: Vec<Field> =
p.iter().map(|i| schema.fields()[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
None => schema,
};

Ok(Self {
path: path.to_string(),
schema: schema.clone(),
schema: projected_schema,
has_header,
projection,
batch_size,
})
}

Expand Down Expand Up @@ -89,13 +119,28 @@ struct CsvPartition {
path: String,
/// Schema representing the CSV file
schema: Arc<Schema>,
/// Does the CSV file have a header?
has_header: bool,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Batch size
batch_size: usize,
}

impl CsvPartition {
fn new(path: &str, schema: Arc<Schema>) -> Self {
fn new(
path: &str,
schema: Arc<Schema>,
has_header: bool,
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Self {
Self {
path: path.to_string(),
schema,
has_header,
projection,
batch_size,
}
}
}
Expand All @@ -106,17 +151,15 @@ impl Partition for CsvPartition {
Ok(Arc::new(Mutex::new(CsvIterator::try_new(
&self.path,
self.schema.clone(),
true, //TODO: do not hard-code
&None, //TODO: do not hard-code
1024, //TODO: do not hard-code
self.has_header,
&self.projection,
self.batch_size,
)?)))
}
}

/// Iterator over batches
struct CsvIterator {
/// Schema for the batches produced by this iterator
schema: Arc<Schema>,
/// Arrow CSV reader
reader: csv::Reader<File>,
}
Expand All @@ -139,20 +182,7 @@ impl CsvIterator {
projection.clone(),
);

let projected_schema = match projection {
Some(p) => {
let projected_fields: Vec<Field> =
p.iter().map(|i| schema.fields()[*i].clone()).collect();

Arc::new(Schema::new(projected_fields))
}
None => schema,
};

Ok(Self {
schema: projected_schema,
reader,
})
Ok(Self { reader })
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/datafusion/src/execution/physical_plan/expressions.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct Column {
}

impl Column {
// Create a new column expression
/// Create a new column expression
pub fn new(index: usize) -> Self {
Self { index }
}
Expand Down
4 changes: 2 additions & 2 deletions rust/datafusion/src/execution/physical_plan/projection.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct ProjectionExec {

impl ProjectionExec {
/// Create a projection on an input
fn try_new(
pub fn try_new(
expr: Vec<Arc<dyn PhysicalExpr>>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
Expand Down Expand Up @@ -164,7 +164,7 @@ mod tests {
let partitions = 4;
let path = create_partitioned_csv("aggregate_test_100.csv", partitions)?;

let csv = CsvExec::try_new(&path, schema)?;
let csv = CsvExec::try_new(&path, schema, true, None, 1024)?;

let projection =
ProjectionExec::try_new(vec![Arc::new(Column::new(0))], Arc::new(csv))?;
Expand Down

0 comments on commit 8f11c81

Please sign in to comment.