Skip to content

Commit

Permalink
Reorganize table providers by table format (apache#1010)
Browse files Browse the repository at this point in the history
* [feat] stubs for provider re-organization

* [feat] implement infer_schema to make test pass

* [wip] trying to implement pruned_partition_list

* [typo]

* [fix] replace enum with trait for extensibility

* [fix] add partition cols to infered schema

* [feat] forked file format executors

avro still missing

* [doc] comments about why we are flattening

* [test] migrated tests to file formats

* [test] improve listing test

* [feat] add avro to refactored format providers

* [fix] remove try from new when unnecessary

* [fix] remove try_ from ListingTable new

* [refacto] renamed format module to file_format

also removed statistics from the PartitionedFile abstraction

* [fix] removed Ballista stubs

* [fix] rename create_executor

* [feat] added store

* [fix] Clippy

* [test] improve file_format tests with limit

* [fix] limit file system read size

* [fix] avoid fetching unnecessary stats after limit

* [fix] improve readability

* [doc] improve comments

* [refacto] keep async reader stub

* [doc] cleanup comments

* [test] test file listing

* [fix] add last_modified back

* [refacto] simplify csv reader exec

* [refacto] change SizedFile back to FileMeta

* [doc] comment clarification

* [fix] avoid keeping object store as field

* [refacto] grouped params to avoid too_many_arguments

* [fix] get_by_uri also returns path

* [fix] ListingTable at store level instead of registry

* [fix] builder take self and not ref to self

* Replace file format providers (#2)

* [fix] replace file format providers in datafusion

* [lint] clippy

* [fix] replace file format providers in ballista

* [fix] await in python wrapper

* [doc] clearer doc about why sql() is async

* [doc] typos and clarity

* [fix] missing await after rebase
  • Loading branch information
rdettai authored Oct 13, 2021
1 parent d2d47d3 commit 2454e46
Show file tree
Hide file tree
Showing 82 changed files with 5,116 additions and 4,462 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ use datafusion::arrow::record_batch::RecordBatch;
async fn main() -> datafusion::error::Result<()> {
// register the table
let mut ctx = ExecutionContext::new();
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

// create a plan to run a SQL query
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;

// execute and print results
df.show().await?;
Expand All @@ -98,7 +98,7 @@ use datafusion::arrow::record_batch::RecordBatch;
async fn main() -> datafusion::error::Result<()> {
// create the dataframe
let mut ctx = ExecutionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;

let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?;
Expand Down
3 changes: 2 additions & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ async fn main() -> Result<()> {

// define the query using the DataFrame trait
let df = ctx
.read_parquet(filename)?
.read_parquet(filename)
.await?
.select_columns(&["id", "bool_col", "timestamp_col"])?
.filter(col("id").gt(lit(1)))?;

Expand Down
11 changes: 7 additions & 4 deletions ballista-examples/src/bin/ballista-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ async fn main() -> Result<()> {
"aggregate_test_100",
&format!("{}/csv/aggregate_test_100.csv", testdata),
CsvReadOptions::new(),
)?;
)
.await?;

// execute the query
let df = ctx.sql(
"SELECT c1, MIN(c12), MAX(c12) \
let df = ctx
.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
)
.await?;

// print the results
df.show().await?;
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ async fn main() -> Result<()> {
"tripdata",
"/path/to/yellow_tripdata_2020-01.csv",
CsvReadOptions::new(),
)?;
).await?;
// execute the query
let df = ctx.sql(
"SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
FROM tripdata
GROUP BY passenger_count
ORDER BY passenger_count",
)?;
).await?;
// collect the results and print them to stdout
let results = df.collect().await?;
Expand Down
60 changes: 32 additions & 28 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
use datafusion::sql::parser::FileType;

struct BallistaContextState {
Expand Down Expand Up @@ -128,11 +127,11 @@ impl BallistaContext {
}

/// Create a DataFrame representing an Avro table scan
pub fn read_avro(
/// TODO fetch schema from scheduler instead of resolving locally
pub async fn read_avro(
&self,
path: &str,
options: AvroReadOptions,
options: AvroReadOptions<'_>,
) -> Result<Arc<dyn DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
Expand All @@ -147,13 +146,13 @@ impl BallistaContext {
guard.config(),
)
};
let df = ctx.read_avro(path.to_str().unwrap(), options)?;
let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
Ok(df)
}

/// Create a DataFrame representing a Parquet table scan
pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
/// TODO fetch schema from scheduler instead of resolving locally
pub async fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
let path = fs::canonicalize(&path)?;
Expand All @@ -167,16 +166,16 @@ impl BallistaContext {
guard.config(),
)
};
let df = ctx.read_parquet(path.to_str().unwrap())?;
let df = ctx.read_parquet(path.to_str().unwrap()).await?;
Ok(df)
}

/// Create a DataFrame representing a CSV table scan
pub fn read_csv(
/// TODO fetch schema from scheduler instead of resolving locally
pub async fn read_csv(
&self,
path: &str,
options: CsvReadOptions,
options: CsvReadOptions<'_>,
) -> Result<Arc<dyn DataFrame>> {
// convert to absolute path because the executor likely has a different working directory
let path = PathBuf::from(path);
Expand All @@ -191,7 +190,7 @@ impl BallistaContext {
guard.config(),
)
};
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
Ok(df)
}

Expand All @@ -206,39 +205,42 @@ impl BallistaContext {
Ok(())
}

pub fn register_csv(
pub async fn register_csv(
&self,
name: &str,
path: &str,
options: CsvReadOptions,
options: CsvReadOptions<'_>,
) -> Result<()> {
match self.read_csv(path, options)?.to_logical_plan() {
match self.read_csv(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}

pub fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
match self.read_parquet(path)?.to_logical_plan() {
pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
match self.read_parquet(path).await?.to_logical_plan() {
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}

pub fn register_avro(
pub async fn register_avro(
&self,
name: &str,
path: &str,
options: AvroReadOptions,
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options)?.to_logical_plan() {
match self.read_avro(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
}

/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
/// Create a DataFrame from a SQL statement.
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
Expand Down Expand Up @@ -275,15 +277,17 @@ impl BallistaContext {
CsvReadOptions::new()
.schema(&schema.as_ref().to_owned().into())
.has_header(*has_header),
)?;
)
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location)?;
self.register_parquet(name, location).await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())?;
self.register_avro(name, location, AvroReadOptions::default())
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
Expand All @@ -292,7 +296,7 @@ impl BallistaContext {
))),
},

_ => ctx.sql(sql),
_ => ctx.sql(sql).await,
}
}
}
Expand All @@ -306,7 +310,7 @@ mod tests {
let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
.await
.unwrap();
let df = context.sql("SELECT 1;").unwrap();
let df = context.sql("SELECT 1;").await.unwrap();
df.collect().await.unwrap();
}
}
1 change: 1 addition & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sqlparser = "0.11.0"
tokio = "1.0"
tonic = "0.5"
uuid = { version = "0.8", features = ["v4"] }
chrono = "0.4"

arrow-flight = { version = "^5.3" }

Expand Down
Loading

0 comments on commit 2454e46

Please sign in to comment.