From 98ff63cd389207998b3452ec46e5a2f0fc70485c Mon Sep 17 00:00:00 2001 From: Shuchu Han Date: Mon, 1 Jul 2024 22:54:28 -0400 Subject: [PATCH] fix: Using one single function call for utcnow(). (#4307) Signed-off-by: Shuchu Han --- sdk/python/feast/feature_store.py | 9 +++--- .../feast/infra/offline_stores/bigquery.py | 4 +-- .../trino_offline_store/trino_queries.py | 6 ++-- .../feast/infra/online_stores/datastore.py | 6 ++-- .../feast/infra/registry/caching_registry.py | 9 +++--- .../contrib/azure/azure_registry_store.py | 4 +-- sdk/python/feast/infra/registry/file.py | 4 +-- sdk/python/feast/infra/registry/gcs.py | 4 +-- sdk/python/feast/infra/registry/registry.py | 21 ++++++------ sdk/python/feast/infra/registry/s3.py | 4 +-- sdk/python/feast/infra/registry/snowflake.py | 22 ++++++------- sdk/python/feast/infra/registry/sql.py | 9 +++--- sdk/python/feast/on_demand_feature_view.py | 6 ++-- sdk/python/feast/utils.py | 4 +++ sdk/python/tests/conftest.py | 11 ++++--- sdk/python/tests/data/data_creator.py | 15 +++++---- sdk/python/tests/doctest/test_all.py | 7 ++-- .../feature_repos/repo_configuration.py | 3 +- .../materialization/test_snowflake.py | 5 +-- .../offline_store/test_offline_write.py | 9 +++--- .../test_push_features_to_offline_store.py | 5 ++- .../test_universal_historical_retrieval.py | 13 ++++---- .../offline_store/test_validation.py | 5 ++- .../test_push_features_to_online_store.py | 7 ++-- .../test_python_feature_server.py | 10 +++--- .../online_store/test_remote_online_store.py | 4 +-- .../online_store/test_universal_online.py | 13 ++++---- .../test_universal_odfv_feature_inference.py | 7 ++-- .../registration/test_universal_registry.py | 7 ++-- .../registration/test_universal_types.py | 3 +- sdk/python/tests/unit/cli/test_cli_chdir.py | 5 +-- .../tests/unit/local_feast_tests/test_init.py | 5 +-- .../online_store/test_online_retrieval.py | 32 +++++++++---------- sdk/python/tests/unit/test_datetime.py | 6 ++++ sdk/python/tests/unit/test_feature_views.py | 10 +++--- .../tests/unit/test_stream_feature_view.py | 10 +++--- .../tests/utils/basic_read_write_test.py | 7 ++-- .../tests/utils/dynamo_table_creator.py | 5 ++- sdk/python/tests/utils/e2e_test_validation.py | 3 +- .../tests/utils/online_write_benchmark.py | 6 ++-- sdk/python/tests/utils/test_log_creator.py | 8 ++--- 41 files changed, 176 insertions(+), 157 deletions(-) create mode 100644 sdk/python/tests/unit/test_datetime.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 6476af5ac8..9600732e17 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -86,6 +86,7 @@ from feast.repo_contents import RepoContents from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.utils import _utc_now from feast.version import get_version warnings.simplefilter("once", DeprecationWarning) @@ -1246,7 +1247,7 @@ def materialize_incremental( >>> from feast import FeatureStore, RepoConfig >>> from datetime import datetime, timedelta >>> fs = FeatureStore(repo_path="project/feature_repo") - >>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5)) + >>> fs.materialize_incremental(end_date=_utc_now() - timedelta(minutes=5)) Materializing... ... @@ -1270,7 +1271,7 @@ def materialize_incremental( f" either a ttl to be set or for materialize() to have been run at least once." ) elif feature_view.ttl.total_seconds() > 0: - start_date = datetime.utcnow() - feature_view.ttl + start_date = _utc_now() - feature_view.ttl else: # TODO(felixwang9817): Find the earliest timestamp for this specific feature # view from the offline store, and set the start date to that timestamp. @@ -1278,7 +1279,7 @@ def materialize_incremental( f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, " "the start date will be set to 1 year before the current time." ) - start_date = datetime.utcnow() - timedelta(weeks=52) + start_date = _utc_now() - timedelta(weeks=52) provider = self._get_provider() print( f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}" @@ -1335,7 +1336,7 @@ def materialize( >>> from datetime import datetime, timedelta >>> fs = FeatureStore(repo_path="project/feature_repo") >>> fs.materialize( - ... start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) + ... start_date=_utc_now() - timedelta(hours=3), end_date=_utc_now() - timedelta(minutes=10) ... ) Materializing... diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 36334b606d..3e4a0f1b99 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -45,7 +45,7 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.repo_config import FeastConfigBaseModel, RepoConfig from feast.saved_dataset import SavedDatasetStorage -from feast.utils import get_user_agent +from feast.utils import _utc_now, get_user_agent from .bigquery_source import ( BigQueryLoggingDestination, @@ -701,7 +701,7 @@ def _upload_entity_df( # Ensure that the table expires after some time table = client.get_table(table=table_name) - table.expires = datetime.utcnow() + timedelta(minutes=30) + table.expires = _utc_now() + timedelta(minutes=30) client.update_table(table, ["expires"]) return table diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py index 50472407bc..3a26583af2 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py @@ -1,6 +1,5 @@ from __future__ import annotations -import datetime import signal from dataclasses import dataclass from enum import Enum @@ -16,6 +15,7 @@ from feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map import ( trino_to_pa_value_type, ) +from feast.utils import _utc_now class QueryStatus(Enum): @@ -97,12 +97,12 @@ def __init__(self, query_text: str, cursor: Cursor): def execute(self) -> Results: try: self.status = QueryStatus.RUNNING - start_time = datetime.datetime.utcnow() + start_time = _utc_now() self._cursor.execute(operation=self.query_text) rows = self._cursor.fetchall() - end_time = datetime.datetime.utcnow() + end_time = _utc_now() self.execution_time = end_time - start_time self.status = QueryStatus.COMPLETED diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index b33767cea5..9ae10792f5 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -44,7 +44,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig -from feast.utils import get_user_agent +from feast.utils import _utc_now, get_user_agent LOGGER = logging.getLogger(__name__) @@ -122,7 +122,7 @@ def update( entity = datastore.Entity( key=key, exclude_from_indexes=("created_ts", "event_ts", "values") ) - entity.update({"created_ts": datetime.utcnow()}) + entity.update({"created_ts": _utc_now()}) client.put(entity) for table in tables_to_delete: @@ -457,7 +457,7 @@ def update(self): entity = datastore.Entity( key=key, exclude_from_indexes=("created_ts", "event_ts", "values") ) - entity.update({"created_ts": datetime.utcnow()}) + entity.update({"created_ts": _utc_now()}) client.put(entity) def teardown(self): diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 6336dd7fee..f7eab7d70a 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -1,6 +1,6 @@ import logging from abc import abstractmethod -from datetime import datetime, timedelta +from datetime import timedelta from threading import Lock from typing import List, Optional @@ -15,6 +15,7 @@ from feast.project_metadata import ProjectMetadata from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.utils import _utc_now logger = logging.getLogger(__name__) @@ -27,7 +28,7 @@ def __init__( ): self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() self._refresh_lock = Lock() self.cached_registry_proto_ttl = timedelta( seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0 @@ -318,7 +319,7 @@ def refresh(self, project: Optional[str] = None): self.cached_registry_proto, project ) self.cached_registry_proto = self.proto() - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() def _refresh_cached_registry_if_necessary(self): with self._refresh_lock: @@ -329,7 +330,7 @@ def _refresh_cached_registry_if_necessary(self): self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity and ( - datetime.utcnow() + _utc_now() > ( self.cached_registry_proto_created + self.cached_registry_proto_ttl diff --git a/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py b/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py index 9c00170b0f..f9317bf7a4 100644 --- a/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py +++ b/sdk/python/feast/infra/registry/contrib/azure/azure_registry_store.py @@ -3,7 +3,6 @@ import os import uuid -from datetime import datetime from pathlib import Path from tempfile import TemporaryFile from urllib.parse import urlparse @@ -11,6 +10,7 @@ from feast.infra.registry.registry import RegistryConfig from feast.infra.registry.registry_store import RegistryStore from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.utils import _utc_now REGISTRY_SCHEMA_VERSION = "1" @@ -89,7 +89,7 @@ def teardown(self): def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) + registry_proto.last_updated.FromDatetime(_utc_now()) file_obj = TemporaryFile() file_obj.write(registry_proto.SerializeToString()) diff --git a/sdk/python/feast/infra/registry/file.py b/sdk/python/feast/infra/registry/file.py index 7117a0d2c6..ae783bf82c 100644 --- a/sdk/python/feast/infra/registry/file.py +++ b/sdk/python/feast/infra/registry/file.py @@ -1,10 +1,10 @@ import uuid -from datetime import datetime from pathlib import Path from feast.infra.registry.registry_store import RegistryStore from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig +from feast.utils import _utc_now class FileRegistryStore(RegistryStore): @@ -37,7 +37,7 @@ def teardown(self): def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) + registry_proto.last_updated.FromDatetime(_utc_now()) file_dir = self._filepath.parent file_dir.mkdir(exist_ok=True) with open(self._filepath, mode="wb", buffering=0) as f: diff --git a/sdk/python/feast/infra/registry/gcs.py b/sdk/python/feast/infra/registry/gcs.py index 7e4b7104cf..72498ad054 100644 --- a/sdk/python/feast/infra/registry/gcs.py +++ b/sdk/python/feast/infra/registry/gcs.py @@ -1,5 +1,4 @@ import uuid -from datetime import datetime from pathlib import Path from tempfile import TemporaryFile from urllib.parse import urlparse @@ -7,6 +6,7 @@ from feast.infra.registry.registry_store import RegistryStore from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig +from feast.utils import _utc_now class GCSRegistryStore(RegistryStore): @@ -62,7 +62,7 @@ def teardown(self): def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) + registry_proto.last_updated.FromDatetime(_utc_now()) # we have already checked the bucket exists so no need to do it again gs_bucket = self.gcs_client.get_bucket(self._bucket) blob = gs_bucket.blob(self._blob) diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 4d6bff4cc7..fe44e6253a 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -47,6 +47,7 @@ from feast.repo_contents import RepoContents from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.utils import _utc_now REGISTRY_SCHEMA_VERSION = "1" @@ -217,7 +218,7 @@ def clone(self) -> "Registry": if self.cached_registry_proto else RegistryProto() ) - new_registry.cached_registry_proto_created = datetime.utcnow() + new_registry.cached_registry_proto_created = _utc_now() new_registry._registry_store = NoopRegistryStore() return new_registry @@ -248,7 +249,7 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra: def apply_entity(self, entity: Entity, project: str, commit: bool = True): entity.is_valid() - now = datetime.utcnow() + now = _utc_now() if not entity.created_timestamp: entity.created_timestamp = now entity.last_updated_timestamp = now @@ -334,7 +335,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): def apply_feature_service( self, feature_service: FeatureService, project: str, commit: bool = True ): - now = datetime.utcnow() + now = _utc_now() if not feature_service.created_timestamp: feature_service.created_timestamp = now feature_service.last_updated_timestamp = now @@ -390,7 +391,7 @@ def apply_feature_view( ): feature_view.ensure_valid() - now = datetime.utcnow() + now = _utc_now() if not feature_view.created_timestamp: feature_view.created_timestamp = now feature_view.last_updated_timestamp = now @@ -517,7 +518,7 @@ def apply_materialization( existing_feature_view.materialization_intervals.append( (start_date, end_date) ) - existing_feature_view.last_updated_timestamp = datetime.utcnow() + existing_feature_view.last_updated_timestamp = _utc_now() feature_view_proto = existing_feature_view.to_proto() feature_view_proto.spec.project = project del self.cached_registry_proto.feature_views[idx] @@ -539,7 +540,7 @@ def apply_materialization( existing_stream_feature_view.materialization_intervals.append( (start_date, end_date) ) - existing_stream_feature_view.last_updated_timestamp = datetime.utcnow() + existing_stream_feature_view.last_updated_timestamp = _utc_now() stream_feature_view_proto = existing_stream_feature_view.to_proto() stream_feature_view_proto.spec.project = project del self.cached_registry_proto.stream_feature_views[idx] @@ -664,7 +665,7 @@ def apply_saved_dataset( project: str, commit: bool = True, ): - now = datetime.utcnow() + now = _utc_now() if not saved_dataset.created_timestamp: saved_dataset.created_timestamp = now saved_dataset.last_updated_timestamp = now @@ -812,7 +813,7 @@ def _prepare_registry_for_changes(self, project: str): registry_proto = RegistryProto() registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION self.cached_registry_proto = registry_proto - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() # Initialize project metadata if needed assert self.cached_registry_proto @@ -848,7 +849,7 @@ def _get_registry_proto( self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity and ( - datetime.utcnow() + _utc_now() > ( self.cached_registry_proto_created + self.cached_registry_proto_ttl @@ -871,7 +872,7 @@ def _get_registry_proto( logger.info("Registry cache expired, so refreshing") registry_proto = self._registry_store.get_registry_proto() self.cached_registry_proto = registry_proto - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() if not project: return registry_proto diff --git a/sdk/python/feast/infra/registry/s3.py b/sdk/python/feast/infra/registry/s3.py index cbae3af11c..8aac4d52ee 100644 --- a/sdk/python/feast/infra/registry/s3.py +++ b/sdk/python/feast/infra/registry/s3.py @@ -1,6 +1,5 @@ import os import uuid -from datetime import datetime from pathlib import Path from tempfile import TemporaryFile from urllib.parse import urlparse @@ -9,6 +8,7 @@ from feast.infra.registry.registry_store import RegistryStore from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.repo_config import RegistryConfig +from feast.utils import _utc_now try: import boto3 @@ -70,7 +70,7 @@ def teardown(self): def _write_registry(self, registry_proto: RegistryProto): registry_proto.version_id = str(uuid.uuid4()) - registry_proto.last_updated.FromDatetime(datetime.utcnow()) + registry_proto.last_updated.FromDatetime(_utc_now()) # we have already checked the bucket exists so no need to do it again file_obj = TemporaryFile() file_obj.write(registry_proto.SerializeToString()) diff --git a/sdk/python/feast/infra/registry/snowflake.py b/sdk/python/feast/infra/registry/snowflake.py index d7ab67e7d0..f2bc09e7e4 100644 --- a/sdk/python/feast/infra/registry/snowflake.py +++ b/sdk/python/feast/infra/registry/snowflake.py @@ -10,7 +10,6 @@ from pydantic import ConfigDict, Field, StrictStr import feast -from feast import utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity @@ -54,6 +53,7 @@ from feast.repo_config import RegistryConfig from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.utils import _utc_now, has_all_tags logger = logging.getLogger(__name__) @@ -126,16 +126,15 @@ def __init__( with GetSnowflakeConnection(self.registry_config) as conn: sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" with open(sql_function_file, "r") as file: - sqlFile = file.read() - - sqlCommands = sqlFile.split(";") - for command in sqlCommands: + sql_file = file.read() + sql_cmds = sql_file.split(";") + for command in sql_cmds: query = command.replace("REGISTRY_PATH", f"{self.registry_path}") execute_snowflake_statement(conn, query) self.cached_registry_proto = self.proto() proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() self._refresh_lock = Lock() self.cached_registry_proto_ttl = timedelta( seconds=registry_config.cache_ttl_seconds @@ -154,7 +153,7 @@ def refresh(self, project: Optional[str] = None): self.cached_registry_proto, project ) self.cached_registry_proto = self.proto() - self.cached_registry_proto_created = datetime.utcnow() + self.cached_registry_proto_created = _utc_now() def _refresh_cached_registry_if_necessary(self): with self._refresh_lock: @@ -165,7 +164,7 @@ def _refresh_cached_registry_if_necessary(self): self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity and ( - datetime.utcnow() + _utc_now() > ( self.cached_registry_proto_created + self.cached_registry_proto_ttl @@ -182,7 +181,6 @@ def teardown(self): sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_deletion.sql" with open(sql_function_file, "r") as file: sqlFile = file.read() - sqlCommands = sqlFile.split(";") for command in sqlCommands: query = command.replace("REGISTRY_PATH", f"{self.registry_path}") @@ -281,7 +279,7 @@ def _apply_object( name = name or (obj.name if hasattr(obj, "name") else None) assert name, f"name needs to be provided for {obj}" - update_datetime = datetime.utcnow() + update_datetime = _utc_now() if hasattr(obj, "last_updated_timestamp"): obj.last_updated_timestamp = update_datetime @@ -416,7 +414,7 @@ def _delete_object( if cursor.rowcount < 1 and not_found_exception: # type: ignore raise not_found_exception(name, project) - self._set_last_updated_metadata(datetime.utcnow(), project) + self._set_last_updated_metadata(_utc_now(), project) return cursor.rowcount @@ -787,7 +785,7 @@ def _list_objects( obj = python_class.from_proto( proto_class.FromString(row[1][proto_field_name]) ) - if utils.has_all_tags(obj.tags, tags): + if has_all_tags(obj.tags, tags): objects.append(obj) return objects return [] diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 239898677c..6ef08989b7 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -60,6 +60,7 @@ from feast.repo_config import RegistryConfig from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView +from feast.utils import _utc_now metadata = MetaData() @@ -591,7 +592,7 @@ def apply_user_metadata( table.c.project_id == project, ) row = conn.execute(stmt).first() - update_datetime = datetime.utcnow() + update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) if row: values = { @@ -703,7 +704,7 @@ def _apply_object( assert name, f"name needs to be provided for {obj}" with self.engine.begin() as conn: - update_datetime = datetime.utcnow() + update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) stmt = select(table).where( getattr(table.c, id_field_name) == name, table.c.project_id == project @@ -770,7 +771,7 @@ def _apply_object( def _maybe_init_project_metadata(self, project): # Initialize project metadata if needed with self.engine.begin() as conn: - update_datetime = datetime.utcnow() + update_datetime = _utc_now() update_time = int(update_datetime.timestamp()) stmt = select(feast_metadata).where( feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value, @@ -803,7 +804,7 @@ def _delete_object( rows = conn.execute(stmt) if rows.rowcount < 1 and not_found_exception: raise not_found_exception(name, project) - self._set_last_updated_metadata(datetime.utcnow(), project) + self._set_last_updated_metadata(_utc_now(), project) return rows.rowcount diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 839ce4d64c..586f5d1bac 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -2,7 +2,6 @@ import functools import inspect import warnings -from datetime import datetime from types import FunctionType from typing import Any, Optional, Union @@ -34,6 +33,7 @@ from feast.transformation.pandas_transformation import PandasTransformation from feast.transformation.python_transformation import PythonTransformation from feast.transformation.substrait_transformation import SubstraitTransformation +from feast.utils import _utc_now from feast.value_type import ValueType warnings.simplefilter("once", DeprecationWarning) @@ -549,7 +549,7 @@ def _construct_random_input(self) -> dict[str, list[Any]]: ValueType.DOUBLE: [1.0], ValueType.FLOAT: [1.0], ValueType.BOOL: [True], - ValueType.UNIX_TIMESTAMP: [datetime.utcnow()], + ValueType.UNIX_TIMESTAMP: [_utc_now()], ValueType.BYTES_LIST: [[str.encode("hello world")]], ValueType.STRING_LIST: [["hello world"]], ValueType.INT32_LIST: [[1]], @@ -557,7 +557,7 @@ def _construct_random_input(self) -> dict[str, list[Any]]: ValueType.DOUBLE_LIST: [[1.0]], ValueType.FLOAT_LIST: [[1.0]], ValueType.BOOL_LIST: [[True]], - ValueType.UNIX_TIMESTAMP_LIST: [[datetime.utcnow()]], + ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]], } feature_dict = {} diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index a6c893c954..1a1d757fc1 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1052,3 +1052,7 @@ def tags_str_to_dict(tags: str = "") -> dict[str, str]: cast(tuple[str, str], tag.split(":", 1)) for tag in tags_list if ":" in tag ).items() } + + +def _utc_now() -> datetime: + return datetime.utcnow() diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index fb6b7e5608..1fd510d104 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -15,7 +15,7 @@ import multiprocessing import os import random -from datetime import datetime, timedelta +from datetime import timedelta from multiprocessing import Process from sys import platform from typing import Any, Dict, List, Tuple, no_type_check @@ -27,6 +27,7 @@ from feast.data_source import DataSource from feast.feature_store import FeatureStore # noqa: E402 +from feast.utils import _utc_now from feast.wait import wait_retry_backoff # noqa: E402 from tests.data.data_creator import ( # noqa: E402 create_basic_driver_dataset, @@ -133,7 +134,7 @@ def pytest_collection_modifyitems(config, items: List[Item]): @pytest.fixture def simple_dataset_1() -> pd.DataFrame: - now = datetime.utcnow() + now = _utc_now() ts = pd.Timestamp(now).round("ms") data = { "id_join_key": [1, 2, 1, 3, 3], @@ -153,7 +154,7 @@ def simple_dataset_1() -> pd.DataFrame: @pytest.fixture def simple_dataset_2() -> pd.DataFrame: - now = datetime.utcnow() + now = _utc_now() ts = pd.Timestamp(now).round("ms") data = { "id_join_key": ["a", "b", "c", "d", "e"], @@ -391,8 +392,8 @@ def fake_ingest_data(): "conv_rate": [0.5], "acc_rate": [0.6], "avg_daily_trips": [4], - "event_timestamp": [pd.Timestamp(datetime.utcnow()).round("ms")], - "created": [pd.Timestamp(datetime.utcnow()).round("ms")], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], } return pd.DataFrame(data) diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index 1be96f753a..15d09c5a40 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -5,6 +5,7 @@ from pytz import timezone, utc from feast.types import FeastType, Float32, Int32, Int64, String +from feast.utils import _utc_now def create_basic_driver_dataset( @@ -13,7 +14,7 @@ def create_basic_driver_dataset( feature_is_list: bool = False, list_has_empty_list: bool = False, ) -> pd.DataFrame: - now = datetime.utcnow().replace(microsecond=0, second=0, minute=0) + now = _utc_now().replace(microsecond=0, second=0, minute=0) ts = pd.Timestamp(now).round("ms") data = { "driver_id": get_entities_for_feast_type(entity_type), @@ -86,14 +87,14 @@ def create_document_dataset() -> pd.DataFrame: "embedding_float": [[4.0, 5.0], [1.0, 2.0], [3.0, 4.0]], "embedding_double": [[4.0, 5.0], [1.0, 2.0], [3.0, 4.0]], "ts": [ - pd.Timestamp(datetime.utcnow()).round("ms"), - pd.Timestamp(datetime.utcnow()).round("ms"), - pd.Timestamp(datetime.utcnow()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), ], "created_ts": [ - pd.Timestamp(datetime.utcnow()).round("ms"), - pd.Timestamp(datetime.utcnow()).round("ms"), - pd.Timestamp(datetime.utcnow()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), + pd.Timestamp(_utc_now()).round("ms"), ], } return pd.DataFrame(data) diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index 814a7ca798..52348e7da4 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -6,13 +6,14 @@ import unittest import feast +from feast.utils import _utc_now FILES_TO_IGNORE = {"app"} def setup_feature_store(): """Prepares the local environment for a FeatureStore docstring test.""" - from datetime import datetime, timedelta + from datetime import timedelta from feast import Entity, FeatureStore, FeatureView, Field, FileSource from feast.repo_operations import init_repo @@ -42,8 +43,8 @@ def setup_feature_store(): ) fs.apply([driver_hourly_stats_view, driver]) fs.materialize( - start_date=datetime.utcnow() - timedelta(hours=3), - end_date=datetime.utcnow() - timedelta(minutes=10), + start_date=_utc_now() - timedelta(hours=3), + end_date=_utc_now() - timedelta(minutes=10), ) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 9e3c02b9c0..48f5070f1e 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -21,6 +21,7 @@ ) from feast.infra.feature_servers.local_process.config import LocalFeatureServerConfig from feast.repo_config import RegistryConfig, RepoConfig +from feast.utils import _utc_now from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, RegistryLocation, @@ -412,7 +413,7 @@ class Environment: fixture_request: Optional[pytest.FixtureRequest] = None def __post_init__(self): - self.end_date = datetime.utcnow().replace(microsecond=0, second=0, minute=0) + self.end_date = _utc_now().replace(microsecond=0, second=0, minute=0) self.start_date: datetime = self.end_date - timedelta(days=3) def setup(self): diff --git a/sdk/python/tests/integration/materialization/test_snowflake.py b/sdk/python/tests/integration/materialization/test_snowflake.py index adb2bd7e7d..f12191363b 100644 --- a/sdk/python/tests/integration/materialization/test_snowflake.py +++ b/sdk/python/tests/integration/materialization/test_snowflake.py @@ -8,6 +8,7 @@ from feast.entity import Entity from feast.feature_view import FeatureView from feast.types import Array, Bool, Bytes, Float64, Int32, Int64, String, UnixTimestamp +from feast.utils import _utc_now from tests.data.data_creator import create_basic_driver_dataset from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, @@ -146,7 +147,7 @@ def test_snowflake_materialization_consistency_internal_with_lists( split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) print(f"Split datetime: {split_dt}") - now = datetime.utcnow() + now = _utc_now() full_feature_names = True start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) @@ -231,7 +232,7 @@ def test_snowflake_materialization_entityless_fv(): print(f"Split datetime: {split_dt}") - now = datetime.utcnow() + now = _utc_now() start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) end_date = split_dt diff --git a/sdk/python/tests/integration/offline_store/test_offline_write.py b/sdk/python/tests/integration/offline_store/test_offline_write.py index b8c465946d..63bdc4755a 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_write.py +++ b/sdk/python/tests/integration/offline_store/test_offline_write.py @@ -1,5 +1,5 @@ import random -from datetime import datetime, timedelta +from datetime import timedelta import numpy as np import pandas as pd @@ -7,6 +7,7 @@ from feast import FeatureView, Field from feast.types import Float32, Int32 +from feast.utils import _utc_now from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, ) @@ -23,7 +24,7 @@ def test_reorder_columns(environment, universal_data_sources): driver_fv = feature_views.driver store.apply([driver(), driver_fv]) - now = datetime.utcnow() + now = _utc_now() ts = pd.Timestamp(now).round("ms") # This dataframe has columns in the wrong order. @@ -53,7 +54,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): driver_fv = feature_views.driver store.apply([driver(), driver_fv]) - now = datetime.utcnow() + now = _utc_now() ts = pd.Timestamp(now).round("ms") expected_df = pd.DataFrame.from_dict( @@ -91,7 +92,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour ), # This is to make sure all offline store data is out of date since get_historical_features() only searches backwards for a ttl window. ) - now = datetime.utcnow() + now = _utc_now() ts = pd.Timestamp(now, unit="ns") entity_df = pd.DataFrame.from_dict( diff --git a/sdk/python/tests/integration/offline_store/test_push_features_to_offline_store.py b/sdk/python/tests/integration/offline_store/test_push_features_to_offline_store.py index 0b1db9011a..5e3d72e671 100644 --- a/sdk/python/tests/integration/offline_store/test_push_features_to_offline_store.py +++ b/sdk/python/tests/integration/offline_store/test_push_features_to_offline_store.py @@ -1,10 +1,9 @@ -import datetime - import numpy as np import pandas as pd import pytest from feast.data_source import PushMode +from feast.utils import _utc_now from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, ) @@ -20,7 +19,7 @@ def test_push_features_and_read(environment, universal_data_sources): location_fv = feature_views.pushed_locations store.apply([location(), location_fv]) - now = pd.Timestamp(datetime.datetime.utcnow()).round("ms") + now = pd.Timestamp(_utc_now()).round("ms") entity_df = pd.DataFrame.from_dict({"location_id": [1], "event_timestamp": [now]}) before_df = store.get_historical_features( diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index bfb8a56200..ecaa5f40db 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -15,6 +15,7 @@ DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, ) from feast.types import Float32, Int32 +from feast.utils import _utc_now from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, table_name_from_data_source, @@ -144,11 +145,11 @@ def test_historical_features_main( files = job_from_df.to_remote_storage() assert len(files) # 0 # This test should be way more detailed - start_time = datetime.utcnow() + start_time = _utc_now() actual_df_from_df_entities = job_from_df.to_df() print(f"actual_df_from_df_entities shape: {actual_df_from_df_entities.shape}") - end_time = datetime.utcnow() + end_time = _utc_now() print(str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n")) assert sorted(expected_df.columns) == sorted(actual_df_from_df_entities.columns) @@ -303,9 +304,9 @@ def test_historical_features_with_entities_from_query( full_feature_names=full_feature_names, ) - start_time = datetime.utcnow() + start_time = _utc_now() actual_df_from_sql_entities = job_from_sql.to_df() - end_time = datetime.utcnow() + end_time = _utc_now() print(str(f"\nTime to execute job_from_sql.to_df() = '{(end_time - start_time)}'")) event_timestamp = ( @@ -618,11 +619,11 @@ def test_historical_features_containing_backfills(environment): full_feature_names=False, ) - start_time = datetime.utcnow() + start_time = _utc_now() actual_df = offline_job.to_df() print(f"actual_df shape: {actual_df.shape}") - end_time = datetime.utcnow() + end_time = _utc_now() print(str(f"Time to execute job_from_df.to_df() = '{(end_time - start_time)}'\n")) assert sorted(expected_df.columns) == sorted(actual_df.columns) diff --git a/sdk/python/tests/integration/offline_store/test_validation.py b/sdk/python/tests/integration/offline_store/test_validation.py index 1731f823c8..6f0496e8c8 100644 --- a/sdk/python/tests/integration/offline_store/test_validation.py +++ b/sdk/python/tests/integration/offline_store/test_validation.py @@ -16,7 +16,7 @@ LoggingConfig, ) from feast.protos.feast.serving.ServingService_pb2 import FieldStatus -from feast.utils import make_tzaware +from feast.utils import _utc_now, make_tzaware from feast.wait import wait_retry_backoff from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, @@ -316,8 +316,7 @@ def test_e2e_validation_via_cli(environment, universal_data_sources): "avg_passenger_count": [0], "lifetime_trip_count": [0], "event_timestamp": [ - make_tzaware(datetime.datetime.utcnow()) - - datetime.timedelta(hours=1) + make_tzaware(_utc_now()) - datetime.timedelta(hours=1) ], } ) diff --git a/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py b/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py index 42561563f9..98fe3ab1ec 100644 --- a/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py +++ b/sdk/python/tests/integration/online_store/test_push_features_to_online_store.py @@ -1,8 +1,7 @@ -import datetime - import pandas as pd import pytest +from feast.utils import _utc_now from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, ) @@ -21,8 +20,8 @@ def test_push_features_and_read(environment, universal_data_sources): data = { "location_id": [1], "temperature": [4], - "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], - "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], } df_ingest = pd.DataFrame(data) diff --git a/sdk/python/tests/integration/online_store/test_python_feature_server.py b/sdk/python/tests/integration/online_store/test_python_feature_server.py index 089efd7a56..1010e73178 100644 --- a/sdk/python/tests/integration/online_store/test_python_feature_server.py +++ b/sdk/python/tests/integration/online_store/test_python_feature_server.py @@ -1,5 +1,4 @@ import json -from datetime import datetime from typing import List import pytest @@ -7,6 +6,7 @@ from feast.feast_object import FeastObject from feast.feature_server import get_app +from feast.utils import _utc_now from tests.integration.feature_repos.repo_configuration import ( construct_universal_feature_views, ) @@ -67,8 +67,8 @@ def test_push(python_fs_client): "df": { "location_id": [1], "temperature": [initial_temp * 100], - "event_timestamp": [str(datetime.utcnow())], - "created": [str(datetime.utcnow())], + "event_timestamp": [str(_utc_now())], + "created": [str(_utc_now())], }, } ) @@ -98,8 +98,8 @@ def test_push_source_does_not_exist(python_fs_client): "df": { "location_id": [1], "temperature": [initial_temp * 100], - "event_timestamp": [str(datetime.utcnow())], - "created": [str(datetime.utcnow())], + "event_timestamp": [str(_utc_now())], + "created": [str(_utc_now())], }, } ), diff --git a/sdk/python/tests/integration/online_store/test_remote_online_store.py b/sdk/python/tests/integration/online_store/test_remote_online_store.py index 759a9c7a87..1d5dd0fca0 100644 --- a/sdk/python/tests/integration/online_store/test_remote_online_store.py +++ b/sdk/python/tests/integration/online_store/test_remote_online_store.py @@ -1,12 +1,12 @@ import os import subprocess import tempfile -from datetime import datetime from textwrap import dedent import pytest from feast.feature_store import FeatureStore +from feast.utils import _utc_now from feast.wait import wait_retry_backoff from tests.utils.cli_repo_creator import CliRunner from tests.utils.http_server import check_port_open, free_port @@ -150,7 +150,7 @@ def _default_store(temp_dir, project_name) -> FeatureStore: fs = FeatureStore(repo_path=repo_path) fs.materialize_incremental( - end_date=datetime.utcnow(), feature_views=["driver_hourly_stats"] + end_date=_utc_now(), feature_views=["driver_hourly_stats"] ) return fs diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index c6b034e2aa..38656b90a9 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -22,6 +22,7 @@ from feast.infra.utils.postgres.postgres_config import ConnectionType from feast.online_response import TIMESTAMP_POSTFIX from feast.types import Float32, Int32, String +from feast.utils import _utc_now from feast.wait import wait_retry_backoff from tests.integration.feature_repos.repo_configuration import ( Environment, @@ -136,9 +137,9 @@ def test_write_to_online_store_event_check(environment): fs = environment.feature_store # write same data points 3 with different timestamps - now = pd.Timestamp(datetime.datetime.utcnow()).round("ms") - hour_ago = pd.Timestamp(datetime.datetime.utcnow() - timedelta(hours=1)).round("ms") - latest = pd.Timestamp(datetime.datetime.utcnow() + timedelta(seconds=1)).round("ms") + now = pd.Timestamp(_utc_now()).round("ms") + hour_ago = pd.Timestamp(_utc_now() - timedelta(hours=1)).round("ms") + latest = pd.Timestamp(_utc_now() + timedelta(seconds=1)).round("ms") data = { "id": [123, 567, 890], @@ -221,7 +222,7 @@ def test_write_to_online_store_event_check(environment): # writes to online store via datasource (dataframe_source) materialization fs.materialize( start_date=datetime.datetime.now() - timedelta(hours=12), - end_date=datetime.datetime.utcnow(), + end_date=_utc_now(), ) df = fs.get_online_features( @@ -250,8 +251,8 @@ def test_write_to_online_store(environment, universal_data_sources): "conv_rate": [0.85], "acc_rate": [0.91], "avg_daily_trips": [14], - "event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], - "created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")], + "event_timestamp": [pd.Timestamp(_utc_now()).round("ms")], + "created": [pd.Timestamp(_utc_now()).round("ms")], } df_data = pd.DataFrame(data) diff --git a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py index ce960b9c35..151f629289 100644 --- a/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py +++ b/sdk/python/tests/integration/registration/test_universal_odfv_feature_inference.py @@ -1,5 +1,3 @@ -from datetime import datetime - import pandas as pd import pytest @@ -7,6 +5,7 @@ from feast.errors import SpecifiedFeaturesNotPresentError from feast.infra.offline_stores.file_source import FileSource from feast.types import Float64 +from feast.utils import _utc_now from tests.integration.feature_repos.universal.entities import customer, driver, item from tests.integration.feature_repos.universal.feature_views import ( conv_rate_plus_100_feature_view, @@ -50,8 +49,8 @@ def test_infer_odfv_list_features(environment, infer_features, tmp_path): "item_id": [0], "embedding_float": [fake_embedding], "embedding_double": [fake_embedding], - "event_timestamp": [pd.Timestamp(datetime.utcnow())], - "created": [pd.Timestamp(datetime.utcnow())], + "event_timestamp": [pd.Timestamp(_utc_now())], + "created": [pd.Timestamp(_utc_now())], } ) output_path = f"{tmp_path}/items.parquet" diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index c119ae800a..c06ccf2d4d 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -14,7 +14,7 @@ import logging import os import time -from datetime import datetime, timedelta +from datetime import timedelta from tempfile import mkstemp from unittest import mock @@ -46,6 +46,7 @@ from feast.repo_config import RegistryConfig from feast.stream_feature_view import Aggregation, StreamFeatureView from feast.types import Array, Bytes, Float32, Int32, Int64, String +from feast.utils import _utc_now from feast.value_type import ValueType from tests.integration.feature_repos.universal.entities import driver @@ -745,7 +746,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) # Simulate materialization - current_date = datetime.utcnow() + current_date = _utc_now() end_date = current_date.replace(tzinfo=utc) start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) test_registry.apply_materialization(feature_view, project, start_date, end_date) @@ -814,7 +815,7 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: ) # Simulate materialization a second time - current_date = datetime.utcnow() + current_date = _utc_now() end_date_1 = current_date.replace(tzinfo=utc) start_date_1 = (current_date - timedelta(days=1)).replace(tzinfo=utc) test_registry.apply_materialization( diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index ca15681c9b..928d05ad31 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -20,6 +20,7 @@ String, UnixTimestamp, ) +from feast.utils import _utc_now from tests.data.data_creator import create_basic_driver_dataset from tests.integration.feature_repos.universal.entities import driver from tests.integration.feature_repos.universal.feature_views import driver_feature_view @@ -93,7 +94,7 @@ def test_feature_get_historical_features_types_match( entity_df = pd.DataFrame() entity_df["driver_id"] = [1, 3] - ts = pd.Timestamp(datetime.utcnow()).round("ms") + ts = pd.Timestamp(_utc_now()).round("ms") entity_df["ts"] = [ ts - timedelta(hours=4), ts - timedelta(hours=2), diff --git a/sdk/python/tests/unit/cli/test_cli_chdir.py b/sdk/python/tests/unit/cli/test_cli_chdir.py index 12ca8f6b08..dd592db074 100644 --- a/sdk/python/tests/unit/cli/test_cli_chdir.py +++ b/sdk/python/tests/unit/cli/test_cli_chdir.py @@ -1,7 +1,8 @@ import tempfile -from datetime import datetime, timedelta +from datetime import timedelta from pathlib import Path +from feast.utils import _utc_now from tests.utils.cli_repo_creator import CliRunner @@ -29,7 +30,7 @@ def test_cli_chdir() -> None: ) assert result.returncode == 0 - end_date = datetime.utcnow() + end_date = _utc_now() start_date = end_date - timedelta(days=100) result = runner.run( [ diff --git a/sdk/python/tests/unit/local_feast_tests/test_init.py b/sdk/python/tests/unit/local_feast_tests/test_init.py index c5d3cbe57d..4543a23979 100644 --- a/sdk/python/tests/unit/local_feast_tests/test_init.py +++ b/sdk/python/tests/unit/local_feast_tests/test_init.py @@ -1,8 +1,9 @@ import tempfile -from datetime import datetime, timedelta +from datetime import timedelta from pathlib import Path from textwrap import dedent +from feast.utils import _utc_now from tests.utils.cli_repo_creator import CliRunner @@ -20,7 +21,7 @@ def test_repo_init() -> None: result = runner.run(["apply"], cwd=repo_path) assert result.returncode == 0 - end_date = datetime.utcnow() + end_date = _utc_now() start_date = end_date - timedelta(days=100) result = runner.run( ["materialize", start_date.isoformat(), end_date.isoformat()], cwd=repo_path diff --git a/sdk/python/tests/unit/online_store/test_online_retrieval.py b/sdk/python/tests/unit/online_store/test_online_retrieval.py index 1e8cf45dcc..0b552c0453 100644 --- a/sdk/python/tests/unit/online_store/test_online_retrieval.py +++ b/sdk/python/tests/unit/online_store/test_online_retrieval.py @@ -3,7 +3,6 @@ import sqlite3 import sys import time -from datetime import datetime import numpy as np import pandas as pd @@ -17,6 +16,7 @@ from feast.protos.feast.types.Value_pb2 import FloatList as FloatListProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RegistryConfig +from feast.utils import _utc_now from tests.integration.feature_repos.universal.feature_views import TAGS from tests.utils.cli_repo_creator import CliRunner, get_example_repo @@ -51,8 +51,8 @@ def test_get_online_features() -> None: "lat": ValueProto(double_val=0.1), "lon": ValueProto(string_val="1.0"), }, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -72,8 +72,8 @@ def test_get_online_features() -> None: "name": ValueProto(string_val="John"), "age": ValueProto(int64_val=3), }, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -90,8 +90,8 @@ def test_get_online_features() -> None: ( customer_key, {"trips": ValueProto(int64_val=7)}, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -318,8 +318,8 @@ def test_online_to_df(): "lat": ValueProto(double_val=d * lat_multiply), "lon": ValueProto(string_val=str(d * lon_multiply)), }, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -348,8 +348,8 @@ def test_online_to_df(): "name": ValueProto(string_val=name + str(c)), "age": ValueProto(int64_val=c * age_multiply), }, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -372,8 +372,8 @@ def test_online_to_df(): ( combo_keys, {"trips": ValueProto(int64_val=c * d)}, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ], progress=None, @@ -468,8 +468,8 @@ def test_sqlite_get_online_documents() -> None: ) ) }, - datetime.utcnow(), - datetime.utcnow(), + _utc_now(), + _utc_now(), ) ) @@ -488,7 +488,7 @@ def test_sqlite_get_online_documents() -> None: ) for i in range(n) ], - "event_timestamp": [datetime.utcnow() for _ in range(n)], + "event_timestamp": [_utc_now() for _ in range(n)], } ) diff --git a/sdk/python/tests/unit/test_datetime.py b/sdk/python/tests/unit/test_datetime.py new file mode 100644 index 0000000000..aaab507ed0 --- /dev/null +++ b/sdk/python/tests/unit/test_datetime.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- + + +""" +Test the retirement of datetime.utcnow() function. +""" diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index b387f55d8b..981968df0d 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -1,9 +1,8 @@ -from datetime import datetime, timedelta +from datetime import timedelta import pytest from typeguard import TypeCheckError -from feast import utils from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat from feast.data_source import KafkaSource @@ -13,6 +12,7 @@ from feast.infra.offline_stores.file_source import FileSource from feast.protos.feast.types.Value_pb2 import ValueType from feast.types import Float32 +from feast.utils import _utc_now, make_tzaware def test_create_feature_view_with_conflicting_entities(): @@ -143,9 +143,9 @@ def test_update_materialization_intervals(): ) assert len(updated_feature_view.materialization_intervals) == 0 - current_time = datetime.utcnow() - start_date = utils.make_tzaware(current_time - timedelta(days=1)) - end_date = utils.make_tzaware(current_time) + current_time = _utc_now() + start_date = make_tzaware(current_time - timedelta(days=1)) + end_date = make_tzaware(current_time) updated_feature_view.materialization_intervals.append((start_date, end_date)) # Update the Feature View, i.e. simply update the name diff --git a/sdk/python/tests/unit/test_stream_feature_view.py b/sdk/python/tests/unit/test_stream_feature_view.py index 77431666c3..4f93691028 100644 --- a/sdk/python/tests/unit/test_stream_feature_view.py +++ b/sdk/python/tests/unit/test_stream_feature_view.py @@ -1,9 +1,8 @@ import copy -from datetime import datetime, timedelta +from datetime import timedelta import pytest -from feast import utils from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView from feast.data_format import AvroFormat @@ -16,6 +15,7 @@ ) from feast.stream_feature_view import StreamFeatureView, stream_feature_view from feast.types import Float32 +from feast.utils import _utc_now, make_tzaware def test_create_batch_feature_view(): @@ -286,9 +286,9 @@ def test_update_materialization_intervals(): udf=simple_udf, tags={}, ) - current_time = datetime.utcnow() - start_date = utils.make_tzaware(current_time - timedelta(days=1)) - end_date = utils.make_tzaware(current_time) + current_time = _utc_now() + start_date = make_tzaware(current_time - timedelta(days=1)) + end_date = make_tzaware(current_time) stored_stream_feature_view.materialization_intervals.append((start_date, end_date)) # Update the stream feature view i.e. here it's simply the name diff --git a/sdk/python/tests/utils/basic_read_write_test.py b/sdk/python/tests/utils/basic_read_write_test.py index 5a93a05a1f..c09a94083f 100644 --- a/sdk/python/tests/utils/basic_read_write_test.py +++ b/sdk/python/tests/utils/basic_read_write_test.py @@ -1,9 +1,10 @@ -from datetime import datetime, timedelta +from datetime import timedelta from typing import Optional from feast.feature_store import FeatureStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.utils import _utc_now def basic_rw_test( @@ -65,13 +66,13 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read): """ 1. Basic test: write value, read it back """ - time_1 = datetime.utcnow() + time_1 = _utc_now() _driver_rw_test( event_ts=time_1, created_ts=time_1, write=(1.1, "3.1"), expect_read=(1.1, "3.1") ) """ Values with an new event_ts should overwrite older ones """ - time_3 = datetime.utcnow() + time_3 = _utc_now() _driver_rw_test( event_ts=time_1 + timedelta(hours=1), created_ts=time_3, diff --git a/sdk/python/tests/utils/dynamo_table_creator.py b/sdk/python/tests/utils/dynamo_table_creator.py index 20bac122b3..0ebc939dc1 100644 --- a/sdk/python/tests/utils/dynamo_table_creator.py +++ b/sdk/python/tests/utils/dynamo_table_creator.py @@ -1,11 +1,10 @@ -from datetime import datetime - import boto3 from feast import utils from feast.infra.online_stores.helpers import compute_entity_id from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.utils import _utc_now def create_n_customer_test_samples(n=10): @@ -19,7 +18,7 @@ def create_n_customer_test_samples(n=10): "name": ValueProto(string_val="John"), "age": ValueProto(int64_val=3), }, - datetime.utcnow(), + _utc_now(), None, ) for i in range(n) diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index d9104bae42..1a8bedc796 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -10,6 +10,7 @@ from pytz import utc from feast import FeatureStore, FeatureView, RepoConfig +from feast.utils import _utc_now from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) @@ -31,7 +32,7 @@ def validate_offline_online_store_consistency( fs: FeatureStore, fv: FeatureView, split_dt: datetime ) -> None: - now = datetime.utcnow() + now = _utc_now() full_feature_names = True check_offline_store: bool = True diff --git a/sdk/python/tests/utils/online_write_benchmark.py b/sdk/python/tests/utils/online_write_benchmark.py index 8a138f41db..9b1a4eb0b2 100644 --- a/sdk/python/tests/utils/online_write_benchmark.py +++ b/sdk/python/tests/utils/online_write_benchmark.py @@ -2,7 +2,7 @@ import random import string import tempfile -from datetime import datetime, timedelta +from datetime import timedelta import click import pyarrow as pa @@ -16,7 +16,7 @@ from feast.field import Field from feast.repo_config import RepoConfig from feast.types import Float32, Int32 -from feast.utils import _convert_arrow_to_proto +from feast.utils import _convert_arrow_to_proto, _utc_now def create_driver_hourly_stats_feature_view(source): @@ -69,7 +69,7 @@ def benchmark_writes(): provider = store._get_provider() - end_date = datetime.utcnow() + end_date = _utc_now() start_date = end_date - timedelta(days=14) customers = list(range(100)) data = create_driver_hourly_stats_df(customers, start_date, end_date) diff --git a/sdk/python/tests/utils/test_log_creator.py b/sdk/python/tests/utils/test_log_creator.py index ec0d92814c..f072f4c886 100644 --- a/sdk/python/tests/utils/test_log_creator.py +++ b/sdk/python/tests/utils/test_log_creator.py @@ -8,12 +8,12 @@ import numpy as np import pandas as pd import pyarrow -import pytz from feast import FeatureService, FeatureStore, FeatureView from feast.errors import FeatureViewNotFoundException from feast.feature_logging import LOG_DATE_FIELD, LOG_TIMESTAMP_FIELD, REQUEST_ID_FIELD from feast.protos.feast.serving.ServingService_pb2 import FieldStatus +from feast.utils import _utc_now def get_latest_rows( @@ -64,9 +64,7 @@ def generate_expected_logs( logs[f"{col}__status"] = FieldStatus.PRESENT if feature_view.ttl: logs[f"{col}__status"] = logs[f"{col}__status"].mask( - df[timestamp_column] - < datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) - - feature_view.ttl, + df[timestamp_column] < _utc_now() - feature_view.ttl, FieldStatus.OUTSIDE_MAX_AGE, ) @@ -119,7 +117,7 @@ def prepare_logs( f"{destination_field}__status" ].mask( logs_df[f"{destination_field}__timestamp"] - < (datetime.datetime.utcnow() - view.ttl), + < (_utc_now() - view.ttl), FieldStatus.OUTSIDE_MAX_AGE, )