Skip to content

Commit

Permalink
Add Ballista support to DataFusion CLI (#889)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Aug 16, 2021
1 parent 57bf54f commit d4e32d1
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 59 deletions.
104 changes: 74 additions & 30 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use ballista_core::config::BallistaConfig;
use ballista_core::{datasource::DfTableAdapter, utils::create_datafusion_context};
use ballista_core::{
datasource::DfTableAdapter, utils::create_df_ctx_with_ballista_query_planner,
};

use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::sql::parser::FileType;

struct BallistaContextState {
/// Ballista configuration
Expand Down Expand Up @@ -129,12 +133,14 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let guard = self.state.lock().unwrap();
let mut ctx = create_datafusion_context(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
);
let mut ctx = {
let guard = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_parquet(path.to_str().unwrap())?;
Ok(df)
}
Expand All @@ -151,12 +157,14 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let guard = self.state.lock().unwrap();
let mut ctx = create_datafusion_context(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
);
let mut ctx = {
let guard = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
Ok(df)
}
Expand Down Expand Up @@ -187,23 +195,59 @@ impl BallistaContext {

/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
// use local DataFusion context for now but later this might call the scheduler
// register tables
let state = self.state.lock().unwrap();
let mut ctx = create_datafusion_context(
&state.scheduler_host,
state.scheduler_port,
state.config(),
);
for (name, plan) in &state.tables {
let plan = ctx.optimize(plan)?;
let execution_plan = ctx.create_physical_plan(&plan)?;
ctx.register_table(
TableReference::Bare { table: name },
Arc::new(DfTableAdapter::new(plan, execution_plan)),
)?;
let mut ctx = {
let state = self.state.lock().unwrap();
create_df_ctx_with_ballista_query_planner(
&state.scheduler_host,
state.scheduler_port,
state.config(),
)
};

// register tables with DataFusion context
{
let state = self.state.lock().unwrap();
for (name, plan) in &state.tables {
let plan = ctx.optimize(plan)?;
let execution_plan = ctx.create_physical_plan(&plan)?;
ctx.register_table(
TableReference::Bare { table: name },
Arc::new(DfTableAdapter::new(plan, execution_plan)),
)?;
}
}

let plan = ctx.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
} => match file_type {
FileType::CSV => {
self.register_csv(
name,
location,
CsvReadOptions::new()
.schema(&schema.as_ref().to_owned().into())
.has_header(*has_header),
)?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location)?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
))),
},

_ => ctx.sql(sql),
}
ctx.sql(sql)
}
}

Expand Down
23 changes: 16 additions & 7 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;

use crate::config::BallistaConfig;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
array::{
Expand All @@ -51,6 +52,7 @@ use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
Expand Down Expand Up @@ -236,8 +238,9 @@ fn build_exec_plan_diagram(
Ok(node_id)
}

/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context(
/// Create a DataFusion context that uses the BallistaQueryPlanner to send logical plans
/// to a Ballista scheduler
pub fn create_df_ctx_with_ballista_query_planner(
scheduler_host: &str,
scheduler_port: u16,
config: &BallistaConfig,
Expand Down Expand Up @@ -272,11 +275,17 @@ impl QueryPlanner for BallistaQueryPlanner {
logical_plan: &LogicalPlan,
_ctx_state: &ExecutionContextState,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
Ok(Arc::new(DistributedQueryExec::new(
self.scheduler_url.clone(),
self.config.clone(),
logical_plan.clone(),
)))
match logical_plan {
LogicalPlan::CreateExternalTable { .. } => {
// table state is managed locally in the BallistaContext, not in the scheduler
Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
}
_ => Ok(Arc::new(DistributedQueryExec::new(
self.scheduler_url.clone(),
self.config.clone(),
logical_plan.clone(),
))),
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "datafusion-cli"
version = "4.0.0-SNAPSHOT"
authors = ["Apache Arrow <[email protected]>"]
edition = "2018"
keywords = [ "arrow", "query", "sql", "cli", "repl" ]
keywords = [ "arrow", "datafusion", "ballista", "query", "sql", "cli", "repl" ]
license = "Apache-2.0"
homepage = "https://github.com/apache/arrow-datafusion"
repository = "https://github.com/apache/arrow-datafusion"
Expand All @@ -31,4 +31,5 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion", version = "5.1.0" }
arrow = { version = "5.0" }
ballista = { path = "../ballista/rust/client", version = "0.6.0" }
arrow = { version = "5.0" }
74 changes: 74 additions & 0 deletions datafusion-cli/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DataFusion Command-line Interface

The DataFusion CLI allows SQL queries to be executed by an in-process DataFusion context, or by a distributed
Ballista context.

```ignore
USAGE:
datafusion-cli [FLAGS] [OPTIONS]
FLAGS:
-h, --help Prints help information
-q, --quiet Reduce printing other than the results and work quietly
-V, --version Prints version information
OPTIONS:
-c, --batch-size <batch-size> The batch size of each query, or use DataFusion default
-p, --data-path <data-path> Path to your data, default to current directory
-f, --file <file>... Execute commands from file(s), then exit
--format <format> Output format [default: table] [possible values: csv, tsv, table, json, ndjson]
--host <host> Ballista scheduler host
--port <port> Ballista scheduler port
```

## Example

Create a CSV file to query.

```bash,ignore
$ echo "1,2" > data.csv
```

```sql,ignore
$ datafusion-cli
DataFusion CLI v4.0.0-SNAPSHOT
> CREATE EXTERNAL TABLE foo (a INT, b INT) STORED AS CSV LOCATION 'data.csv';
0 rows in set. Query took 0.001 seconds.
> SELECT * FROM foo;
+---+---+
| a | b |
+---+---+
| 1 | 2 |
+---+---+
1 row in set. Query took 0.017 seconds.
```

## Ballista

The DataFusion CLI can connect to a Ballista scheduler for query execution.

```bash,ignore
datafusion-cli --host localhost --port 50050
```
5 changes: 5 additions & 0 deletions datafusion-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc = include_str!("../README.md")]
#![allow(unused_imports)]
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");

pub mod print_format;

use datafusion::arrow::record_batch::RecordBatch;
Expand Down
Loading

0 comments on commit d4e32d1

Please sign in to comment.