Skip to content

Commit

Permalink
feat: Support bigframes.pandas.to_datetime for scalars, iterables and…
Browse files Browse the repository at this point in the history
… series. (#372)

* feat: Support pd.to_datetime for scalars, iterables and series.

* update test and docstring

* update types

* format update

* remove import.

* update docstring

* update arg conversion

* update examples

* update format

* update code examples, and working logic.

* docstring update.

* type update.

* format update.

* Update docstring format

* remove import

* remove empty line

* Remove extra code

* remove prints.

* Code logic updates.

* Add constants.

* Update comments

* Move datetime helpers to the end of file.

* Update helper

* update format

* String process logic updated.

* update import

* remove print

* update docstring

* update docstring

* update docstring

* update note

* update docstring

* Update code examples
  • Loading branch information
Genesis929 authored Feb 12, 2024
1 parent de1e0a4 commit ffb0d15
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 0 deletions.
43 changes: 43 additions & 0 deletions bigframes/core/compile/scalar_op_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@
# ln(2**(2**10)) == (2**10)*ln(2) ~= 709.78, so EXP(x) for x>709.78 will overflow.
_FLOAT64_EXP_BOUND = typing.cast(ibis_types.NumericValue, ibis_types.literal(709.78))

# Datetime constants
# Number of microseconds in each supported unit, built up from the smallest
# whole-microsecond unit so every factor is derived from the previous one.
_US_PER_MILLISECOND = 1000
_US_PER_SECOND = 1000 * _US_PER_MILLISECOND
_US_PER_MINUTE = 60 * _US_PER_SECOND
_US_PER_HOUR = 60 * _US_PER_MINUTE
_US_PER_DAY = 24 * _US_PER_HOUR

# Multiplier that converts a value expressed in the keyed unit to microseconds.
# "ns" is the only fractional (float) factor; all others are exact integers.
UNIT_TO_US_CONVERSION_FACTORS = {
    "D": _US_PER_DAY,
    "h": _US_PER_HOUR,
    "m": _US_PER_MINUTE,
    "s": _US_PER_SECOND,
    "ms": _US_PER_MILLISECOND,
    "us": 1,
    "ns": 1e-3,
}


class ScalarOpCompiler:
# Mapping of operation name to implementations
Expand Down Expand Up @@ -656,6 +667,33 @@ def isin_op_impl(x: ibis_types.Value, op: ops.IsInOp):
return x.isin(matchable_ibis_values)


@scalar_op_compiler.register_unary_op(ops.ToDatetimeOp, pass_op=True)
def to_datetime_op_impl(x: ibis_types.Value, op: ops.ToDatetimeOp):
    """Compile ``ToDatetimeOp`` into an ibis timestamp expression.

    The handling depends on the type of ``x``:

    * string: parsed with ``op.format`` when one is given, otherwise passed
      to the ``timestamp`` builtin.
    * UTC timestamp: returned as is, without applying the final cast.
    * timestamp without timezone: only the final cast is applied.
    * anything else: treated as a number expressed in ``op.unit`` (``"ns"``
      when unset), scaled to whole microseconds and turned into a timestamp.

    Args:
        x: The ibis value to convert.
        op: The operation carrying the ``utc``, ``format`` and ``unit`` options.

    Returns:
        An ibis expression cast to a UTC timestamp when ``op.utc`` is truthy,
        otherwise to a timestamp without timezone (except for the UTC
        timestamp input noted above).

    Raises:
        ValueError: If the unit is not a key of
            ``UNIT_TO_US_CONVERSION_FACTORS``.
    """
    utc_timestamp_type = ibis_dtypes.Timestamp(timezone="UTC")
    input_type = x.type()

    if input_type == ibis_dtypes.str:
        if op.format:
            x = x.to_timestamp(op.format)
        else:
            x = timestamp(x)
    elif input_type == utc_timestamp_type:
        return x
    elif input_type != ibis_dtypes.timestamp:
        # pandas uses "ns" (nanoseconds) as its default unit for datetime
        # operations, so the same default is applied here for consistency.
        unit = op.unit or "ns"
        if unit not in UNIT_TO_US_CONVERSION_FACTORS:
            raise ValueError(f"Cannot convert input with unit '{unit}'.")

        factor = UNIT_TO_US_CONVERSION_FACTORS[unit]
        microseconds = (x * factor).cast(ibis_dtypes.int64)

        # Note: Casting directly to a timestamp without a timezone does not
        # work, so the value is first cast to a UTC timestamp. This appears to
        # sidestep a potential bug in Ibis's cast function and allows the
        # later cast to a timezone-free timestamp. Further investigation is
        # needed to confirm this behavior.
        x = microseconds.to_timestamp(unit="us").cast(utc_timestamp_type)

    target_timezone = "UTC" if op.utc else None
    return x.cast(ibis_dtypes.Timestamp(timezone=target_timezone))


@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True)
def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp):
if not hasattr(op.func, "bigframes_remote_function"):
Expand Down Expand Up @@ -1141,3 +1179,8 @@ def is_null(value) -> bool:

def _ibis_num(number: float):
return typing.cast(ibis_types.NumericValue, ibis_types.literal(number))


@ibis.udf.scalar.builtin
def timestamp(a: str) -> ibis_dtypes.timestamp:
    """Convert string to timestamp.

    Declared through ``ibis.udf.scalar.builtin``, so this function has no
    Python body of its own; it is used to parse strings when no explicit
    format is supplied.

    NOTE(review): presumably this resolves to the backend's built-in function
    with the same name (``timestamp``) -- confirm against the ibis UDF docs.

    Args:
        a: The string value to convert.

    Returns:
        The converted timestamp value.
    """
19 changes: 19 additions & 0 deletions bigframes/core/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from bigframes.core.tools.datetimes import to_datetime

__all__ = [
"to_datetime",
]
82 changes: 82 additions & 0 deletions bigframes/core/tools/datetimes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping
from datetime import datetime
from typing import Optional, Union

import pandas as pd

import bigframes.constants as constants
import bigframes.core.global_session as global_session
import bigframes.dataframe
import bigframes.operations as ops
import bigframes.series
import third_party.bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes


def to_datetime(
    arg: Union[
        vendored_pandas_datetimes.local_scalars,
        vendored_pandas_datetimes.local_iterables,
        bigframes.series.Series,
        bigframes.dataframe.DataFrame,
    ],
    *,
    utc: bool = False,
    format: Optional[str] = None,
    unit: Optional[str] = None,
) -> Union[pd.Timestamp, datetime, bigframes.series.Series]:
    """Convert a scalar, a local iterable or a Series to datetime.

    Local scalars are converted directly with ``pandas.to_datetime``. Local
    iterables (including ``pandas.Series``) are uploaded through
    ``read_pandas`` with the default session and then converted like a
    ``bigframes.series.Series``.

    Args:
        arg: The value to convert.
        utc: Whether the result should be a UTC timestamp. Must be True
            unless the data has dtype ``Int64`` or ``Float64``.
        format: Optional format string forwarded to the conversion.
        unit: Optional unit of numeric input forwarded to the conversion.

    Returns:
        The pandas result for scalar input, otherwise a
        ``bigframes.series.Series`` with ``ToDatetimeOp`` applied.

    Raises:
        NotImplementedError: If ``arg`` is a Mapping, ``pandas.DataFrame`` or
            ``bigframes.dataframe.DataFrame``, or if ``utc`` is False and the
            data is neither ``Int64`` nor ``Float64``.
        ValueError: If a local iterable does not upload as exactly one column.
    """
    # Local scalars are handled by pandas without uploading anything.
    local_scalar_types = (int, float, str, datetime)
    if isinstance(arg, local_scalar_types):
        return pd.to_datetime(arg, utc=utc, format=format, unit=unit)

    unsupported_types = (Mapping, pd.DataFrame, bigframes.dataframe.DataFrame)
    if isinstance(arg, unsupported_types):
        raise NotImplementedError(
            "Conversion of Mapping, pandas.DataFrame, or bigframes.dataframe.DataFrame "
            f"to datetime is not implemented. {constants.FEEDBACK_LINK}"
        )

    if not isinstance(arg, bigframes.series.Series):
        # Local data (iterables and pandas.Series) is uploaded first so that
        # it can be processed the same way as a BigFrames Series.
        # TODO: The upload currently goes through a pandas DataFrame and
        # `read_pandas` because the BigFrames DataFrame constructor cannot
        # handle all the relevant data types yet. Switch to the BigFrames
        # DataFrame constructor once it supports more of these datatypes.
        local_frame = pd.DataFrame(arg)
        arg = global_session.with_default_session(
            bigframes.session.Session.read_pandas, local_frame
        )
        if len(arg.columns) != 1:
            raise ValueError("Input must be 1-dimensional.")

        arg = arg[arg.columns[0]]

    # Without utc=True only numeric (Int64 / Float64) data is supported.
    if not (utc or arg.dtype in ("Int64", "Float64")):  # type: ignore
        raise NotImplementedError(
            f"String and Timestamp requires utc=True. {constants.FEEDBACK_LINK}"
        )

    conversion_op = ops.ToDatetimeOp(utc=utc, format=format, unit=unit)
    return arg._apply_unary_op(conversion_op)  # type: ignore
