This repository has been archived by the owner on Oct 2, 2024. It is now read-only.

datafusion-contrib/ray-sql

RaySQL: DataFusion on Ray

This was a research project to evaluate performing distributed SQL queries from Python, using Ray and DataFusion.

**The code has now moved to https://github.com/apache/datafusion-ray**

Goals

  • Demonstrate how easily new systems can be built on top of DataFusion. See the design documentation to understand how RaySQL works.
  • Drive requirements for DataFusion's Python bindings.
  • Create content for an interesting blog post or conference talk.

Non Goals

  • Build and support a production system.

Example

Run the following example live in your browser using a Google Colab notebook.

import os
import pandas as pd
import ray

from raysql import RaySqlContext

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Start a local cluster
ray.init(resources={"worker": 1})

# Create a context and register a table
ctx = RaySqlContext(2, use_ray_shuffle=True)
# Register either a CSV or Parquet file
# ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")

result_set = ctx.sql(
  "select sex, smoker, avg(tip/total_bill) as tip_pct from tips group by sex, smoker"
)
for record_batch in result_set:
  print(record_batch.to_pandas())
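
To make the query above concrete, here is a minimal pure-Python sketch of how a grouped average can be computed in two stages across partitions: each partition produces a partial (sum, count) state per key, and a final stage merges those states. This illustrates the general technique only and is not RaySQL's implementation; the sample rows and the names `partial_aggregate` and `final_aggregate` are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (sex, smoker, total_bill, tip)
PARTITIONS = [
    [("Female", "No", 20.0, 3.0), ("Male", "Yes", 10.0, 2.0)],
    [("Female", "No", 10.0, 2.0), ("Male", "Yes", 40.0, 4.0)],
]


def partial_aggregate(rows):
    """Stage 1: per-partition sum and count of tip/total_bill for each key."""
    acc = defaultdict(lambda: [0.0, 0])
    for sex, smoker, total_bill, tip in rows:
        state = acc[(sex, smoker)]
        state[0] += tip / total_bill
        state[1] += 1
    return dict(acc)


def final_aggregate(partials):
    """Stage 2: merge the partial states and finish the average."""
    merged = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (total, count) in partial.items():
            merged[key][0] += total
            merged[key][1] += count
    return {key: total / count for key, (total, count) in merged.items()}


tip_pct = final_aggregate(partial_aggregate(p) for p in PARTITIONS)
for (sex, smoker), pct in sorted(tip_pct.items()):
    print(sex, smoker, round(pct, 4))
```

Carrying a (sum, count) pair instead of a per-partition average keeps the final result exact even when partitions hold different numbers of rows for a key.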

Status

  • RaySQL can run all queries in the TPC-H benchmark

Features

  • Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
  • Support for CSV and Parquet files

Limitations

  • Requires a shared file system currently

Performance

This chart shows the performance of RaySQL compared to Apache Spark for SQLBench-H on a very small data set (10 GB), running on a desktop (Threadripper with 24 physical cores). Both RaySQL and Spark are configured with 24 executors.

Overall Time

RaySQL is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.

[Chart: SQLBench-H Total]

Per Query Time

Spark is much faster on some queries, likely due to broadcast exchanges, which RaySQL hasn't implemented yet.

[Chart: SQLBench-H Per Query]
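
For readers unfamiliar with the term, a broadcast exchange sends a full copy of a small table to every task that processes a partition of the large table, so the large table does not have to be repartitioned by join key. The sketch below shows the idea in plain Python. It is a generic illustration with made-up data and a hypothetical function name (`broadcast_hash_join`), not code from RaySQL or Spark.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join each partition of the large side against a shared copy of the small side."""
    # Build the hash table once from the small table; in a distributed engine
    # this table would be copied (broadcast) to every task.
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    results = []
    for partition in large_partitions:
        # Each partition is probed where it already lives: the large side is not shuffled.
        joined = [
            (key, left, right)
            for key, left in partition
            for right in lookup.get(key, [])
        ]
        results.append(joined)
    return results


# Hypothetical data: orders partitioned arbitrarily, plus a small nation table.
orders = [
    [(1, "order-a"), (2, "order-b")],
    [(1, "order-c"), (3, "order-d")],
]
nations = [(1, "FR"), (2, "DE")]

joined = broadcast_hash_join(orders, nations)
print(joined)
```

Without a broadcast, both sides would typically be hash-partitioned on the join key first, which means writing and reading shuffle data for the large table as well.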

Performance Plan

I'm planning on experimenting with the following changes to improve performance:

  • Make better use of Ray futures to run more tasks in parallel
  • Use Ray object store for shuffle data transfer to reduce disk I/O cost
  • Keep upgrading to newer versions of DataFusion to pick up the latest optimizations
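
The first item comes down to a submit-then-wait pattern: launch every independent task before blocking on any single result. The sketch below uses Python's standard `concurrent.futures` as a stand-in for Ray futures so that it runs without a cluster. `run_stage_task` is a hypothetical placeholder for one unit of query work, and nothing here reflects RaySQL's actual scheduler.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_stage_task(partition_id):
    """Hypothetical stand-in for executing one partition of a query stage."""
    time.sleep(0.05)  # simulate I/O-bound work
    return partition_id * 10


def run_one_at_a_time(partition_ids):
    # Each task finishes before the next one starts.
    return [run_stage_task(p) for p in partition_ids]


def run_with_futures(partition_ids):
    with ThreadPoolExecutor(max_workers=len(partition_ids)) as pool:
        # Submit every task first, then collect results in submission order.
        futures = [pool.submit(run_stage_task, p) for p in partition_ids]
        return [f.result() for f in futures]


partition_ids = [0, 1, 2, 3]
for runner in (run_one_at_a_time, run_with_futures):
    start = time.perf_counter()
    results = runner(partition_ids)
    print(runner.__name__, results, f"{time.perf_counter() - start:.2f}s")
```

With Ray, the analogous calls are `task.remote(...)` to obtain an object reference and `ray.get(...)` to wait on one or more references.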

Building

# prepare development environment (used to build wheel / install in development)
python3 -m venv venv
# activate the venv
source venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies (for Python 3.8+)
python -m pip install -r requirements-in.txt

Whenever the Rust code changes (through your own edits or a git pull), rebuild and rerun the tests:

# make sure you activate the venv using "source venv/bin/activate" first
maturin develop
python -m pytest

Benchmarking

Create a release build when running benchmarks, then use pip to install the wheel.

maturin develop --release

How to update dependencies

To change test dependencies, edit requirements.in and run:

# install pip-tools (this can be done only once), also consider running in venv
python -m pip install pip-tools
python -m piptools compile --generate-hashes -o requirements-310.txt

To update dependencies, run with -U

python -m piptools compile -U --generate-hashes -o requirements-310.txt

More details here
