Skip to content

Commit

Permalink
refactor: Add DSL validation for cloud eligible check (#17287)
Browse files Browse the repository at this point in the history
Co-authored-by: Stijn de Gooijer <[email protected]>
  • Loading branch information
ritchie46 and stinodego authored Jul 16, 2024
1 parent ecddc13 commit e74c0d5
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 4 deletions.
6 changes: 3 additions & 3 deletions crates/polars-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ pub enum PolarsError {
ComputeError(ErrString),
#[error("duplicate: {0}")]
Duplicate(ErrString),
#[error("invalid operation: {0}")]
#[error("{0}")]
InvalidOperation(ErrString),
#[error("{}", match msg {
Some(msg) => format!("{}", msg),
None => format!("{}", error)
Some(msg) => format!("{}", msg),
None => format!("{}", error)
})]
IO {
error: Arc<io::Error>,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ binary_encoding = ["polars-plan/binary_encoding"]
string_encoding = ["polars-plan/string_encoding"]

bigidx = ["polars-plan/bigidx"]
polars_cloud = ["polars-plan/polars_cloud"]

panic_on_schema = ["polars-plan/panic_on_schema", "polars-expr/panic_on_schema"]

Expand Down
2 changes: 2 additions & 0 deletions crates/polars-lazy/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub use polars_io::parquet::write::ParquetWriteOptions;
pub use polars_ops::prelude::{JoinArgs, JoinType, JoinValidation};
#[cfg(feature = "rank")]
pub use polars_ops::prelude::{RankMethod, RankOptions};
#[cfg(feature = "polars_cloud")]
pub use polars_plan::client::assert_cloud_eligible;
pub use polars_plan::plans::{
AnonymousScan, AnonymousScanArgs, AnonymousScanOptions, DslPlan, Literal, LiteralValue, Null,
NULL,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ month_end = ["polars-time/month_end"]
offset_by = ["polars-time/offset_by"]

bigidx = ["polars-core/bigidx"]
polars_cloud = []

panic_on_schema = []

Expand Down
102 changes: 102 additions & 0 deletions crates/polars-plan/src/client/dsl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::dsl::Expr;
use crate::prelude::DslPlan;

impl DslPlan {
fn inputs<'a>(&'a self, scratch: &mut Vec<&'a DslPlan>) {
use DslPlan::*;
match self {
Select { input, .. }
| GroupBy { input, .. }
| Filter { input, .. }
| Distinct { input, .. }
| Sort { input, .. }
| Slice { input, .. }
| HStack { input, .. }
| MapFunction { input, .. }
| Sink { input, .. }
| Cache { input, .. } => scratch.push(input),
Union { inputs, .. } | HConcat { inputs, .. } => scratch.extend(inputs),
Join {
input_left,
input_right,
..
} => {
scratch.push(input_left);
scratch.push(input_right);
},
ExtContext { input, contexts } => {
scratch.push(input);
scratch.extend(contexts);
},
IR { dsl, .. } => scratch.push(dsl),
Scan { .. } | DataFrameScan { .. } => (),
#[cfg(feature = "python")]
PythonScan { .. } => (),
}
}

pub(super) fn get_expr<'a>(&'a self, scratch: &mut Vec<&'a Expr>) {
use DslPlan::*;
match self {
Filter { predicate, .. } => scratch.push(predicate),
Scan { predicate, .. } => {
if let Some(expr) = predicate {
scratch.push(expr)
}
},
DataFrameScan { filter, .. } => {
if let Some(expr) = filter {
scratch.push(expr)
}
},
Select { expr, .. } => scratch.extend(expr),
HStack { exprs, .. } => scratch.extend(exprs),
Sort { by_column, .. } => scratch.extend(by_column),
GroupBy { keys, aggs, .. } => {
scratch.extend(keys);
scratch.extend(aggs);
},
Join {
left_on, right_on, ..
} => {
scratch.extend(left_on);
scratch.extend(right_on);
},
Cache { .. }
| Distinct { .. }
| Slice { .. }
| MapFunction { .. }
| Union { .. }
| HConcat { .. }
| ExtContext { .. }
| Sink { .. }
| IR { .. } => (),
#[cfg(feature = "python")]
PythonScan { .. } => (),
}
}
}

/// Iterator over a [`DslPlan`] tree that yields each plan node by reference.
///
/// Nodes that still have to be visited are kept on an explicit stack, so the
/// most recently pushed node is the next one to be yielded.
pub struct DslPlanIter<'a> {
    // Plan nodes waiting to be yielded; the last element is returned next.
    stack: Vec<&'a DslPlan>,
}

impl<'a> Iterator for DslPlanIter<'a> {
    type Item = &'a DslPlan;

    /// Pops the most recently pushed plan, queues its direct inputs and yields it.
    ///
    /// Returns `None` once the stack of pending plans is empty.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.stack.pop()?;
        // Schedule the children before handing the node to the caller.
        current.inputs(&mut self.stack);
        Some(current)
    }
}

impl<'a> IntoIterator for &'a DslPlan {
    type Item = &'a DslPlan;
    type IntoIter = DslPlanIter<'a>;

    /// Starts a traversal with `self` as the only pending node.
    fn into_iter(self) -> Self::IntoIter {
        let stack = vec![self];
        DslPlanIter { stack }
    }
}
68 changes: 68 additions & 0 deletions crates/polars-plan/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
mod dsl;

use polars_core::error::{polars_err, PolarsResult};
use polars_io::path_utils::is_cloud_url;

use crate::dsl::Expr;
use crate::plans::options::SinkType;
use crate::plans::{DslFunction, DslPlan, FunctionNode};

