-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support bigframes.pandas.to_datetime for scalars, iterables and series. #372
Changes from 18 commits
6eefb40
033e338
e4feb09
35f14f5
22ede7d
af274cb
fe955db
8c1f633
637ca21
23fbf15
c6d254d
0692c79
f436149
87d1749
b180fe3
3f0f7db
dc6cfcd
68ec37e
8b8d61a
5e5842b
d4a71b0
e0d1f8c
d0db699
958ca00
6ef47fb
a08ea2e
6732fd9
097ca77
7c54aaa
1b68883
7057758
22abed0
a4e981b
24347a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,20 @@ | |
# ln(2**(2**10)) == (2**10)*ln(2) ~= 709.78, so EXP(x) for x>709.78 will overflow. | ||
_FLOAT64_EXP_BOUND = typing.cast(ibis_types.NumericValue, ibis_types.literal(709.78)) | ||
|
||
# Multiplier that converts a numeric value expressed in the given pandas-style
# unit into microseconds. Every factor is an exact integer except "ns", which
# is fractional because a nanosecond is smaller than a microsecond.
UNIT_TO_US_CONVERSION_FACTORS = {
    "D": 86_400_000_000,  # 24 h * 60 min * 60 s * 1000 ms * 1000 us
    "h": 3_600_000_000,  # 60 min * 60 s * 1000 ms * 1000 us
    "m": 60_000_000,  # 60 s * 1000 ms * 1000 us
    "s": 1_000_000,  # 1000 ms * 1000 us
    "ms": 1_000,
    "us": 1,
    "ns": 1e-3,
}

# A string ending in a positive UTC offset such as "+07:22".
TIMEZONE_POS_REGEX = r"[\+]\d{2}:\d{2}$"
# A string ending in a negative UTC offset such as "-07:22".
TIMEZONE_NEG_REGEX = r"[\-]\d{2}:\d{2}$"
# A string ending in the "Z" / "z" UTC designator.
UTC_REGEX = r"[Zz]$"
|
||
|
||
class ScalarOpCompiler: | ||
# Mapping of operation name to implementations | ||
|
@@ -656,6 +670,84 @@ def isin_op_impl(x: ibis_types.Value, op: ops.IsInOp): | |
return x.isin(matchable_ibis_values) | ||
|
||
|
||
# Number of trailing characters occupied by a "+HH:MM" / "-HH:MM" UTC offset.
_UTC_OFFSET_SUFFIX_LENGTH = 6
# Number of trailing characters occupied by the "Z" / "z" UTC designator.
_UTC_DESIGNATOR_SUFFIX_LENGTH = 1
# Width of the "HH" and of the "MM" field inside a "+HH:MM" offset.
_UTC_OFFSET_FIELD_WIDTH = 2
# Distance from the end of the string to the first character of "HH" / "MM".
_UTC_OFFSET_HOURS_FROM_END = 5
_UTC_OFFSET_MINUTES_FROM_END = 2


def _without_suffix(text: ibis_types.Value, suffix_length: int) -> ibis_types.Value:
    """Return ``text`` with its last ``suffix_length`` characters removed."""
    return text.substr(0, text.length() - suffix_length)


def _parse_as_utc_timestamp(
    text: ibis_types.Value, fmt: typing.Optional[str]
) -> ibis_types.Value:
    """Turn ``text`` into a UTC timestamp, parsing with ``fmt`` when one is given."""
    parsed = text.to_timestamp(fmt) if fmt else text
    return parsed.cast(ibis_dtypes.Timestamp(timezone="UTC"))


def _utc_offset_in_us(text: ibis_types.Value) -> ibis_types.Value:
    """Return the magnitude, in microseconds, of the trailing "HH:MM" offset."""
    hours = text.substr(
        text.length() - _UTC_OFFSET_HOURS_FROM_END, _UTC_OFFSET_FIELD_WIDTH
    ).cast(ibis_dtypes.int64)
    minutes = text.substr(
        text.length() - _UTC_OFFSET_MINUTES_FROM_END, _UTC_OFFSET_FIELD_WIDTH
    ).cast(ibis_dtypes.int64)
    return (
        hours * UNIT_TO_US_CONVERSION_FACTORS["h"]
        + minutes * UNIT_TO_US_CONVERSION_FACTORS["m"]
    )


