Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 25, 2019
1 parent 522494c commit 5a114a8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
8 changes: 8 additions & 0 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use arrow::datatypes::*;
use crate::arrow::array::{ArrayRef, BooleanBuilder};
use crate::arrow::record_batch::RecordBatch;
use crate::datasource::csv::CsvFile;
use crate::datasource::parquet::ParquetTable;
use crate::datasource::TableProvider;
use crate::error::{ExecutionError, Result};
use crate::execution::aggregate::AggregateRelation;
Expand Down Expand Up @@ -167,6 +168,13 @@ impl ExecutionContext {
self.register_table(name, Rc::new(CsvFile::new(filename, schema, has_header)));
}

/// Register a Parquet file as a table so that it can be queried from SQL.
///
/// The table is exposed to SQL under `name`; `filename` is the path to the
/// Parquet file on disk.
///
/// # Errors
///
/// Returns an error if the Parquet file cannot be opened or its schema
/// cannot be read (propagated from `ParquetTable::try_new`).
pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
    // `filename` is already a `&str`; pass it directly rather than
    // re-borrowing it (`&filename` would be a needless `&&str` borrow).
    let table = ParquetTable::try_new(filename)?;
    self.register_table(name, Rc::new(table));
    Ok(())
}

/// Register a table so that it can be queried from SQL
pub fn register_table(&mut self, name: &str, provider: Rc<dyn TableProvider>) {
self.datasources
Expand Down
4 changes: 1 addition & 3 deletions rust/datafusion/src/execution/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,7 @@ impl CsvIterator {
projection.clone(),
);

Ok(Self {
reader,
})
Ok(Self { reader })
}
}

Expand Down
6 changes: 5 additions & 1 deletion rust/datafusion/src/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ fn rewrite_expr(expr: &Expr, schema: &Schema) -> Result<Expr> {
let left_type = left.get_type(schema);
let right_type = right.get_type(schema);
if left_type == right_type {
Ok(expr.clone())
Ok(Expr::BinaryExpr {
left: Arc::new(left),
op: op.clone(),
right: Arc::new(right),
})
} else {
let super_type = utils::get_supertype(&left_type, &right_type)?;
Ok(Expr::BinaryExpr {
Expand Down
17 changes: 7 additions & 10 deletions rust/datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ extern crate datafusion;
use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::datasource::parquet::ParquetTable;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::relation::Relation;
Expand Down Expand Up @@ -89,10 +87,7 @@ fn nyc() -> Result<()> {
#[test]
fn parquet_query() {
let mut ctx = ExecutionContext::new();
ctx.register_table(
"alltypes_plain",
load_parquet_table("alltypes_plain.parquet"),
);
register_alltypes_parquet(&mut ctx);
let sql = "SELECT id, string_col FROM alltypes_plain";
let actual = execute(&mut ctx, sql);
let expected = "4\t\"0\"\n5\t\"1\"\n6\t\"0\"\n7\t\"1\"\n2\t\"0\"\n3\t\"1\"\n0\t\"0\"\n1\t\"1\"\n".to_string();
Expand Down Expand Up @@ -407,11 +402,13 @@ fn register_csv(
ctx.register_csv(name, filename, &schema, true);
}

fn load_parquet_table(name: &str) -> Rc<dyn TableProvider> {
fn register_alltypes_parquet(ctx: &mut ExecutionContext) {
let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
let filename = format!("{}/{}", testdata, name);
let table = ParquetTable::try_new(&filename).unwrap();
Rc::new(table)
ctx.register_parquet(
"alltypes_plain",
&format!("{}/alltypes_plain.parquet", testdata),
)
.unwrap();
}

/// Execute query and return result set as tab delimited string
Expand Down

0 comments on commit 5a114a8

Please sign in to comment.