From 1b1a3a102a26e454a0ca85ce0ff12586f8767faa Mon Sep 17 00:00:00 2001
From: raviranak <126759945+raviranak@users.noreply.github.com>
Date: Wed, 15 Nov 2023 15:44:06 +0530
Subject: [PATCH 1/3] delta support

---
 raysql/context.py |  3 +++
 src/context.rs    | 10 ++++++++++
 2 files changed, 13 insertions(+)

diff --git a/raysql/context.py b/raysql/context.py
index 28efb8f..a003d53 100644
--- a/raysql/context.py
+++ b/raysql/context.py
@@ -203,6 +203,9 @@ def register_csv(self, table_name: str, path: str, has_header: bool):
     def register_parquet(self, table_name: str, path: str):
         self.ctx.register_parquet(table_name, path)
 
+    def register_data_lake(self, table_name: str, paths: List[str]):
+        self.ctx.register_datalake_table(table_name, paths)
+
     def sql(self, sql: str) -> pa.RecordBatch:
         # TODO we should parse sql and inspect the plan rather than
         # perform a string comparison here
diff --git a/src/context.rs b/src/context.rs
index cd7bda1..633ecaf 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -73,6 +73,16 @@ impl PyContext {
         Ok(())
     }
 
+    pub fn register_datalake_table(&self, name: &str, path: Vec<&str>, py: Python) -> PyResult<()> {
+        let options = ParquetReadOptions::default();
+        let listing_options = options.to_listing_options(&self.ctx.state().config());
+        wait_for_future(py, self.ctx.register_listing_table(name, path, listing_options, None, None))?;
+        Ok(())
+    }
+
+
+
+
     /// Execute SQL directly against the DataFusion context. Useful for statements
     /// such as "create view" or "drop view"
     pub fn sql(&self, sql: &str, py: Python) -> PyResult<()> {

From 2046d4169bc6c54e4c980b54174192a3e9b489cc Mon Sep 17 00:00:00 2001
From: raviranak <126759945+raviranak@users.noreply.github.com>
Date: Wed, 15 Nov 2023 16:04:53 +0530
Subject: [PATCH 2/3] imports

---
 raysql/context.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/raysql/context.py b/raysql/context.py
index a003d53..312fc86 100644
--- a/raysql/context.py
+++ b/raysql/context.py
@@ -8,7 +8,7 @@
 import raysql
 from raysql import Context, ExecutionGraph, QueryStage
 
-
+from typing import List
 
 def schedule_execution(
     graph: ExecutionGraph,

From 68279e2f8cc83e6d083ac5722985a253ab639074 Mon Sep 17 00:00:00 2001
From: raviranak <126759945+raviranak@users.noreply.github.com>
Date: Mon, 20 Nov 2023 10:30:30 +0530
Subject: [PATCH 3/3] changes for pa dataset support

---
 Cargo.toml        |   6 +--
 raysql/context.py |   4 +-
 src/context.rs    |  13 +++--
 src/dataset.rs    | 126 ++++++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs        |   1 +
 5 files changed, 141 insertions(+), 9 deletions(-)
 create mode 100644 src/dataset.rs

diff --git a/Cargo.toml b/Cargo.toml
index b71faf1..c85abde 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,9 +12,9 @@ rust-version = "1.62"
 build = "build.rs"
 
 [dependencies]
-datafusion = { version = "20.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "20.0.0"
-datafusion-python = "20.0.0"
+datafusion = { version = "22.0.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "22.0.0"
+datafusion-python = "22.0.0"
 futures = "0.3"
 glob = "0.3"
 log = "0.4"
diff --git a/raysql/context.py b/raysql/context.py
index 312fc86..3b704b3 100644
--- a/raysql/context.py
+++ b/raysql/context.py
@@ -203,8 +203,8 @@ def register_csv(self, table_name: str, path: str, has_header: bool):
     def register_parquet(self, table_name: str, path: str):
         self.ctx.register_parquet(table_name, path)
 
-    def register_data_lake(self, table_name: str, paths: List[str]):
-        self.ctx.register_datalake_table(table_name, paths)
+    def register_dataset(self, table_name: str, dataset: pa.Dataset):
+        self.ctx.register_dataset(table_name, dataset)
 
     def sql(self, sql: str) -> pa.RecordBatch:
         # TODO we should parse sql and inspect the plan rather than
         # perform a string comparison here
diff --git a/src/context.rs b/src/context.rs
index 633ecaf..223cd14 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -1,9 +1,11 @@
+use crate::dataset::Dataset;
 use crate::planner::{make_execution_graph, PyExecutionGraph};
 use crate::shuffle::{RayShuffleReaderExec, ShuffleCodec};
 use crate::utils::wait_for_future;
 use datafusion::arrow::pyarrow::PyArrowConvert;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::config::Extensions;
+use datafusion::datasource::TableProvider;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::disk_manager::DiskManagerConfig;
@@ -73,10 +75,13 @@ impl PyContext {
         Ok(())
     }
 
-    pub fn register_datalake_table(&self, name: &str, path: Vec<&str>, py: Python) -> PyResult<()> {
-        let options = ParquetReadOptions::default();
-        let listing_options = options.to_listing_options(&self.ctx.state().config());
-        wait_for_future(py, self.ctx.register_listing_table(name, path, listing_options, None, None))?;
+    fn register_dataset(&self, name: &str, dataset: &PyAny, py: Python) -> PyResult<()> {
+        let table: Arc<dyn TableProvider> = Arc::new(Dataset::new(dataset, py)?);
+
+        self.ctx
+            .register_table(name, table)
+            .map_err(DataFusionError::from)?;
+
         Ok(())
     }
 
diff --git a/src/dataset.rs b/src/dataset.rs
new file mode 100644
index 0000000..2fad226
--- /dev/null
+++ b/src/dataset.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::exceptions::PyValueError;
+/// Implements a Datafusion TableProvider that delegates to a PyArrow Dataset
+/// This allows us to use PyArrow Datasets as Datafusion tables while pushing down projections and filters
+use pyo3::prelude::*;
+use pyo3::types::PyType;
+
+use std::any::Any;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::execution::context::SessionState;
+use datafusion::logical_expr::TableProviderFilterPushDown;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_expr::Expr;
+
+use crate::dataset_exec::DatasetExec;
+use crate::pyarrow_filter_expression::PyArrowFilterExpression;
+
+// Wraps a pyarrow.dataset.Dataset class and implements a Datafusion TableProvider around it
+#[derive(Debug, Clone)]
+pub(crate) struct Dataset {
+    dataset: PyObject,
+}
+
+impl Dataset {
+    // Creates a Python PyArrow.Dataset
+    pub fn new(dataset: &PyAny, py: Python) -> PyResult<Self> {
+        // Ensure that we were passed an instance of pyarrow.dataset.Dataset
+        let ds = PyModule::import(py, "pyarrow.dataset")?;
+        let ds_type: &PyType = ds.getattr("Dataset")?.downcast()?;
+        if dataset.is_instance(ds_type)? {
+            Ok(Dataset {
+                dataset: dataset.into(),
+            })
+        } else {
+            Err(PyValueError::new_err(
+                "dataset argument must be a pyarrow.dataset.Dataset object",
+            ))
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for Dataset {
+    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get a reference to the schema for this table
+    fn schema(&self) -> SchemaRef {
+        Python::with_gil(|py| {
+            let dataset = self.dataset.as_ref(py);
+            // This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
+            Arc::new(
+                dataset
+                    .getattr("schema")
+                    .unwrap()
+                    .extract::<PyArrowType<_>>()
+                    .unwrap()
+                    .0,
+            )
+        })
+    }
+
+    /// Get the type of this table for metadata/catalog purposes.
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Create an ExecutionPlan that will scan the table.
+    /// The table provider will be usually responsible of grouping
+    /// the source data into partitions that can be efficiently
+    /// parallelized or distributed.
+    async fn scan(
+        &self,
+        _ctx: &SessionState,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        // limit can be used to reduce the amount scanned
+        // from the datasource as a performance optimization.
+        // If set, it contains the amount of rows needed by the `LogicalPlan`,
+        // The datasource should return *at least* this number of rows if available.
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        Python::with_gil(|py| {
+            let plan: Arc<dyn ExecutionPlan> = Arc::new(
+                DatasetExec::new(py, self.dataset.as_ref(py), projection.cloned(), filters)
+                    .map_err(|err| DataFusionError::External(Box::new(err)))?,
+            );
+            Ok(plan)
+        })
+    }
+
+    /// Tests whether the table provider can make use of a filter expression
+    /// to optimise data retrieval.
+    fn supports_filter_pushdown(&self, filter: &Expr) -> DFResult<TableProviderFilterPushDown> {
+        match PyArrowFilterExpression::try_from(filter) {
+            Ok(_) => Ok(TableProviderFilterPushDown::Exact),
+            _ => Ok(TableProviderFilterPushDown::Unsupported),
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index cca8d16..e101144 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,6 +11,7 @@ pub mod planner;
 pub mod query_stage;
 pub mod shuffle;
 pub mod utils;
+pub mod dataset;
 
 /// A Python module implemented in Rust.
 #[pymodule]
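
Usage sketch (not part of the patch series): a minimal illustration of how the register_dataset API added above might be exercised from Python. The RaySqlContext class name, its constructor argument, the dataset path, and the table name below are assumptions for illustration only; of the calls shown, only register_dataset() and sql() are defined in raysql/context.py as modified by this series.

    import pyarrow.dataset as ds
    from raysql.context import RaySqlContext  # assumed wrapper class in raysql/context.py

    # Build a PyArrow dataset over a directory of Parquet files (hypothetical path)
    dataset = ds.dataset("/data/events", format="parquet")

    ctx = RaySqlContext(2)  # constructor arguments are a guess; see raysql/context.py
    ctx.register_dataset("events", dataset)  # delegates to PyContext::register_dataset in src/context.rs
    batch = ctx.sql("SELECT COUNT(*) FROM events")

Projection and filter pushdown for tables registered this way are handled by the Dataset TableProvider introduced in src/dataset.rs, which translates filters through PyArrowFilterExpression and reports Exact pushdown only when the expression can be converted.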