def _offset_string_to_utc_timestamp(
    text: ibis_types.Value, fmt: typing.Optional[str], ahead_of_utc: bool
) -> ibis_types.Value:
    """Convert a string ending in "+HH:MM" / "-HH:MM" into a UTC timestamp.

    The offset suffix is stripped, the remainder is read as if it were UTC,
    and the offset is then removed (``ahead_of_utc=True``, i.e. "+HH:MM") or
    added back (``ahead_of_utc=False``, i.e. "-HH:MM").
    """
    # The integer form of the timestamp is treated as microseconds since the
    # epoch: it is combined with microsecond factors and converted back with
    # unit="us" below.
    local_us = _parse_as_utc_timestamp(
        _without_suffix(text, _UTC_OFFSET_SUFFIX_LENGTH), fmt
    ).cast(ibis_dtypes.int64)
    offset_us = _utc_offset_in_us(text)
    utc_us = local_us - offset_us if ahead_of_utc else local_us + offset_us
    return utc_us.to_timestamp(unit="us").cast(ibis_dtypes.Timestamp(timezone="UTC"))


@scalar_op_compiler.register_unary_op(ops.ToDatetimeOp, pass_op=True)
def to_datetime_op_impl(x: ibis_types.Value, op: ops.ToDatetimeOp):
    """Compile ``ops.ToDatetimeOp`` into an ibis timestamp expression.

    Args:
        x: The input expression. Strings, UTC timestamps, timezone-naive
            timestamps and other (numeric) values are handled separately.
        op: The operation; its ``format``, ``unit`` and ``utc`` attributes
            are read.

    Returns:
        ``x`` unchanged when it is already a UTC timestamp; otherwise the
        converted value cast to a UTC timestamp when ``op.utc`` is true, or
        to a timezone-naive timestamp when it is not.

    Raises:
        ValueError: If a numeric input is given with a unit that is not a
            key of ``UNIT_TO_US_CONVERSION_FACTORS``.
    """
    if x.type() == ibis_dtypes.str:
        # This is not an exact match of pandas behavior, but it ensures that
        # strings carrying a UTC offset or a "Z" designator are normalized
        # to UTC.
        x = (
            ibis.case()
            .when(
                x.re_search(TIMEZONE_POS_REGEX),
                _offset_string_to_utc_timestamp(x, op.format, ahead_of_utc=True),
            )
            .when(
                x.re_search(TIMEZONE_NEG_REGEX),
                _offset_string_to_utc_timestamp(x, op.format, ahead_of_utc=False),
            )
            .when(
                x.re_search(UTC_REGEX),
                _parse_as_utc_timestamp(
                    _without_suffix(x, _UTC_DESIGNATOR_SUFFIX_LENGTH), op.format
                ),
            )
            .else_(_parse_as_utc_timestamp(x, op.format))
            .end()
        )
    elif x.type() == ibis_dtypes.Timestamp(timezone="UTC"):
        return x
    elif x.type() != ibis_dtypes.timestamp:
        # Default to nanoseconds to mirror pandas.to_datetime, whose default
        # unit for numeric input is "ns".
        unit = op.unit if op.unit is not None else "ns"
        if unit not in UNIT_TO_US_CONVERSION_FACTORS:
            raise ValueError(f"Cannot convert input with unit '{unit}'.")
        x_converted = x * UNIT_TO_US_CONVERSION_FACTORS[unit]
        x_converted = x_converted.cast(ibis_dtypes.int64)
        # Note: the explicit cast to UTC is required for utc=False. Without
        # it, the final cast to a timezone-naive timestamp was observed to be
        # skipped, apparently because the result of to_timestamp is already
        # regarded as timezone-naive even though its value is in UTC.
        # Further investigation is needed to confirm this behavior.
        x = x_converted.to_timestamp(unit="us").cast(
            ibis_dtypes.Timestamp(timezone="UTC")
        )

    return x.cast(ibis_dtypes.Timestamp(timezone="UTC" if op.utc else None))
