From 5961e31b3bf35eadec3d7e3d2686670d8e45c090 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 24 Oct 2024 20:24:58 +0000 Subject: [PATCH 1/3] perf: Reduce CURRENT_TIMESTAMP queries --- .../session/_io/bigquery/read_gbq_table.py | 18 +----- bigframes/session/loader.py | 5 +- bigframes/session/time.py | 59 +++++++++++++++++++ 3 files changed, 65 insertions(+), 17 deletions(-) create mode 100644 bigframes/session/time.py diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 01ff1a3f15..f1d2b8f517 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -45,8 +45,8 @@ def get_table_metadata( bqclient: bigquery.Client, table_ref: google.cloud.bigquery.table.TableReference, + bq_time: datetime.datetime, *, - api_name: str, cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], use_cache: bool = True, ) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]: @@ -76,23 +76,9 @@ def get_table_metadata( ) return cached_table - # TODO(swast): It's possible that the table metadata is changed between now - # and when we run the CURRENT_TIMESTAMP() query to see when we can time - # travel to. Find a way to fetch the table metadata and BQ's current time - # atomically. table = bqclient.get_table(table_ref) - # TODO(swast): Use session._start_query instead? - # TODO(swast): Use query_and_wait since we know these are small results. - job_config = bigquery.QueryJobConfig() - bigframes.session._io.bigquery.add_labels(job_config, api_name=api_name) - snapshot_timestamp = list( - bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - cached_table = (snapshot_timestamp, table) + cached_table = (bq_time, table) cache[table_ref] = cached_table return cached_table diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 21d454d72f..a05be9e2cd 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -59,6 +59,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temp_storage +import bigframes.session.time as session_time import bigframes.version # Avoid circular imports. @@ -128,6 +129,8 @@ def __init__( self._metrics = metrics # Unfortunate circular reference, but need to pass reference when constructing objects self._session = session + self._clock = session_time.BigquerySyncedClock(bqclient) + self._clock.sync() def read_pandas_load_job( self, pandas_dataframe: pandas.DataFrame, api_name: str @@ -246,7 +249,7 @@ def read_gbq_table( time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( self._bqclient, table_ref=table_ref, - api_name=api_name, + bq_time=self._clock.get_time(), cache=self._df_snapshot, use_cache=use_cache, ) diff --git a/bigframes/session/time.py b/bigframes/session/time.py new file mode 100644 index 0000000000..a818c9a92a --- /dev/null +++ b/bigframes/session/time.py @@ -0,0 +1,59 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import threading +import time +from typing import cast, Optional + +import google.cloud.bigquery as bigquery + +MIN_RESYNC_SECONDS = 100 + + +class BigquerySyncedClock: + """ + Local clock that attempts to synchronize its time with the bigquery service. + """ + + def __init__(self, bqclient: bigquery.Client): + self._bqclient = bqclient + self._sync_lock = threading.Lock() + self._sync_remote_time: Optional[datetime.datetime] = None + self._sync_monotonic_time: Optional[float] = None + + def get_time(self): + if (self._sync_monotonic_time is None) or (self._sync_remote_time is None): + self.sync() + assert self._sync_remote_time is not None + assert self._sync_monotonic_time is not None + return self._sync_remote_time + datetime.timedelta( + seconds=time.monotonic() - self._sync_monotonic_time + ) + + def sync(self): + with self._sync_lock: + if (self._sync_monotonic_time is not None) and ( + time.monotonic() - self._sync_monotonic_time + ) < MIN_RESYNC_SECONDS: + return + current_bq_time = list( + next( + self._bqclient.query_and_wait( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + ) + ) + )[0] + self._sync_remote_time = cast(datetime.datetime, current_bq_time) + self._sync_monotonic_time = time.monotonic() From 09427c4cb5a8b72717be671c0eb3b7f7922f9518 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 24 Oct 2024 23:43:25 +0000 Subject: [PATCH 2/3] fix unit tests --- tests/unit/resources.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/unit/resources.py b/tests/unit/resources.py index d45da82ab9..04db840b28 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -91,7 +91,16 @@ def query_mock(query, *args, **kwargs): return query_job + existing_query_and_wait = bqclient.query_and_wait + + def query_and_wait_mock(query, *args, **kwargs): + if query.startswith("SELECT CURRENT_TIMESTAMP()"): + return iter([[datetime.datetime.now()]]) + else: + return existing_query_and_wait(query, *args, **kwargs) + bqclient.query = query_mock + bqclient.query_and_wait = query_and_wait_mock clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider) type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient) From 32490e7c593925e5ba16975856eb2644f6f07a95 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 30 Oct 2024 17:26:17 +0000 Subject: [PATCH 3/3] add unit tests with freezegun --- bigframes/session/loader.py | 2 +- bigframes/session/time.py | 2 +- noxfile.py | 1 + tests/unit/session/test_time.py | 69 +++++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 tests/unit/session/test_time.py diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index a05be9e2cd..2a6b59fa4b 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -129,7 +129,7 @@ def __init__( self._metrics = metrics # Unfortunate circular reference, but need to pass reference when constructing objects self._session = session - self._clock = session_time.BigquerySyncedClock(bqclient) + self._clock = session_time.BigQuerySyncedClock(bqclient) self._clock.sync() def read_pandas_load_job( diff --git a/bigframes/session/time.py b/bigframes/session/time.py index a818c9a92a..bef4bbc17f 100644 --- a/bigframes/session/time.py +++ b/bigframes/session/time.py @@ -22,7 +22,7 @@ MIN_RESYNC_SECONDS = 100 -class BigquerySyncedClock: +class BigQuerySyncedClock: """ Local clock that attempts to synchronize its time with the bigquery service. """ diff --git a/noxfile.py b/noxfile.py index f537005e57..24aec29c6c 100644 --- a/noxfile.py +++ b/noxfile.py @@ -51,6 +51,7 @@ UNIT_TEST_STANDARD_DEPENDENCIES = [ "mock", "asyncmock", + "freezegun", PYTEST_VERSION, "pytest-cov", "pytest-asyncio", diff --git a/tests/unit/session/test_time.py b/tests/unit/session/test_time.py new file mode 100644 index 0000000000..87766e79bb --- /dev/null +++ b/tests/unit/session/test_time.py @@ -0,0 +1,69 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import unittest.mock as mock + +import freezegun +import google.cloud.bigquery +import pytest + +import bigframes.session.time + +INITIAL_BQ_TIME = datetime.datetime( + year=2020, + month=4, + day=24, + hour=8, + minute=55, + second=29, + tzinfo=datetime.timezone.utc, +) + + +@pytest.fixture() +def bq_client(): + bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) + + def query_and_wait_mock(query, *args, **kwargs): + if query.startswith("SELECT CURRENT_TIMESTAMP()"): + return iter([[INITIAL_BQ_TIME]]) + else: + return ValueError(f"mock cannot handle query : {query}") + + bqclient.query_and_wait = query_and_wait_mock + return bqclient + + +def test_bqsyncedclock_get_time(bq_client): + # this initial local time is actually irrelevant, only the ticks matter + initial_local_datetime = datetime.datetime( + year=1, month=7, day=12, hour=15, minute=6, second=3 + ) + + with freezegun.freeze_time(initial_local_datetime) as frozen_datetime: + clock = bigframes.session.time.BigQuerySyncedClock(bq_client) + + t1 = clock.get_time() + assert t1 == INITIAL_BQ_TIME + + frozen_datetime.tick(datetime.timedelta(seconds=3)) + t2 = clock.get_time() + assert t2 == INITIAL_BQ_TIME + datetime.timedelta(seconds=3) + + frozen_datetime.tick(datetime.timedelta(seconds=23529385)) + t3 = clock.get_time() + assert t3 == INITIAL_BQ_TIME + datetime.timedelta( + seconds=3 + ) + datetime.timedelta(seconds=23529385)