/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
pub fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
let mut expr_stack = vec![];
for plan_node in dsl.into_iter() {
match plan_node {
DslPlan::MapFunction {
function: DslFunction::FunctionNode(function),
..
} => match function {
FunctionNode::Opaque { .. } => return ineligible_error("contains opaque function"),
#[cfg(feature = "python")]
FunctionNode::OpaquePython { .. } => {
return ineligible_error("contains Python function")
},
_ => (),
},
#[cfg(feature = "python")]
DslPlan::PythonScan { .. } => return ineligible_error("contains Python scan"),
DslPlan::GroupBy { apply: Some(_), .. } => {
return ineligible_error("contains Python function in group by operation")
},
DslPlan::Scan { paths, .. }
if paths.lock().unwrap().0.iter().any(|p| !is_cloud_url(p)) =>
{
return ineligible_error("contains scan of local file system")
},
DslPlan::Sink { payload, .. } => {
if !matches!(payload, SinkType::Cloud { .. }) {
return ineligible_error("contains sink to non-cloud location");
}
},
plan => {
plan.get_expr(&mut expr_stack);

for expr in expr_stack.drain(..) {
for expr_node in expr.into_iter() {
match expr_node {
Expr::AnonymousFunction { .. } => {
return ineligible_error("contains anonymous function")
},
Expr::RenameAlias { .. } => {
return ineligible_error("contains custom name remapping")
},
_ => (),
}
}
}
},
}
}
Ok(())
}

fn ineligible_error(message: &str) -> PolarsResult<()> {
Err(polars_err!(
InvalidOperation:
"logical plan ineligible for execution on Polars Cloud: {message}"
))
}
2 changes: 2 additions & 0 deletions crates/polars-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ pub mod plans;
pub mod prelude;
// Activate later
// mod reduce;
#[cfg(feature = "polars_cloud")]
pub mod client;
pub mod utils;
4 changes: 3 additions & 1 deletion crates/polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ approx_unique = ["polars-lazy?/approx_unique", "polars-ops/approx_unique"]
arg_where = ["polars-lazy?/arg_where"]
array_any_all = ["polars-lazy?/array_any_all", "dtype-array"]
asof_join = ["polars-lazy?/asof_join", "polars-ops/asof_join"]
bigidx = ["polars-core/bigidx", "polars-lazy?/bigidx", "polars-ops/big_idx"]
binary_encoding = ["polars-ops/binary_encoding", "polars-lazy?/binary_encoding", "polars-sql?/binary_encoding"]
business = ["polars-lazy?/business", "polars-ops/business"]
checked_arithmetic = ["polars-core/checked_arithmetic"]
Expand Down Expand Up @@ -227,6 +226,9 @@ true_div = ["polars-lazy?/true_div"]
unique_counts = ["polars-ops/unique_counts", "polars-lazy?/unique_counts"]
zip_with = ["polars-core/zip_with"]

bigidx = ["polars-core/bigidx", "polars-lazy?/bigidx", "polars-ops/big_idx"]
polars_cloud = ["polars-lazy?/polars_cloud"]

test = [
"lazy",
"rolling_window",
Expand Down
3 changes: 3 additions & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ optimizations = [
"streaming",
]

polars_cloud = ["polars/polars_cloud"]

all = [
"optimizations",
"io",
Expand All @@ -244,6 +246,7 @@ all = [
"sql",
"binary_encoding",
"ffi_plugin",
"polars_cloud",
# "new_streaming",
]

Expand Down
30 changes: 30 additions & 0 deletions py-polars/polars/_utils/cloud.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import polars.polars as plr

if TYPE_CHECKING:
from polars import LazyFrame


def assert_cloud_eligible(lf: LazyFrame) -> None:
    """
    Assert that the given LazyFrame is eligible to be executed on Polars Cloud.

    The following conditions will disqualify a LazyFrame from being eligible:

    - Contains a user-defined function
    - Scans a local filesystem
    - Sinks to a non-cloud location
    - Contains a custom name remapping

    Parameters
    ----------
    lf
        The LazyFrame to check.

    Raises
    ------
    AssertionError
        If the given LazyFrame is not eligible to be run on Polars Cloud.
    """
    # The native check operates on the underlying Rust-side lazy frame.
    ldf = lf._ldf
    plr.assert_cloud_eligible(ldf)
12 changes: 12 additions & 0 deletions py-polars/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use pyo3::exceptions::PyAssertionError;
use pyo3::prelude::*;

use crate::PyLazyFrame;

// Python binding for the cloud eligibility check.
//
// Runs `polars::prelude::assert_cloud_eligible` on the lazy frame's logical
// plan and converts a failure into a Python `AssertionError` carrying the
// error's display text.
#[pyfunction]
pub fn assert_cloud_eligible(lf: PyLazyFrame) -> PyResult<()> {
    match polars::prelude::assert_cloud_eligible(&lf.ldf.logical_plan) {
        Ok(()) => Ok(()),
        Err(err) => Err(PyAssertionError::new_err(err.to_string())),
    }
}
7 changes: 7 additions & 0 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ mod build {
mod allocator;
#[cfg(feature = "csv")]
mod batched_csv;
#[cfg(feature = "polars_cloud")]
mod cloud;
mod conversion;
mod dataframe;
mod datatypes;
Expand Down Expand Up @@ -402,6 +404,11 @@ fn polars(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
)
.unwrap();

// Cloud
#[cfg(feature = "polars_cloud")]
m.add_wrapped(wrap_pyfunction!(cloud::assert_cloud_eligible))
.unwrap();

// Build info
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
#[cfg(feature = "build_info")]
Expand Down
Empty file.
Loading

0 comments on commit e74c0d5

Please sign in to comment.