11 changes: 11 additions & 0 deletions bigframes/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ def output_type(self, *input_types):
return input_types[0]


@dataclasses.dataclass(frozen=True)
class ToDatetimeOp(UnaryOp):
    """Unary operation describing a ``to_datetime`` conversion.

    A frozen dataclass that carries the conversion options as fields.
    """

    name: typing.ClassVar[str] = "to_datetime"
    # Whether the result should be a UTC timestamp.
    utc: bool = False
    # Optional format string; presumably only relevant for string input.
    format: typing.Optional[str] = None
    # Optional unit; presumably only relevant for numeric input.
    unit: typing.Optional[str] = None

    def output_type(self, *input_types):
        """Return the first input type unchanged."""
        # NOTE(review): this reports the input type as the output type even
        # though the operation is meant to produce datetimes -- confirm that
        # this is intended.
        return input_types[0]


# Binary Ops
fillna_op = create_binary_op(name="fillna")
cliplower_op = create_binary_op(name="clip_lower")
Expand Down
28 changes: 28 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

from collections import namedtuple
from datetime import datetime
import inspect
import sys
import typing
Expand Down Expand Up @@ -52,6 +53,7 @@
import bigframes.core.global_session as global_session
import bigframes.core.indexes
import bigframes.core.reshape
import bigframes.core.tools
import bigframes.dataframe
import bigframes.operations as ops
import bigframes.series
Expand All @@ -61,6 +63,7 @@
import third_party.bigframes_vendored.pandas.core.reshape.encoding as vendored_pandas_encoding
import third_party.bigframes_vendored.pandas.core.reshape.merge as vendored_pandas_merge
import third_party.bigframes_vendored.pandas.core.reshape.tile as vendored_pandas_tile
import third_party.bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes
import third_party.bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq


Expand Down Expand Up @@ -635,6 +638,30 @@ def read_gbq_function(function_name: str):

read_gbq_function.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_function)


def to_datetime(
    arg: Union[
        vendored_pandas_datetimes.local_scalars,
        vendored_pandas_datetimes.local_iterables,
        bigframes.series.Series,
        bigframes.dataframe.DataFrame,
    ],
    *,
    utc: bool = False,
    format: Optional[str] = None,
    unit: Optional[str] = None,
) -> Union[pandas.Timestamp, datetime, bigframes.series.Series]:
    # Public entry point that forwards every argument unchanged to the
    # implementation in bigframes.core.tools. No docstring is written here
    # because it is copied from the vendored pandas definition right below.
    converted = bigframes.core.tools.to_datetime(
        arg, utc=utc, format=format, unit=unit
    )
    return converted


to_datetime.__doc__ = vendored_pandas_datetimes.to_datetime.__doc__


# pandas dtype attributes
NA = pandas.NA
BooleanDtype = pandas.BooleanDtype
Expand Down Expand Up @@ -680,6 +707,7 @@ def read_gbq_function(function_name: str):
"read_pandas",
"read_pickle",
"remote_function",
"to_datetime",
# pandas dtype attributes
"NA",
"BooleanDtype",
Expand Down
62 changes: 62 additions & 0 deletions tests/system/small/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime

import pandas as pd
import pytest
import pytz

import bigframes.pandas as bpd
from tests.system.utils import assert_pandas_df_equal
Expand Down Expand Up @@ -477,3 +480,62 @@ def test_qcut(scalars_dfs, q):
pd_result = pd_result.astype("Int64")

pd.testing.assert_series_equal(bf_result, pd_result)


@pytest.mark.parametrize(
    ("arg", "utc", "unit", "format"),
    [
        (173872738, False, None, None),
        (32787983.23, True, "s", None),
        ("2023-01-01", False, None, "%Y-%m-%d"),
        (datetime(2023, 1, 1, 12, 0), False, None, None),
    ],
)
def test_to_datetime_scalar(arg, utc, unit, format):
    """Scalar input must give the same result as ``pandas.to_datetime``."""
    options = {"utc": utc, "unit": unit, "format": format}

    expected = pd.to_datetime(arg, **options)
    actual = bpd.to_datetime(arg, **options)

    assert actual == expected