|
||
|
||
@scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) | ||
def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): | ||
if not hasattr(op.func, "bigframes_remote_function"): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Copyright 2024 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from bigframes.core.tools.datetimes import to_datetime | ||
|
||
__all__ = [ | ||
"to_datetime", | ||
] |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,80 @@ | ||||||
# Copyright 2024 Google LLC | ||||||
# | ||||||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
# you may not use this file except in compliance with the License. | ||||||
# You may obtain a copy of the License at | ||||||
# | ||||||
# http://www.apache.org/licenses/LICENSE-2.0 | ||||||
# | ||||||
# Unless required by applicable law or agreed to in writing, software | ||||||
# distributed under the License is distributed on an "AS IS" BASIS, | ||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
# See the License for the specific language governing permissions and | ||||||
# limitations under the License. | ||||||
|
||||||
from collections.abc import Mapping | ||||||
from datetime import datetime | ||||||
from typing import Optional, Union | ||||||
|
||||||
import pandas as pd | ||||||
|
||||||
import bigframes.constants as constants | ||||||
import bigframes.core.global_session as global_session | ||||||
import bigframes.dataframe | ||||||
import bigframes.operations as ops | ||||||
import bigframes.series | ||||||
import third_party.bigframes_vendored.pandas.core.tools.datetimes as vendored_pandas_datetimes | ||||||
|
||||||
|
||||||
def to_datetime(
    arg: Union[
        vendored_pandas_datetimes.local_scalars,
        vendored_pandas_datetimes.local_iterables,
        bigframes.series.Series,
        bigframes.dataframe.DataFrame,
    ],
    *,
    utc: bool = False,
    format: Optional[str] = None,
    unit: Optional[str] = None,
) -> Union[pd.Timestamp, datetime, bigframes.series.Series]:
    """Convert a scalar, a local iterable or a Series to datetime.

    Args:
        arg: The value to convert. ``int``, ``float``, ``str`` and
            ``datetime`` scalars are converted locally with
            ``pandas.to_datetime``. Any other input that is not already a
            bigframes Series is uploaded through ``read_pandas`` first.
        utc: Whether the result should be timezone-aware UTC. Must be true
            unless the data has dtype ``Int64`` or ``Float64``.
        format: Optional format string, forwarded to the conversion.
        unit: Optional unit of numeric input, forwarded to the conversion.

    Returns:
        The ``pandas.to_datetime`` result for scalar input; otherwise the
        bigframes Series produced by applying ``ops.ToDatetimeOp``.

    Raises:
        NotImplementedError: If ``arg`` is a Mapping, a pandas DataFrame or
            a bigframes DataFrame, or if ``utc`` is false and the data is
            not ``Int64`` / ``Float64``.
        ValueError: If the uploaded input does not have exactly one column.
    """
    if isinstance(arg, (int, float, str, datetime)):
        return pd.to_datetime(
            arg,
            utc=utc,
            format=format,
            unit=unit,
        )

    if isinstance(arg, (Mapping, pd.DataFrame, bigframes.dataframe.DataFrame)):
        raise NotImplementedError(
            "Conversion of Mapping, pandas.DataFrame, or bigframes.dataframe.DataFrame "
            f"to datetime is not implemented. {constants.FEEDBACK_LINK}"
        )

    # `not`, rather than `~`, is required here: `~True` is -2 and `~False` is
    # -1, both truthy, so a bitwise negation would take this branch for every
    # input, including bigframes Series.
    if not isinstance(arg, bigframes.series.Series):
        # This branch supports local inputs, e.g. a list or a pandas Series.
        # TODO: Currently, data upload is performed using pandas DataFrames
        # combined with the `read_pandas` method due to the bigframes
        # DataFrame constructor's limitations in handling various data types.
        # Plan to update the upload process to utilize the bigframes
        # DataFrame constructor directly once it is enhanced for more
        # related datatypes.
        arg = global_session.with_default_session(
            bigframes.session.Session.read_pandas, pd.DataFrame(arg)
        )
        if len(arg.columns) != 1:
            raise ValueError("Input must be 1-dimensional.")

        arg = arg[arg.columns[0]]

    if not utc and arg.dtype not in ("Int64", "Float64"):  # type: ignore
        raise NotImplementedError(
            f"String and Timestamp requires utc=True. {constants.FEEDBACK_LINK}"
        )

    return arg._apply_unary_op(  # type: ignore
        ops.ToDatetimeOp(
            utc=utc,
            format=format,
            unit=unit,
        )
    )
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,8 +12,11 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from datetime import datetime | ||
|
||
import pandas as pd | ||
import pytest | ||
import pytz | ||
|
||
import bigframes.pandas as bpd | ||
from tests.system.utils import assert_pandas_df_equal | ||
|
@@ -477,3 +480,61 @@ def test_qcut(scalars_dfs, q): | |
pd_result = pd_result.astype("Int64") | ||
|
||
pd.testing.assert_series_equal(bf_result, pd_result) | ||
|
||
|
||
@pytest.mark.parametrize(
    ("arg", "utc", "unit", "format"),
    [
        (173872738, False, None, None),
        (32787983.23, True, "s", None),
        ("2023-01-01", False, None, "%Y-%m-%d"),
        (datetime(2023, 1, 1, 12, 0), False, None, None),
    ],
)
def test_to_datetime_scalar(arg, utc, unit, format):
    """Scalar input must give the same value as ``pandas.to_datetime``."""
    options = {"utc": utc, "unit": unit, "format": format}

    actual = bpd.to_datetime(arg, **options)
    expected = pd.to_datetime(arg, **options)

    assert actual == expected
