refactor(dask): port the dask backend to the new execution model (#8005)
Reimplementation of the dask backend on top of the new pandas executor.
I had to adjust the pandas backend to make it extensible; thanks to
that, the new dask implementation turned out to be quite tidy.
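
A minimal sketch of that extension pattern, assuming a class-based executor with one handler per operation; `PandasExecutor`, `DaskExecutor`, and the method names below are illustrative stand-ins, not the actual ibis internals:

```python
from __future__ import annotations

import dask.dataframe as dd
import pandas as pd


class PandasExecutor:
    """Hypothetical base executor: one handler per operation, pandas semantics."""

    @classmethod
    def asframe(cls, data) -> pd.DataFrame:
        # eager pandas construction
        return pd.DataFrame(data)

    @classmethod
    def visit_limit(cls, df, n: int):
        # .head exists on both pandas and dask frames, so subclasses inherit this
        return df.head(n)


class DaskExecutor(PandasExecutor):
    """Overrides only what must differ on dask; everything else is inherited."""

    @classmethod
    def asframe(cls, data) -> dd.DataFrame:
        # build a lazy dask frame instead of an eager pandas one
        return dd.from_pandas(pd.DataFrame(data), npartitions=1)


df = DaskExecutor.asframe({"a": [1, 2, 3]})
print(DaskExecutor.visit_limit(df, 2))  # inherited handler; dask's .head computes eagerly
```

Because the dask dataframe API mirrors pandas closely, most handlers can be inherited unchanged; only construction and the handful of divergent operations need overriding.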

A couple of features are not implemented using proper dask constructs
but instead fall back to local execution using pandas; the most notable
are the window functions. The previous dask implementation supported
only a handful of window cases, whereas this approach at least gives us
full coverage.
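
A hedged sketch of what such a fallback can look like: materialize the distributed frame, run the window function with pandas, and wrap the result back into a dask collection. `windowed_mean_via_pandas` is a hypothetical helper, not the commit's actual code:

```python
import dask.dataframe as dd
import pandas as pd


def windowed_mean_via_pandas(df: dd.DataFrame, col: str, window: int) -> dd.Series:
    # no proper dask construct here: materialize into a local pandas frame
    pdf: pd.DataFrame = df.compute()
    # run the window function with pandas, which supports it fully
    result = pdf[col].rolling(window).mean()
    # hand a lazy dask object back to the rest of the pipeline
    return dd.from_pandas(result, npartitions=df.npartitions)


ddf = dd.from_pandas(pd.DataFrame({"x": range(6)}), npartitions=2)
print(windowed_mean_via_pandas(ddf, "x", window=3).compute())
```

The trade-off is that the whole table is pulled into local memory, exchanging scalability for complete window-function coverage.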

Thanks to the new pandas base we have wider feature coverage; see the
removed xfails in the test suite.
kszucs committed Feb 2, 2024
1 parent 57bb4a8 commit 17f0f8e
Showing 70 changed files with 2,096 additions and 6,963 deletions.
22 changes: 11 additions & 11 deletions .github/workflows/ibis-backends.yml
@@ -73,10 +73,10 @@ jobs:
extras:
- clickhouse
- examples
# - name: dask
# title: Dask
# extras:
# - dask
- name: dask
title: Dask
extras:
- dask
- name: pandas
title: Pandas
extras:
@@ -438,13 +438,13 @@ jobs:
- "3.9"
- "3.11"
backend:
# - name: dask
# title: Dask
# deps:
# - "dask[array,dataframe]@2022.9.1"
# - "[email protected]"
# extras:
# - dask
- name: dask
title: Dask
deps:
- "dask[array,dataframe]@2022.9.1"
- "[email protected]"
extras:
- dask
- name: postgres
title: PostgreSQL
deps:
101 changes: 44 additions & 57 deletions ibis/backends/dask/__init__.py
@@ -5,32 +5,23 @@
import dask
import dask.dataframe as dd
import pandas as pd
from dask.base import DaskMethodsMixin

import ibis.common.exceptions as com

# import the pandas execution module to register dispatched implementations of
# execute_node that the dask backend will later override
import ibis.backends.pandas.execution
import ibis.config
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir
from ibis import util
from ibis.backends.dask.core import execute_and_reset
from ibis.backends.pandas import BasePandasBackend
from ibis.backends.pandas.core import _apply_schema
from ibis.formats.pandas import DaskData
from ibis.formats.pandas import PandasData

if TYPE_CHECKING:
import pathlib
from collections.abc import Mapping, MutableMapping


raise RuntimeError("Temporarily make the dask backend dysfunctional")

# Make sure that the pandas backend options have been loaded
ibis.pandas # noqa: B018


class Backend(BasePandasBackend):
name = "dask"
backend_table_type = dd.DataFrame
@@ -57,9 +48,6 @@ def do_connect(
... }
>>> ibis.dask.connect(data)
"""
# register dispatchers
from ibis.backends.dask import udf # noqa: F401

if dictionary is None:
dictionary = {}

@@ -75,49 +63,53 @@
def version(self):
return dask.__version__

def execute(
self,
query: ir.Expr,
params: Mapping[ir.Expr, object] | None = None,
limit: str = "default",
**kwargs,
):
def _validate_args(self, expr, limit, timecontext):
if timecontext is not None:
raise com.UnsupportedArgumentError(
"The Dask backend does not support timecontext"
)
if limit != "default" and limit is not None:
raise ValueError(
raise com.UnsupportedArgumentError(
"limit parameter to execute is not yet implemented in the "
"dask backend"
)

if not isinstance(query, ir.Expr):
if not isinstance(expr, ir.Expr):
raise TypeError(
"`query` has type {!r}, expected ibis.expr.types.Expr".format(
type(query).__name__
"`expr` has type {!r}, expected ibis.expr.types.Expr".format(
type(expr).__name__
)
)

compiled = self.compile(query, params, **kwargs)
if isinstance(compiled, DaskMethodsMixin):
result = compiled.compute()
else:
result = compiled
return _apply_schema(query.op(), result)

def compile(
self, query: ir.Expr, params: Mapping[ir.Expr, object] | None = None, **kwargs
self,
expr: ir.Expr,
params: dict | None = None,
limit: int | None = None,
timecontext=None,
):
"""Compile `expr`.
from ibis.backends.dask.executor import DaskExecutor

Returns
-------
dask.dataframe.core.DataFrame | dask.dataframe.core.Series | dask.dataframe.core.Scalar
Dask graph.
"""
params = {
k.op() if isinstance(k, ir.Expr) else k: v
for k, v in ({} if params is None else params).items()
}
self._validate_args(expr, limit, timecontext)
params = params or {}
params = {k.op() if isinstance(k, ir.Expr) else k: v for k, v in params.items()}

return DaskExecutor.compile(expr.op(), backend=self, params=params)

def execute(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] | None = None,
limit: str = "default",
timecontext=None,
**kwargs,
):
from ibis.backends.dask.executor import DaskExecutor

self._validate_args(expr, limit, timecontext)
params = params or {}
params = {k.op() if isinstance(k, ir.Expr) else k: v for k, v in params.items()}

return execute_and_reset(query.op(), params=params, **kwargs)
return DaskExecutor.execute(expr.op(), backend=self, params=params)

def read_csv(
self, source: str | pathlib.Path, table_name: str | None = None, **kwargs: Any
@@ -178,20 +170,15 @@ def read_parquet(
def table(self, name: str, schema: sch.Schema | None = None):
df = self.dictionary[name]
schema = schema or self.schemas.get(name, None)
schema = DaskData.infer_table(df, schema=schema)
schema = PandasData.infer_table(df.head(1), schema=schema)
return ops.DatabaseTable(name, schema, self).to_expr()

@classmethod
def _supports_conversion(cls, obj: Any) -> bool:
return isinstance(obj, cls.backend_table_type)

@staticmethod
def _from_pandas(df: pd.DataFrame, npartitions: int = 1) -> dd.DataFrame:
return dd.from_pandas(df, npartitions=npartitions)
def _convert_object(self, obj) -> dd.DataFrame:
if isinstance(obj, dd.DataFrame):
return obj

@classmethod
def _convert_object(cls, obj: dd.DataFrame) -> dd.DataFrame:
return obj
pandas_df = super()._convert_object(obj)
return dd.from_pandas(pandas_df, npartitions=1)

def _load_into_cache(self, name, expr):
self.create_table(name, self.compile(expr).persist())