@pytest.mark.parametrize(
    ("arg", "utc", "unit", "format"),
    [
        ([173872738], False, None, None),
        ([32787983.23], True, "s", None),
        (
            # A pytz zone must be attached with localize(); passing it as
            # ``tzinfo=`` to the datetime constructor silently picks the
            # zone's historical local mean time offset instead of EST/EDT.
            [pytz.timezone("America/New_York").localize(datetime(2023, 1, 1, 12, 0))],
            True,
            None,
            None,
        ),
        (["2023-01-01"], True, None, "%Y-%m-%d"),
        (["2023-02-01T15:00:00+07:22"], True, None, None),
        (["01-31-2023 14:30 -0800"], True, None, "%m-%d-%Y %H:%M %z"),
        (["01-31-2023 14:00", "02-01-2023 15:00"], True, None, "%m-%d-%Y %H:%M"),
    ],
)
def test_to_datetime_iterable(arg, utc, unit, format):
    """Local iterables must convert to the same values as pandas.

    The BigQuery DataFrames result is cast to the matching numpy datetime
    dtype, and the pandas result is floored to microseconds, so that both
    sides are compared at the same precision.
    """
    target_dtype = "datetime64[ns, UTC]" if utc else "datetime64[ns]"
    bf_result = (
        bpd.to_datetime(arg, utc=utc, unit=unit, format=format)
        .to_pandas()
        .astype(target_dtype)
    )
    pd_result = pd.Series(
        pd.to_datetime(arg, utc=utc, unit=unit, format=format)
    ).dt.floor("us")
    pd.testing.assert_series_equal(
        bf_result, pd_result, check_index_type=False, check_names=False
    )


def test_to_datetime_series(scalars_dfs):
    """An integer Series in seconds must convert like its pandas twin."""
    scalars_df, scalars_pandas_df = scalars_dfs
    column = "int64_too"

    converted = bpd.to_datetime(scalars_df[column], unit="s")
    actual = converted.to_pandas().astype("datetime64[s]")
    expected = pd.Series(pd.to_datetime(scalars_pandas_df[column], unit="s"))

    pd.testing.assert_series_equal(
        actual, expected, check_index_type=False, check_names=False
    )
Empty file.
77 changes: 77 additions & 0 deletions third_party/bigframes_vendored/pandas/core/tools/datetimes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/tools/datetimes.py

from datetime import datetime
from typing import Iterable, Mapping, Union

import pandas as pd

from bigframes import constants, series

# Scalar values held in local memory that ``to_datetime`` accepts.
local_scalars = Union[int, float, str, datetime]
# Iterable-like values held in local memory that appear in the
# ``to_datetime`` signature.
local_iterables = Union[Iterable, pd.Series, pd.DataFrame, Mapping]


def to_datetime(
    arg,
    *,
    utc=False,
    format=None,
    unit=None,
) -> Union[pd.Timestamp, datetime, series.Series]:
    """
    This function converts a scalar, array-like or Series to a datetime object.

    .. note::
        BigQuery only supports precision up to microseconds (us). Therefore, when working
        with timestamps that have a finer granularity than microseconds, be aware that
        the additional precision will not be represented in BigQuery.

    .. note::
        The format strings for specifying datetime representations in BigQuery and pandas
        are not completely identical. Ensure that the format string provided is compatible
        with BigQuery.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> bpd.options.display.progress_bar = None

    Converting a Scalar to datetime:

        >>> scalar = 123456.789
        >>> bpd.to_datetime(scalar, unit = 's')
        Timestamp('1970-01-02 10:17:36.789000')

    Converting a List of Strings without Timezone Information:

        >>> list_str = ["01-31-2021 14:30", "02-28-2021 15:45"]
        >>> bpd.to_datetime(list_str, format="%m-%d-%Y %H:%M", utc=True)
        0    2021-01-31 14:30:00+00:00
        1    2021-02-28 15:45:00+00:00
        Name: 0, dtype: timestamp[us, tz=UTC][pyarrow]

    Converting a Series of Strings with Timezone Information:

        >>> series_str = bpd.Series(["01-31-2021 14:30+08:00", "02-28-2021 15:45+00:00"])
        >>> bpd.to_datetime(series_str, format="%m-%d-%Y %H:%M%Z", utc=True)
        0    2021-01-31 06:30:00+00:00
        1    2021-02-28 15:45:00+00:00
        dtype: timestamp[us, tz=UTC][pyarrow]

    Args:
        arg (int, float, str, datetime, list, tuple, 1-d array, Series):
            The object to convert to a datetime.
        utc (bool, default False):
            Control timezone-related parsing, localization and conversion. If True, the
            function always returns a timezone-aware UTC-localized timestamp or series.
            If False (default), inputs will not be coerced to UTC.
        format (str, default None):
            The strftime-style format used to parse time strings, e.g. "%d/%m/%Y".
        unit (str, default 'ns'):
            The unit (D, s, ms, us, ns) in which ``arg`` is expressed when it is an
            integer or float number.

    Returns:
        Timestamp, datetime.datetime or bigframes.series.Series: Return type depends on input.
    """
    raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

0 comments on commit ffb0d15

Please sign in to comment.