Skip to content

Commit

Permalink
feat: add DataFrame.pipe() method (#421)
Browse files Browse the repository at this point in the history
  • Loading branch information
TrevorBergeron authored Mar 14, 2024
1 parent 456fb32 commit 95f5a6e
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 1 deletion.
25 changes: 25 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,31 @@ def test_apply_series_scalar_callable(
pandas.testing.assert_series_equal(bf_result, pd_result)


def test_df_pipe(
scalars_df_index,
scalars_pandas_df_index,
):
columns = ["int64_too", "int64_col"]

def foo(x: int, y: int, df):
return (df + x) % y

bf_result = (
scalars_df_index[columns]
.pipe((foo, "df"), x=7, y=9)
.pipe(lambda x: x**2)
.to_pandas()
)

pd_result = (
scalars_pandas_df_index[columns]
.pipe((foo, "df"), x=7, y=9)
.pipe(lambda x: x**2)
)

pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_df_keys(
scalars_df_index,
scalars_pandas_df_index,
Expand Down
25 changes: 25 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3203,3 +3203,28 @@ def test_apply_not_supported(scalars_dfs, col, lambda_, exception):
bf_col = scalars_df[col]
with pytest.raises(exception):
bf_col.apply(lambda_, by_row=False)


def test_series_pipe(
scalars_df_index,
scalars_pandas_df_index,
):
column = "int64_too"

def foo(x: int, y: int, df):
return (df + x) % y

bf_result = (
scalars_df_index[column]
.pipe((foo, "df"), x=7, y=9)
.pipe(lambda x: x**2)
.to_pandas()
)

pd_result = (
scalars_pandas_df_index[column]
.pipe((foo, "df"), x=7, y=9)
.pipe(lambda x: x**2)
)

assert_series_equal(bf_result, pd_result)
42 changes: 42 additions & 0 deletions third_party/bigframes_vendored/pandas/core/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/common.py
from __future__ import annotations

from typing import Callable, TYPE_CHECKING

if TYPE_CHECKING:
from bigframes_vendored.pandas.pandas._typing import T


def pipe(
obj, func: Callable[..., T] | tuple[Callable[..., T], str], *args, **kwargs
) -> T:
"""
Apply a function ``func`` to object ``obj`` either by passing obj as the
first argument to the function or, in the case that the func is a tuple,
interpret the first element of the tuple as a function and pass the obj to
that function as a keyword argument whose key is the value of the second
element of the tuple.
Args:
func (callable or tuple of (callable, str)):
Function to apply to this object or, alternatively, a
``(callable, data_keyword)`` tuple where ``data_keyword`` is a
string indicating the keyword of ``callable`` that expects the
object.
args (iterable, optional):
Positional arguments passed into ``func``.
kwargs (dict, optional):
A dictionary of keyword arguments passed into ``func``.
Returns:
object: the return type of ``func``.
"""
if isinstance(func, tuple):
func, target = func
if target in kwargs:
msg = f"{target} is both the pipe target and a keyword argument"
raise ValueError(msg)
kwargs[target] = obj
return func(*args, **kwargs)
else:
return func(obj, *args, **kwargs)
105 changes: 104 additions & 1 deletion third_party/bigframes_vendored/pandas/core/generic.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/generic.py
from __future__ import annotations

from typing import Iterator, Literal, Optional
from typing import Callable, Iterator, Literal, Optional, TYPE_CHECKING

from bigframes_vendored.pandas.core import indexing
import bigframes_vendored.pandas.core.common as common

from bigframes import constants

if TYPE_CHECKING:
from bigframes_vendored.pandas.pandas._typing import T


class NDFrame(indexing.IndexingMixin):
"""
Expand Down Expand Up @@ -963,6 +967,105 @@ def expanding(self, min_periods=1):
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def pipe(
self,
func: Callable[..., T] | tuple[Callable[..., T], str],
*args,
**kwargs,
) -> T:
"""
Apply chainable functions that expect Series or DataFrames.
**Examples:**
Constructing a income DataFrame from a dictionary.
>>> import bigframes.pandas as bpd
>>> import numpy as np
>>> bpd.options.display.progress_bar = None
>>> data = [[8000, 1000], [9500, np.nan], [5000, 2000]]
>>> df = bpd.DataFrame(data, columns=['Salary', 'Others'])
>>> df
Salary Others
0 8000 1000.0
1 9500 <NA>
2 5000 2000.0
<BLANKLINE>
[3 rows x 2 columns]
Functions that perform tax reductions on an income DataFrame.
>>> def subtract_federal_tax(df):
... return df * 0.9
>>> def subtract_state_tax(df, rate):
... return df * (1 - rate)
>>> def subtract_national_insurance(df, rate, rate_increase):
... new_rate = rate + rate_increase
... return df * (1 - new_rate)
Instead of writing
>>> subtract_national_insurance(
... subtract_state_tax(subtract_federal_tax(df), rate=0.12),
... rate=0.05,
... rate_increase=0.02) # doctest: +SKIP
You can write
>>> (
... df.pipe(subtract_federal_tax)
... .pipe(subtract_state_tax, rate=0.12)
... .pipe(subtract_national_insurance, rate=0.05, rate_increase=0.02)
... )
Salary Others
0 5892.48 736.56
1 6997.32 <NA>
2 3682.8 1473.12
<BLANKLINE>
[3 rows x 2 columns]
If you have a function that takes the data as (say) the second
argument, pass a tuple indicating which keyword expects the
data. For example, suppose ``national_insurance`` takes its data as ``df``
in the second argument:
>>> def subtract_national_insurance(rate, df, rate_increase):
... new_rate = rate + rate_increase
... return df * (1 - new_rate)
>>> (
... df.pipe(subtract_federal_tax)
... .pipe(subtract_state_tax, rate=0.12)
... .pipe(
... (subtract_national_insurance, 'df'),
... rate=0.05,
... rate_increase=0.02
... )
... )
Salary Others
0 5892.48 736.56
1 6997.32 <NA>
2 3682.8 1473.12
<BLANKLINE>
[3 rows x 2 columns]
Args:
func (function):
Function to apply to this object.
``args``, and ``kwargs`` are passed into ``func``.
Alternatively a ``(callable, data_keyword)`` tuple where
``data_keyword`` is a string indicating the keyword of
``callable`` that expects this object.
args (iterable, optional):
Positional arguments passed into ``func``.
kwargs (mapping, optional):
A dictionary of keyword arguments passed into ``func``.
Returns:
same type as caller
"""
return common.pipe(self, func, *args, **kwargs)

def __nonzero__(self):
raise ValueError(
f"The truth value of a {type(self).__name__} is ambiguous. "
Expand Down

0 comments on commit 95f5a6e

Please sign in to comment.