Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
- Apache Arrow memory model and compute kernels for efficient processing of data.
- Apache Arrow Flight Protocol for efficient data transfer between processes.
- Google Protocol Buffers for serializing query plans.
- Docker for packaging up executors along with user-defined code.
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy if a scheduler fails.
There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the DataFusion User Guide.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install ballista-scheduler
cargo install ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
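When scripting a local cluster, it can help to confirm that the scheduler is accepting connections before starting executors or clients. The following is a minimal sketch using only the Rust standard library. The helper name `wait_for_port` and the retry intervals are assumptions for illustration and are not part of Ballista.

```rust
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of Ballista): returns true once a TCP
/// connection to `addr` succeeds, or false if `timeout` elapses first.
fn wait_for_port(addr: &str, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        // Resolve on every attempt; an unresolvable address yields no candidates.
        let resolved: Vec<SocketAddr> = match addr.to_socket_addrs() {
            Ok(iter) => iter.collect(),
            Err(_) => Vec::new(),
        };
        // Try each resolved address with a short per-attempt timeout.
        for sock in &resolved {
            if TcpStream::connect_timeout(sock, Duration::from_millis(200)).is_ok() {
                return true;
            }
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Back off briefly before retrying.
        sleep(Duration::from_millis(100));
    }
}
```

For the default scheduler port described above, a call such as `wait_for_port("localhost:50050", Duration::from_secs(10))` would block until the port accepts a connection or ten seconds pass. A successful TCP connection only shows that something is listening on the port, not that the scheduler is fully initialized.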
Next, start an executor process in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
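If several executors are needed, the command above can be generated programmatically with one bind port per executor. The sketch below uses `std::process::Command` and only the flags shown in this document (`--bind-port` and `-c`). The helper name `executor_command` and the consecutive-port scheme are assumptions for illustration.

```rust
use std::process::Command;

/// Hypothetical helper: builds (but does not run) the command for the
/// executor at position `index`, binding it to `base_port + index`.
fn executor_command(base_port: u16, index: u16, concurrency: usize) -> Command {
    let mut cmd = Command::new("ballista-executor");
    // Mirrors: RUST_LOG=info ballista-executor --bind-port <port> -c <n>
    cmd.env("RUST_LOG", "info")
        .arg("--bind-port")
        .arg((base_port + index).to_string())
        .arg("-c")
        .arg(concurrency.to_string());
    cmd
}
```

Calling `executor_command(50051, 1, 4).spawn()` would start a second executor on port 50052, provided `ballista-executor` is on the `PATH`. The returned `Child` handle should be kept so the process can be waited on or stopped later.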
Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.
The following example runs a simple aggregate SQL query against a CSV file from the New York Taxi and Limousine Commission data set.
use ballista::prelude::*;
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;
#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;
    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config);
    // register csv file with the execution context
    ctx.register_csv(
        "tripdata",
        "/path/to/yellow_tripdata_2020-01.csv",
        CsvReadOptions::new(),
    )?;
    // execute the query
    let df = ctx.sql(
        "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
        FROM tripdata
        GROUP BY passenger_count
        ORDER BY passenger_count",
    )?;
    // collect the results and print them to stdout
    let results = df.collect().await?;
    pretty::print_batches(&results)?;
    Ok(())
}