Skip to content

Commit

Permalink
fix: Using one single function call for utcnow(). (#4307)
Browse files Browse the repository at this point in the history
Signed-off-by: Shuchu Han <[email protected]>
  • Loading branch information
shuchu authored Jul 2, 2024
1 parent 398ea3b commit 98ff63c
Show file tree
Hide file tree
Showing 41 changed files with 176 additions and 157 deletions.
9 changes: 5 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
from feast.repo_contents import RepoContents
from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now
from feast.version import get_version

warnings.simplefilter("once", DeprecationWarning)
Expand Down Expand Up @@ -1246,7 +1247,7 @@ def materialize_incremental(
>>> from feast import FeatureStore, RepoConfig
>>> from datetime import datetime, timedelta
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
>>> fs.materialize_incremental(end_date=_utc_now() - timedelta(minutes=5))
Materializing...
<BLANKLINE>
...
Expand All @@ -1270,15 +1271,15 @@ def materialize_incremental(
f" either a ttl to be set or for materialize() to have been run at least once."
)
elif feature_view.ttl.total_seconds() > 0:
start_date = datetime.utcnow() - feature_view.ttl
start_date = _utc_now() - feature_view.ttl
else:
# TODO(felixwang9817): Find the earliest timestamp for this specific feature
# view from the offline store, and set the start date to that timestamp.
print(
f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, "
"the start date will be set to 1 year before the current time."
)
start_date = datetime.utcnow() - timedelta(weeks=52)
start_date = _utc_now() - timedelta(weeks=52)
provider = self._get_provider()
print(
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
Expand Down Expand Up @@ -1335,7 +1336,7 @@ def materialize(
>>> from datetime import datetime, timedelta
>>> fs = FeatureStore(repo_path="project/feature_repo")
>>> fs.materialize(
... start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
... start_date=_utc_now() - timedelta(hours=3), end_date=_utc_now() - timedelta(minutes=10)
... )
Materializing...
<BLANKLINE>
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.utils import get_user_agent
from feast.utils import _utc_now, get_user_agent

from .bigquery_source import (
BigQueryLoggingDestination,
Expand Down Expand Up @@ -701,7 +701,7 @@ def _upload_entity_df(

# Ensure that the table expires after some time
table = client.get_table(table=table_name)
table.expires = datetime.utcnow() + timedelta(minutes=30)
table.expires = _utc_now() + timedelta(minutes=30)
client.update_table(table, ["expires"])

return table
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import datetime
import signal
from dataclasses import dataclass
from enum import Enum
Expand All @@ -16,6 +15,7 @@
from feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map import (
trino_to_pa_value_type,
)
from feast.utils import _utc_now


class QueryStatus(Enum):
Expand Down Expand Up @@ -97,12 +97,12 @@ def __init__(self, query_text: str, cursor: Cursor):
def execute(self) -> Results:
try:
self.status = QueryStatus.RUNNING
start_time = datetime.datetime.utcnow()
start_time = _utc_now()

self._cursor.execute(operation=self.query_text)
rows = self._cursor.fetchall()

end_time = datetime.datetime.utcnow()
end_time = _utc_now()
self.execution_time = end_time - start_time
self.status = QueryStatus.COMPLETED

Expand Down
6 changes: 3 additions & 3 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.utils import get_user_agent
from feast.utils import _utc_now, get_user_agent

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,7 +122,7 @@ def update(
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
entity.update({"created_ts": _utc_now()})
client.put(entity)

for table in tables_to_delete:
Expand Down Expand Up @@ -457,7 +457,7 @@ def update(self):
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
entity.update({"created_ts": _utc_now()})
client.put(entity)

def teardown(self):
Expand Down
9 changes: 5 additions & 4 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from abc import abstractmethod
from datetime import datetime, timedelta
from datetime import timedelta
from threading import Lock
from typing import List, Optional

Expand All @@ -15,6 +15,7 @@
from feast.project_metadata import ProjectMetadata
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +28,7 @@ def __init__(
):
self.cached_registry_proto = self.proto()
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()
self._refresh_lock = Lock()
self.cached_registry_proto_ttl = timedelta(
seconds=cache_ttl_seconds if cache_ttl_seconds is not None else 0
Expand Down Expand Up @@ -318,7 +319,7 @@ def refresh(self, project: Optional[str] = None):
self.cached_registry_proto, project
)
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

def _refresh_cached_registry_if_necessary(self):
with self._refresh_lock:
Expand All @@ -329,7 +330,7 @@ def _refresh_cached_registry_if_necessary(self):
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.utcnow()
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.infra.registry.registry import RegistryConfig
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.utils import _utc_now

REGISTRY_SCHEMA_VERSION = "1"

Expand Down Expand Up @@ -89,7 +89,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())

file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/file.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import uuid
from datetime import datetime
from pathlib import Path

from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now


class FileRegistryStore(RegistryStore):
Expand Down Expand Up @@ -37,7 +37,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
file_dir = self._filepath.parent
file_dir.mkdir(exist_ok=True)
with open(self._filepath, mode="wb", buffering=0) as f:
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/gcs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse

from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now


class GCSRegistryStore(RegistryStore):
Expand Down Expand Up @@ -62,7 +62,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
# we have already checked the bucket exists so no need to do it again
gs_bucket = self.gcs_client.get_bucket(self._bucket)
blob = gs_bucket.blob(self._blob)
Expand Down
21 changes: 11 additions & 10 deletions sdk/python/feast/infra/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from feast.repo_contents import RepoContents
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _utc_now

REGISTRY_SCHEMA_VERSION = "1"

Expand Down Expand Up @@ -217,7 +218,7 @@ def clone(self) -> "Registry":
if self.cached_registry_proto
else RegistryProto()
)
new_registry.cached_registry_proto_created = datetime.utcnow()
new_registry.cached_registry_proto_created = _utc_now()
new_registry._registry_store = NoopRegistryStore()
return new_registry

Expand Down Expand Up @@ -248,7 +249,7 @@ def get_infra(self, project: str, allow_cache: bool = False) -> Infra:
def apply_entity(self, entity: Entity, project: str, commit: bool = True):
entity.is_valid()

now = datetime.utcnow()
now = _utc_now()
if not entity.created_timestamp:
entity.created_timestamp = now
entity.last_updated_timestamp = now
Expand Down Expand Up @@ -334,7 +335,7 @@ def delete_data_source(self, name: str, project: str, commit: bool = True):
def apply_feature_service(
self, feature_service: FeatureService, project: str, commit: bool = True
):
now = datetime.utcnow()
now = _utc_now()
if not feature_service.created_timestamp:
feature_service.created_timestamp = now
feature_service.last_updated_timestamp = now
Expand Down Expand Up @@ -390,7 +391,7 @@ def apply_feature_view(
):
feature_view.ensure_valid()

now = datetime.utcnow()
now = _utc_now()
if not feature_view.created_timestamp:
feature_view.created_timestamp = now
feature_view.last_updated_timestamp = now
Expand Down Expand Up @@ -517,7 +518,7 @@ def apply_materialization(
existing_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_feature_view.last_updated_timestamp = datetime.utcnow()
existing_feature_view.last_updated_timestamp = _utc_now()
feature_view_proto = existing_feature_view.to_proto()
feature_view_proto.spec.project = project
del self.cached_registry_proto.feature_views[idx]
Expand All @@ -539,7 +540,7 @@ def apply_materialization(
existing_stream_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
existing_stream_feature_view.last_updated_timestamp = _utc_now()
stream_feature_view_proto = existing_stream_feature_view.to_proto()
stream_feature_view_proto.spec.project = project
del self.cached_registry_proto.stream_feature_views[idx]
Expand Down Expand Up @@ -664,7 +665,7 @@ def apply_saved_dataset(
project: str,
commit: bool = True,
):
now = datetime.utcnow()
now = _utc_now()
if not saved_dataset.created_timestamp:
saved_dataset.created_timestamp = now
saved_dataset.last_updated_timestamp = now
Expand Down Expand Up @@ -812,7 +813,7 @@ def _prepare_registry_for_changes(self, project: str):
registry_proto = RegistryProto()
registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION
self.cached_registry_proto = registry_proto
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

# Initialize project metadata if needed
assert self.cached_registry_proto
Expand Down Expand Up @@ -848,7 +849,7 @@ def _get_registry_proto(
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.utcnow()
_utc_now()
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
Expand All @@ -871,7 +872,7 @@ def _get_registry_proto(
logger.info("Registry cache expired, so refreshing")
registry_proto = self._registry_store.get_registry_proto()
self.cached_registry_proto = registry_proto
self.cached_registry_proto_created = datetime.utcnow()
self.cached_registry_proto_created = _utc_now()

if not project:
return registry_proto
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/infra/registry/s3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from urllib.parse import urlparse
Expand All @@ -9,6 +8,7 @@
from feast.infra.registry.registry_store import RegistryStore
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.repo_config import RegistryConfig
from feast.utils import _utc_now

try:
import boto3
Expand Down Expand Up @@ -70,7 +70,7 @@ def teardown(self):

def _write_registry(self, registry_proto: RegistryProto):
registry_proto.version_id = str(uuid.uuid4())
registry_proto.last_updated.FromDatetime(datetime.utcnow())
registry_proto.last_updated.FromDatetime(_utc_now())
# we have already checked the bucket exists so no need to do it again
file_obj = TemporaryFile()
file_obj.write(registry_proto.SerializeToString())
Expand Down
Loading

0 comments on commit 98ff63c

Please sign in to comment.