perf(drop): speed up performance of drop (#9440)
This PR speeds up `drop` construction and compilation in cases where the
number of dropped columns is small relative to the total number of
parent-table columns.

There are two ways this is done:

- `drop` construction is sped up by reducing the number of iterations
  over the full set of columns when constructing an output schema.

  This is where the bulk of the improvement is.
- Compilation of the `drop` operation is also a bit faster for smaller sets
  of dropped columns on some backends, due to the use of `* EXCLUDE` syntax.

Since the optimization is done in the `schema` property, adding a new
`DropColumns` relation IR seemed like the lightest-weight approach, given
that it also enables compilers to use `EXCLUDE` syntax, which produces a
far smaller query than a project-everything-except-the-dropped-columns
approach.
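
For a rough sense of the size difference, here's a hedged sketch (the table
and column names are hypothetical, and the emitted SQL is approximate):

```python
import ibis

# a 1,000-column table with a single dropped column
t = ibis.table({f"a{i}": "int" for i in range(1_000)}, name="t")
print(ibis.to_sql(t.drop("a0"), dialect="duckdb"))
# roughly: SELECT "t0".* EXCLUDE ("a0") FROM "t" AS "t0"
# a projection-based compilation would instead list all 999 kept columns
```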

Partially addresses the `drop` performance seen in #9111.

To address this for all backends, either they all need to support
`SELECT * EXCLUDE (col1, ..., colN)` syntax or we need to implement
column pruning.

Follow-ups could include applying a similar approach to `rename` (using
`REPLACE` syntax for compilation).

It might be possible to reduce the overhead of `relocate` as well, but
I haven't explored that.
cpcloud authored Jun 26, 2024
1 parent fce5bd3 commit 1c6eb5c
Showing 15 changed files with 159 additions and 30 deletions.
8 changes: 8 additions & 0 deletions ibis/backends/bigquery/compiler.py
@@ -682,3 +682,11 @@ def _pudf(self, name, *args):
self.dialect
)
return self.f[name](*args)

def visit_DropColumns(self, op, *, parent, columns_to_drop):
quoted = self.quoted
excludes = [sg.column(column, quoted=quoted) for column in columns_to_drop]
star = sge.Star(**{"except": excludes})
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)
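
For reference, a standalone sketch of what this sqlglot construction renders
(assuming only sqlglot; the `t0` alias and the column names are hypothetical):

```python
import sqlglot as sg
import sqlglot.expressions as sge

excludes = [sg.column(name, quoted=True) for name in ("a", "b")]
star = sge.Star(**{"except": excludes})
column = sge.Column(this=star, table=sg.to_identifier("t0", quoted=True))
print(sg.select(column).from_("t0").sql(dialect="bigquery"))
# e.g. SELECT `t0`.* EXCEPT (`a`, `b`) FROM t0
```
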
8 changes: 8 additions & 0 deletions ibis/backends/clickhouse/compiler.py
@@ -640,3 +640,11 @@ def visit_RandomUUID(self, op, **kwargs):
@staticmethod
def _generate_groups(groups):
return groups

def visit_DropColumns(self, op, *, parent, columns_to_drop):
quoted = self.quoted
excludes = [sg.column(column, quoted=quoted) for column in columns_to_drop]
star = sge.Star(**{"except": excludes})
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)
26 changes: 26 additions & 0 deletions ibis/backends/duckdb/compiler.py
@@ -521,3 +521,29 @@ def visit_RandomUUID(self, op, **kwargs):

def visit_TypeOf(self, op, *, arg):
return self.f.coalesce(self.f.nullif(self.f.typeof(arg), '"NULL"'), "NULL")

def visit_DropColumns(self, op, *, parent, columns_to_drop):
quoted = self.quoted
    # duckdb doesn't support specifying the table name of the column to
    # drop, e.g., the t.a bit in SELECT t.* EXCLUDE (t.a) FROM t
    #
    # technically it's not necessary; here's why
#
# if the table is specified then it's unambiguous when there are overlapping
# column names, say, from a join, for example
# (assuming t and s both have a column named `a`)
#
# SELECT t.* EXCLUDE (a), s.* FROM t JOIN s ON id
#
# This would exclude t.a and include s.a
#
# if it's a naked star projection from a join, like
#
# SELECT * EXCLUDE (a) FROM t JOIN s ON id
#
# then that means "exclude all columns named `a`"
excludes = [sg.column(column, quoted=quoted) for column in columns_to_drop]
star = sge.Star(**{"except": excludes})
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)
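
The two behaviors described in the comment can be checked directly with the
duckdb Python client; a minimal sketch (the schemas are hypothetical):

```python
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE t (id INTEGER, a INTEGER)")
con.execute("CREATE TABLE s (id INTEGER, a INTEGER)")

# qualified star: excludes only t.a, so s.a is still returned
con.sql("SELECT t.* EXCLUDE (a), s.* FROM t JOIN s USING (id)")

# naked star: excludes every column named a from the join result
con.sql("SELECT * EXCLUDE (a) FROM t JOIN s USING (id)")
```
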
4 changes: 4 additions & 0 deletions ibis/backends/pandas/executor.py
@@ -601,6 +601,10 @@ def visit(cls, op: ops.Sort, parent, keys):
)
return df.drop(columns=names)

@classmethod
def visit(cls, op: ops.DropColumns, parent, columns_to_drop):
return parent.drop(columns=list(columns_to_drop))

@classmethod
def visit(cls, op: PandasAggregate, parent, groups, metrics):
if groups:
6 changes: 6 additions & 0 deletions ibis/backends/polars/compiler.py
@@ -1361,3 +1361,9 @@ def execute_timestamp_range(op, **kw):
start = translate(op.start, **kw)
stop = translate(op.stop, **kw)
return pl.datetime_ranges(start, stop, f"{step}{unit}", closed="left")


@translate.register(ops.DropColumns)
def execute_drop_columns(op, **kw):
parent = translate(op.parent, **kw)
return parent.drop(op.columns_to_drop)
8 changes: 8 additions & 0 deletions ibis/backends/snowflake/compiler.py
@@ -654,3 +654,11 @@ def visit_JoinLink(self, op, *, how, table, predicates):
this=table, kind=how, on=on, match_condition=match_condition
)
return super().visit_JoinLink(op, how=how, table=table, predicates=predicates)

def visit_DropColumns(self, op, *, parent, columns_to_drop):
quoted = self.quoted
excludes = [sg.column(column, quoted=quoted) for column in columns_to_drop]
star = sge.Star(**{"except": excludes})
table = sg.to_identifier(parent.alias_or_name, quoted=quoted)
column = sge.Column(this=star, table=table)
return sg.select(column).from_(parent)
13 changes: 13 additions & 0 deletions ibis/backends/sql/compiler.py
@@ -1512,6 +1512,19 @@ def visit_Unsupported(self, op, **_):
f"{type(op).__name__!r} operation is not supported in the {self.dialect} backend"
)

def visit_DropColumns(self, op, *, parent, columns_to_drop):
# the generated query will be huge for wide tables
#
# TODO: figure out a way to produce an IR that only contains exactly
# what is used
parent_alias = parent.alias_or_name
quoted = self.quoted
columns_to_keep = (
sg.column(column, table=parent_alias, quoted=quoted)
for column in op.schema.names
)
return sg.select(*columns_to_keep).from_(parent)
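
For intuition about the size of this fallback, a minimal sqlglot sketch
(column names hypothetical):

```python
import sqlglot as sg

kept = [sg.column(f"a{i}", table="t0", quoted=True) for i in range(5)]
print(sg.select(*kept).from_("t0").sql())
# SELECT "t0"."a0", "t0"."a1", "t0"."a2", "t0"."a3", "t0"."a4" FROM t0
# with thousands of kept columns, this projection dominates the query text
```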


# `__init_subclass__` is uncalled for subclasses - we manually call it here to
# autogenerate the base class implementations as well.
12 changes: 12 additions & 0 deletions ibis/backends/sql/rewrites.py
@@ -111,6 +111,17 @@ def sort_to_select(_, **kwargs):
return Select(_.parent, selections=_.values, sort_keys=_.keys)


@replace(p.DropColumns)
def drop_columns_to_select(_, **kwargs):
"""Convert a DropColumns node to a Select node."""
# if we're dropping fewer than 50% of the parent table's columns then the
# compiled query will likely be smaller than if we list everything *NOT*
# being dropped
if len(_.columns_to_drop) < len(_.schema) // 2:
return _
return Select(_.parent, selections=_.values)
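
To make the threshold concrete, a small sketch mirroring the predicate above
(the function name is hypothetical; note that `_.schema` here is the schema
*after* dropping):

```python
def keeps_drop_columns(n_dropped: int, n_remaining: int) -> bool:
    # mirrors: len(_.columns_to_drop) < len(_.schema) // 2
    return n_dropped < n_remaining // 2

assert keeps_drop_columns(10, 90)      # few drops: keep DropColumns, use EXCLUDE
assert not keeps_drop_columns(60, 40)  # many drops: rewrite to a plain Select
```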


@replace(p.FillNull)
def fill_null_to_select(_, **kwargs):
"""Rewrite FillNull to a Select node."""
@@ -292,6 +303,7 @@ def sqlize(
| sort_to_select
| fill_null_to_select
| drop_null_to_select
| drop_columns_to_select
| first_to_firstvalue,
context=context,
)
@@ -5,10 +5,14 @@
 s = ibis.table(name="s", schema={"b": "string"})
 t = ibis.table(name="t", schema={"a": "int64", "b": "string", "c": "timestamp"})
 f = t.filter(t.c == lit)
+dropcolumns = f.select(f.a, f.b, f.c.name("C")).drop("C")
 joinchain = (
-    f.select(f.a, f.b, lit.name("the_date"))
-    .inner_join(s, f.select(f.a, f.b, lit.name("the_date")).b == s.b)
-    .select(f.select(f.a, f.b, lit.name("the_date")).a)
+    dropcolumns.select(dropcolumns.a, dropcolumns.b, lit.name("the_date"))
+    .inner_join(
+        s,
+        dropcolumns.select(dropcolumns.a, dropcolumns.b, lit.name("the_date")).b == s.b,
+    )
+    .select(dropcolumns.select(dropcolumns.a, dropcolumns.b, lit.name("the_date")).a)
 )
 
 result = joinchain.filter(joinchain.a < 1.0)
@@ -8,12 +8,8 @@ SELECT
"t11"."total_amount" AS "customer_lifetime_value"
FROM (
SELECT
"t10"."customer_id",
"t10"."first_name",
"t10"."last_name",
"t10"."first_order",
"t10"."most_recent_order",
"t10"."number_of_orders"
"t10".*
EXCLUDE ("customer_id_right")
FROM (
SELECT
"t3"."customer_id",
5 changes: 5 additions & 0 deletions ibis/expr/decompile.py
@@ -187,6 +187,11 @@ def distinct(op, parent):
return f"{parent}.distinct()"


@translate.register(ops.DropColumns)
def drop(op, parent, columns_to_drop):
return f"{parent}.drop({_inline(map(repr, columns_to_drop))})"


@translate.register(ops.SelfReference)
def self_reference(op, parent, identifier):
return f"{parent}.view()"
20 changes: 20 additions & 0 deletions ibis/expr/operations/relations.py
@@ -145,6 +145,26 @@ def schema(self):
return self.parent.schema


@public
class DropColumns(Relation):
parent: Relation
columns_to_drop: VarTuple[str]

@attribute
def schema(self):
schema = self.parent.schema.fields.copy()
for column in self.columns_to_drop:
del schema[column]
return Schema(schema)

@attribute
def values(self):
fields = self.parent.fields.copy()
for column in self.columns_to_drop:
del fields[column]
return fields
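
A hedged sketch of how the new relation computes its schema, assuming the
constructors shown in this diff:

```python
import ibis
import ibis.expr.operations as ops

t = ibis.table({"a": "int64", "b": "string", "c": "float64"}, name="t")
node = ops.DropColumns(parent=t.op(), columns_to_drop=("b",))
# a single pass over a copied dict, instead of re-projecting every kept column
print(node.schema.names)  # ('a', 'c')
```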


@public
class Reference(Relation):
_uid_counter = itertools.count()
15 changes: 3 additions & 12 deletions ibis/expr/types/relations.py
@@ -2479,23 +2479,14 @@ def drop(self, *fields: str | Selector) -> Table:
         │ Torgersen │ 193 │ 3450 │ female │ 2007 │
         └───────────┴───────────────────┴─────────────┴────────┴───────┘
         """
-        from ibis import selectors as s
-
         if not fields:
             # no-op if nothing to be dropped
             return self
 
-        fields = tuple(
-            field.resolve(self) if isinstance(field, Deferred) else field
-            for field in fields
+        columns_to_drop = tuple(
+            map(operator.methodcaller("get_name"), self.bind(*fields))
         )
-
-        if missing_fields := {f for f in fields if isinstance(f, str)}.difference(
-            self.schema().names
-        ):
-            raise KeyError(f"Fields not in table: {sorted(missing_fields)}")
-
-        return self.select(~s._to_selector(fields))
+        return ops.DropColumns(parent=self, columns_to_drop=columns_to_drop).to_expr()
 
     def filter(
         self,
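
User-facing behavior is intended to stay the same; only the constructed IR
changes. A quick sketch (note that an unknown column now surfaces as an error
from `bind` rather than the old `KeyError`, per the test change below):

```python
import ibis

t = ibis.table({"a": "int64", "b": "string"}, name="t")
expr = t.drop("a")  # now constructs an ops.DropColumns node directly
print(type(expr.op()).__name__)  # DropColumns
print(expr.schema().names)       # ('b',)
```
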
30 changes: 30 additions & 0 deletions ibis/tests/benchmarks/test_benchmarks.py
@@ -6,13 +6,15 @@
import itertools
import math
import os
import random
import string
from operator import attrgetter, itemgetter

import numpy as np
import pandas as pd
import pytest
from packaging.version import parse as vparse
from pytest import param

import ibis
import ibis.expr.datatypes as dt
@@ -886,3 +888,31 @@ def test_memtable_register(lots_of_tables, benchmark):
t = ibis.memtable({"x": [1, 2, 3]})
result = benchmark(lots_of_tables.execute, t)
assert len(result) == 3


@pytest.fixture(params=[10, 100, 1_000, 10_000], scope="module")
def wide_table(request):
num_cols = request.param
return ibis.table(name="t", schema={f"a{i}": "int" for i in range(num_cols)})


@pytest.fixture(
params=[param(0.01, id="1"), param(0.5, id="50"), param(0.99, id="99")],
scope="module",
)
def cols_to_drop(wide_table, request):
perc_cols_to_drop = request.param
total_cols = len(wide_table.columns)
ncols = math.floor(perc_cols_to_drop * total_cols)
cols_to_drop = random.sample(range(total_cols), ncols)
return [f"a{i}" for i in cols_to_drop]


def test_wide_drop_construct(benchmark, wide_table, cols_to_drop):
benchmark(wide_table.drop, *cols_to_drop)


def test_wide_drop_compile(benchmark, wide_table, cols_to_drop):
benchmark(
lambda expr: ibis.to_sql(expr, dialect="duckdb"), wide_table.drop(*cols_to_drop)
)
16 changes: 7 additions & 9 deletions ibis/tests/expr/test_table.py
@@ -1869,25 +1869,23 @@ def test_drop():
     assert t.drop() is t
 
     res = t.drop("a")
-    assert res.equals(t.select("b", "c", "d"))
+    assert res.schema() == t.select("b", "c", "d").schema()
 
     res = t.drop("a", "b")
-    assert res.equals(t.select("c", "d"))
+    assert res.schema() == t.select("c", "d").schema()
 
-    assert res.equals(t.select("c", "d"))
-
-    assert res.equals(t.drop(s.matches("a|b")))
+    assert res.schema() == t.drop(s.matches("a|b")).schema()
 
     res = t.drop(_.a)
-    assert res.equals(t.select("b", "c", "d"))
+    assert res.schema() == t.select("b", "c", "d").schema()
 
     res = t.drop(_.a, _.b)
-    assert res.equals(t.select("c", "d"))
+    assert res.schema() == t.select("c", "d").schema()
 
     res = t.drop(_.a, "b")
-    assert res.equals(t.select("c", "d"))
+    assert res.schema() == t.select("c", "d").schema()
 
-    with pytest.raises(KeyError):
+    with pytest.raises(com.IbisTypeError):
         t.drop("e")

