Reorganize table providers by table format #1010

Merged (39 commits, Oct 13, 2021). Changes shown are from all commits.

Commits (all by rdettai, Oct 12, 2021):
5e11154 [feat] stubs for provider re-organization
4ea972e [feat] implement infer_schema to make test pass
ca37607 [wip] trying to implement pruned_partition_list
5034275 [typo]
8c7321e [fix] replace enum with trait for extensibility
c69faad [fix] add partition cols to infered schema
551ceb4 [feat] forked file format executors
522747e [doc] comments about why we are flattening
33c7b91 [test] migrated tests to file formats
0bc191e [test] improve listing test
fa115db [feat] add avro to refactored format providers
553097e [fix] remove try from new when unnecessary
7e0e24f [fix] remove try_ from ListingTable new
abeccce [refacto] renamed format module to file_format
8abcc97 [fix] removed Ballista stubs
c1530a9 [fix] rename create_executor
a763512 [feat] added store
e71229d [fix] Clippy
f723262 [test] improve file_format tests with limit
6a7c36a [fix] limit file system read size
ccc7938 [fix] avoid fetching unnecessary stats after limit
4dbe201 [fix] improve readability
6879938 [doc] improve comments
1947a87 [refacto] keep async reader stub
aa7cbad [doc] cleanup comments
d6a718b [test] test file listing
1be110c [fix] add last_modified back
cbfbdc1 [refacto] simplify csv reader exec
3a4f6d8 [refacto] change SizedFile back to FileMeta
d20eff3 [doc] comment clarification
7a4a250 [fix] avoid keeping object store as field
cac1c87 [refacto] grouped params to avoid too_many_arguments
ab3fcdd [fix] get_by_uri also returns path
8f4e242 [fix] ListingTable at store level instead of registry
db08372 [fix] builder take self and not ref to self
e0a670f Replace file format providers (#2)
47a17fd [doc] clearer doc about why sql() is async
6db3533 [doc] typos and clarity
2446b81 [fix] missing await after rebase
README.md (6 changes: 3 additions & 3 deletions)

@@ -76,10 +76,10 @@ use datafusion::arrow::record_batch::RecordBatch;
 async fn main() -> datafusion::error::Result<()> {
     // register the table
     let mut ctx = ExecutionContext::new();
-    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;
+    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;
 
     // create a plan to run a SQL query
-    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")?;
+    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
 
     // execute and print results
     df.show().await?;
@@ -98,7 +98,7 @@ use datafusion::arrow::record_batch::RecordBatch;
 async fn main() -> datafusion::error::Result<()> {
     // create the dataframe
     let mut ctx = ExecutionContext::new();
-    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
+    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
 
     let df = df.filter(col("a").lt_eq(col("b")))?
         .aggregate(vec![col("a")], vec![min(col("b"))])?;
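Putting the pieces together, a complete, runnable version of the updated README example might look as follows. This is a sketch: the `#[tokio::main]` attribute and the `datafusion::prelude::*` import are assumed from the surrounding README rather than shown in this diff.

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Registering the table now infers the CSV schema eagerly, hence the extra .await.
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;

    // Planning a query may also need table metadata, so sql() is async as well.
    let df = ctx
        .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
        .await?;

    // Execute and print the results.
    df.show().await?;
    Ok(())
}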
ballista-examples/src/bin/ballista-dataframe.rs (3 changes: 2 additions & 1 deletion)

@@ -33,7 +33,8 @@ async fn main() -> Result<()> {
 
     // define the query using the DataFrame trait
     let df = ctx
-        .read_parquet(filename)?
+        .read_parquet(filename)
+        .await?
         .select_columns(&["id", "bool_col", "timestamp_col"])?
         .filter(col("id").gt(lit(1)))?;
 
ballista-examples/src/bin/ballista-sql.rs (11 changes: 7 additions & 4 deletions)

@@ -34,15 +34,18 @@ async fn main() -> Result<()> {
         "aggregate_test_100",
         &format!("{}/csv/aggregate_test_100.csv", testdata),
         CsvReadOptions::new(),
-    )?;
+    )
+    .await?;
 
     // execute the query
-    let df = ctx.sql(
-        "SELECT c1, MIN(c12), MAX(c12) \
+    let df = ctx
+        .sql(
+            "SELECT c1, MIN(c12), MAX(c12) \
         FROM aggregate_test_100 \
         WHERE c11 > 0.1 AND c11 < 0.9 \
         GROUP BY c1",
-    )?;
+        )
+        .await?;
 
     // print the results
     df.show().await?;
ballista/rust/client/README.md (4 changes: 2 additions & 2 deletions)

@@ -104,15 +104,15 @@ async fn main() -> Result<()> {
         "tripdata",
         "/path/to/yellow_tripdata_2020-01.csv",
         CsvReadOptions::new(),
-    )?;
+    ).await?;
 
     // execute the query
     let df = ctx.sql(
         "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
         FROM tripdata
         GROUP BY passenger_count
         ORDER BY passenger_count",
-    )?;
+    ).await?;
 
     // collect the results and print them to stdout
     let results = df.collect().await?;
ballista/rust/client/src/context.rs (60 changes: 32 additions & 28 deletions)

@@ -31,8 +31,7 @@ use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::dataframe_impl::DataFrameImpl;
 use datafusion::logical_plan::LogicalPlan;
-use datafusion::physical_plan::avro::AvroReadOptions;
-use datafusion::physical_plan::csv::CsvReadOptions;
+use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
 use datafusion::sql::parser::FileType;
 
 struct BallistaContextState {
@@ -128,11 +127,11 @@ impl BallistaContext {
     }
 
     /// Create a DataFrame representing an Avro table scan
-
-    pub fn read_avro(
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_avro(
         &self,
         path: &str,
-        options: AvroReadOptions,
+        options: AvroReadOptions<'_>,
     ) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
@@ -147,13 +146,13 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_avro(path.to_str().unwrap(), options)?;
+        let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
         Ok(df)
     }
 
     /// Create a DataFrame representing a Parquet table scan
-
-    pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
         let path = fs::canonicalize(&path)?;
@@ -167,16 +166,16 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_parquet(path.to_str().unwrap())?;
+        let df = ctx.read_parquet(path.to_str().unwrap()).await?;
         Ok(df)
     }
 
     /// Create a DataFrame representing a CSV table scan
-
-    pub fn read_csv(
+    /// TODO fetch schema from scheduler instead of resolving locally
+    pub async fn read_csv(
         &self,
         path: &str,
-        options: CsvReadOptions,
+        options: CsvReadOptions<'_>,
     ) -> Result<Arc<dyn DataFrame>> {
         // convert to absolute path because the executor likely has a different working directory
         let path = PathBuf::from(path);
@@ -191,7 +190,7 @@ impl BallistaContext {
                 guard.config(),
             )
         };
-        let df = ctx.read_csv(path.to_str().unwrap(), options)?;
+        let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
         Ok(df)
     }
 
@@ -206,39 +205,42 @@ impl BallistaContext {
         Ok(())
     }
 
-    pub fn register_csv(
+    pub async fn register_csv(
         &self,
         name: &str,
         path: &str,
-        options: CsvReadOptions,
+        options: CsvReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_csv(path, options)?.to_logical_plan() {
+        match self.read_csv(path, options).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
 
-    pub fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
-        match self.read_parquet(path)?.to_logical_plan() {
+    pub async fn register_parquet(&self, name: &str, path: &str) -> Result<()> {
+        match self.read_parquet(path).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
 
-    pub fn register_avro(
+    pub async fn register_avro(
        &self,
         name: &str,
         path: &str,
-        options: AvroReadOptions,
+        options: AvroReadOptions<'_>,
     ) -> Result<()> {
-        match self.read_avro(path, options)?.to_logical_plan() {
+        match self.read_avro(path, options).await?.to_logical_plan() {
             LogicalPlan::TableScan { source, .. } => self.register_table(name, source),
             _ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
         }
     }
 
-    /// Create a DataFrame from a SQL statement
-    pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
+    /// Create a DataFrame from a SQL statement.
+    ///
+    /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
+    /// might require the schema to be inferred.
+    pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
         let mut ctx = {
             let state = self.state.lock().unwrap();
             create_df_ctx_with_ballista_query_planner(
@@ -275,15 +277,17 @@ impl BallistaContext {
                     CsvReadOptions::new()
                         .schema(&schema.as_ref().to_owned().into())
                         .has_header(*has_header),
-                )?;
+                )
+                .await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             FileType::Parquet => {
-                self.register_parquet(name, location)?;
+                self.register_parquet(name, location).await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             FileType::Avro => {
-                self.register_avro(name, location, AvroReadOptions::default())?;
+                self.register_avro(name, location, AvroReadOptions::default())
+                    .await?;
                 Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
             }
             _ => Err(DataFusionError::NotImplemented(format!(
@@ -292,7 +296,7 @@ impl BallistaContext {
                ))),
            },
 
-            _ => ctx.sql(sql),
+            _ => ctx.sql(sql).await,
 
         }
     }
@@ -306,7 +310,7 @@ mod tests {
         let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
             .await
             .unwrap();
-        let df = context.sql("SELECT 1;").unwrap();
+        let df = context.sql("SELECT 1;").await.unwrap();
         df.collect().await.unwrap();
     }
 }
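The new doc comment explains why `sql()` became async: a `CREATE EXTERNAL TABLE` statement may have to open the underlying files to infer the table's schema before a DataFrame can be returned. A hypothetical caller-side sketch of that case (the table name and location below are illustrative, not from this PR):

// Assumes a running standalone context, as in the test above.
let ctx = BallistaContext::standalone(&BallistaConfig::new()?, 1).await?;

// Planning this statement reads file metadata at the given location to
// infer the schema, which is why sql() must now be awaited.
let df = ctx
    .sql("CREATE EXTERNAL TABLE lineitem STORED AS PARQUET LOCATION '/data/lineitem/'")
    .await?;
df.collect().await?;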
ballista/rust/core/Cargo.toml (1 change: 1 addition & 0 deletions)

@@ -41,6 +41,7 @@ sqlparser = "0.11.0"
 tokio = "1.0"
 tonic = "0.5"
 uuid = { version = "0.8", features = ["v4"] }
+chrono = "0.4"
 
 arrow-flight = { version = "^5.3" }
 
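The new chrono dependency presumably supports the file-modification timestamps reintroduced by commit 1be110c ("[fix] add last_modified back"). A hypothetical sketch of such per-file metadata; the field names and shape are assumptions for illustration, not copied from this diff:

use chrono::{DateTime, Utc};

// Hypothetical shape of per-file metadata kept by the object-store
// abstraction (the PR renames SizedFile back to FileMeta in 3a4f6d8).
pub struct FileMeta {
    /// Path of the file within the object store
    pub path: String,
    /// Size in bytes, used for statistics and read planning
    pub size: u64,
    /// Last modification time, reintroduced in commit 1be110c
    pub last_modified: Option<DateTime<Utc>>,
}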