|
||
|
||
@pytest.mark.parametrize(
    ("arg", "utc", "unit", "format"),
    [
        ([173872738], False, None, None),
        ([32787983.23], True, "s", None),
        (
            [datetime(2023, 1, 1, 12, 0, tzinfo=pytz.timezone("America/New_York"))],
            True,
            None,
            None,
        ),
        (["2023-01-01"], True, None, "%Y-%m-%d"),
        (["2023-02-01T15:00:00+07:22"], True, None, None),
        (["01-31-2023 14:00", "02-01-2023 15:00"], True, None, "%m-%d-%Y %H:%M"),
    ],
)
def test_to_datetime_iterable(arg, utc, unit, format):
    """Local iterables must convert like pandas, at microsecond resolution."""
    options = {"utc": utc, "unit": unit, "format": format}

    # Per the review discussion, utc=False results are later cast to the
    # DATETIME type, hence the timezone-naive dtype in that case.
    target_dtype = "datetime64[ns, UTC]" if utc else "datetime64[ns]"
    actual = bpd.to_datetime(arg, **options).to_pandas().astype(target_dtype)

    # The pandas result is floored to microseconds before comparison.
    expected = pd.Series(pd.to_datetime(arg, **options)).dt.floor("us")

    pd.testing.assert_series_equal(
        actual, expected, check_index_type=False, check_names=False
    )
|
||
|
||
def test_to_datetime_series(scalars_dfs):
    """An integer column converted with unit="s" must match pandas."""
    scalars_df, scalars_pandas_df = scalars_dfs
    column = "int64_too"

    converted = bpd.to_datetime(scalars_df[column], unit="s")
    actual = converted.to_pandas().astype("datetime64[s]")
    expected = pd.Series(pd.to_datetime(scalars_pandas_df[column], unit="s"))

    pd.testing.assert_series_equal(
        actual, expected, check_index_type=False, check_names=False
    )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
6 is a bit of a "magic" number here. Please make some constants and comments explaining the intention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.