diff --git a/sdk/python/feast/driver_test_data.py b/sdk/python/feast/driver_test_data.py index defeb404a3..23f1f12477 100644 --- a/sdk/python/feast/driver_test_data.py +++ b/sdk/python/feast/driver_test_data.py @@ -1,10 +1,11 @@ # This module generates dummy data to be used for tests and examples. import itertools +from datetime import timedelta, timezone from enum import Enum import numpy as np import pandas as pd -from pytz import FixedOffset, timezone, utc +from zoneinfo import ZoneInfo from feast.infra.offline_stores.offline_utils import ( DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, @@ -22,11 +23,15 @@ def _convert_event_timestamp(event_timestamp: pd.Timestamp, t: EventTimestampTyp if t == EventTimestampType.TZ_NAIVE: return event_timestamp elif t == EventTimestampType.TZ_AWARE_UTC: - return event_timestamp.replace(tzinfo=utc) + return event_timestamp.replace(tzinfo=timezone.utc) elif t == EventTimestampType.TZ_AWARE_FIXED_OFFSET: - return event_timestamp.replace(tzinfo=utc).astimezone(FixedOffset(60)) + return event_timestamp.replace(tzinfo=timezone.utc).astimezone( + tz=timezone(timedelta(minutes=60)) + ) elif t == EventTimestampType.TZ_AWARE_US_PACIFIC: - return event_timestamp.replace(tzinfo=utc).astimezone(timezone("US/Pacific")) + return event_timestamp.replace(tzinfo=timezone.utc).astimezone( + tz=ZoneInfo("US/Pacific") + ) def create_orders_df( diff --git a/sdk/python/feast/embedded_go/type_map.py b/sdk/python/feast/embedded_go/type_map.py index e70dc3be86..8f467c57ca 100644 --- a/sdk/python/feast/embedded_go/type_map.py +++ b/sdk/python/feast/embedded_go/type_map.py @@ -1,12 +1,12 @@ +from datetime import timezone from typing import List import pyarrow as pa -import pytz from feast.protos.feast.types import Value_pb2 from feast.types import Array, PrimitiveFeastType -PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=pytz.UTC) +PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=timezone.utc) ARROW_TYPE_TO_PROTO_FIELD = { pa.int32(): "int32_val", diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 2843f87121..9bd5d8a91c 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -1,8 +1,8 @@ import abc +from datetime import timezone from typing import TYPE_CHECKING, Dict, Optional, Type, cast import pyarrow as pa -from pytz import UTC from feast.data_source import DataSource from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE @@ -97,7 +97,7 @@ def get_schema(self, registry: "BaseRegistry") -> pa.Schema: ) # system columns - fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC) + fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=timezone.utc) fields[LOG_DATE_FIELD] = pa.date32() fields[REQUEST_ID_FIELD] = pa.string() diff --git a/sdk/python/feast/infra/materialization/snowflake_engine.py b/sdk/python/feast/infra/materialization/snowflake_engine.py index 5d0f08c2f5..9f9f41c83d 100644 --- a/sdk/python/feast/infra/materialization/snowflake_engine.py +++ b/sdk/python/feast/infra/materialization/snowflake_engine.py @@ -1,14 +1,13 @@ import os import shutil from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timezone from typing import Callable, List, Literal, Optional, Sequence, Union import click import pandas as pd from colorama import Fore, Style from pydantic import ConfigDict, Field, StrictStr -from pytz import utc from tqdm import tqdm import feast @@ -276,7 +275,10 @@ def _materialize_one( execute_snowflake_statement(conn, query).fetchall()[0][0] / 1_000_000_000 ) - if last_commit_change_time < start_date.astimezone(tz=utc).timestamp(): + if ( + last_commit_change_time + < start_date.astimezone(tz=timezone.utc).timestamp() + ): return SnowflakeMaterializationJob( job_id=job_id, status=MaterializationJobStatus.SUCCEEDED ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index ce731f0198..ea0d6386cb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -1,6 +1,6 @@ import contextlib import uuid -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import ( Callable, @@ -19,7 +19,6 @@ import pyarrow import pyarrow as pa from pydantic import StrictStr -from pytz import utc from feast import OnDemandFeatureView from feast.data_source import DataSource @@ -100,8 +99,8 @@ def pull_latest_from_table_or_query( athena_client = aws_utils.get_athena_data_client(config.offline_store.region) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT @@ -151,7 +150,7 @@ def pull_all_from_table_or_query( query = f""" SELECT {field_string} FROM {from_expression} - WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.astimezone(tz=utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' AND TIMESTAMP '{end_date.astimezone(tz=utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' + WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' AND TIMESTAMP '{end_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' {"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''} """ diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index c4740a960e..5239cfb474 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -1,6 +1,6 @@ import contextlib from dataclasses import asdict -from datetime import datetime +from datetime import datetime, timezone from typing import ( Any, Callable, @@ -20,7 +20,6 @@ import pyarrow as pa from jinja2 import BaseLoader, Environment from psycopg import sql -from pytz import utc from feast.data_source import DataSource from feast.errors import InvalidEntityType, ZeroColumnQueryResult, ZeroRowsQueryResult @@ -214,8 +213,8 @@ def pull_all_from_table_or_query( join_key_columns + feature_name_columns + [timestamp_field] ) - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 2d5a00c296..2896d565d3 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -2,7 +2,7 @@ import tempfile import uuid import warnings -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np @@ -14,7 +14,6 @@ from pydantic import StrictStr from pyspark import SparkConf from pyspark.sql import SparkSession -from pytz import utc from feast import FeatureView, OnDemandFeatureView from feast.data_source import DataSource @@ -284,8 +283,8 @@ def pull_all_from_table_or_query( fields = ", ".join(join_key_columns + feature_name_columns + [timestamp_field]) from_expression = data_source.get_table_query_string() - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT {fields} @@ -520,13 +519,10 @@ def _upload_entity_df( entity_df[event_timestamp_col], utc=True ) spark_session.createDataFrame(entity_df).createOrReplaceTempView(table_name) - return elif isinstance(entity_df, str): spark_session.sql(entity_df).createOrReplaceTempView(table_name) - return elif isinstance(entity_df, pyspark.sql.DataFrame): entity_df.createOrReplaceTempView(table_name) - return else: raise InvalidEntityType(type(entity_df)) @@ -534,7 +530,7 @@ def _upload_entity_df( def _format_datetime(t: datetime) -> str: # Since Hive does not support timezone, need to transform to utc. if t.tzinfo: - t = t.astimezone(tz=utc) + t = t.astimezone(tz=timezone.utc) dt = t.strftime("%Y-%m-%d %H:%M:%S.%f") return dt diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/connectors/upload.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/connectors/upload.py index 9e2ea3708d..1b55199193 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/connectors/upload.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/connectors/upload.py @@ -18,13 +18,12 @@ ``` """ -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, Iterator, Optional, Set import numpy as np import pandas as pd import pyarrow -from pytz import utc from feast.infra.offline_stores.contrib.trino_offline_store.trino_queries import Trino from feast.infra.offline_stores.contrib.trino_offline_store.trino_type_map import ( @@ -141,7 +140,7 @@ def _format_value(row: pd.Series, schema: Dict[str, Any]) -> str: def format_datetime(t: datetime) -> str: if t.tzinfo: - t = t.astimezone(tz=utc) + t = t.astimezone(tz=timezone.utc) return t.strftime("%Y-%m-%d %H:%M:%S.%f") diff --git a/sdk/python/feast/infra/offline_stores/dask.py b/sdk/python/feast/infra/offline_stores/dask.py index 4a63baf646..52ad88d299 100644 --- a/sdk/python/feast/infra/offline_stores/dask.py +++ b/sdk/python/feast/infra/offline_stores/dask.py @@ -1,6 +1,6 @@ import os import uuid -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union @@ -178,6 +178,8 @@ def evaluate_historical_retrieval(): entity_df_event_timestamp_col_type = entity_df_with_features.dtypes[ entity_df_event_timestamp_col ] + + # TODO: need to figure out why the value of entity_df_event_timestamp_col_type.tz is pytz.UTC if ( not hasattr(entity_df_event_timestamp_col_type, "tz") or entity_df_event_timestamp_col_type.tz != pytz.UTC @@ -189,7 +191,7 @@ def evaluate_historical_retrieval(): ].apply( lambda x: x if x.tzinfo is not None - else x.replace(tzinfo=pytz.utc) + else x.replace(tzinfo=timezone.utc) ) ) @@ -616,6 +618,7 @@ def _normalize_timestamp( if created_timestamp_column: created_timestamp_column_type = df_to_join_types[created_timestamp_column] + # TODO: need to figure out why the value of timestamp_field_type.tz is pytz.UTC if not hasattr(timestamp_field_type, "tz") or timestamp_field_type.tz != pytz.UTC: # if you are querying for the event timestamp field, we have to deduplicate if len(df_to_join[timestamp_field].shape) > 1: @@ -624,10 +627,11 @@ def _normalize_timestamp( # Make sure all timestamp fields are tz-aware. We default tz-naive fields to UTC df_to_join[timestamp_field] = df_to_join[timestamp_field].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + lambda x: x if x.tzinfo else x.replace(tzinfo=timezone.utc), meta=(timestamp_field, "datetime64[ns, UTC]"), ) + # TODO: need to figure out why the value of created_timestamp_column_type.tz is pytz.UTC if created_timestamp_column and ( not hasattr(created_timestamp_column_type, "tz") or created_timestamp_column_type.tz != pytz.UTC @@ -640,7 +644,7 @@ def _normalize_timestamp( df_to_join[created_timestamp_column] = df_to_join[ created_timestamp_column ].apply( - lambda x: x if x.tzinfo is not None else x.replace(tzinfo=pytz.utc), + lambda x: x if x.tzinfo else x.replace(tzinfo=timezone.utc), meta=(timestamp_field, "datetime64[ns, UTC]"), ) diff --git a/sdk/python/feast/infra/offline_stores/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py index 4de16cbda3..61c477baec 100644 --- a/sdk/python/feast/infra/offline_stores/ibis.py +++ b/sdk/python/feast/infra/offline_stores/ibis.py @@ -1,7 +1,7 @@ import random import string import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union @@ -12,7 +12,6 @@ import pyarrow from ibis.expr import datatypes as dt from ibis.expr.types import Table -from pytz import utc from feast.data_source import DataSource from feast.feature_logging import LoggingConfig, LoggingSource @@ -55,8 +54,8 @@ def pull_latest_from_table_or_query_ibis( fields = join_key_columns + feature_name_columns + [timestamp_field] if created_timestamp_column: fields.append(created_timestamp_column) - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) table = data_source_reader(data_source) @@ -265,8 +264,8 @@ def pull_all_from_table_or_query_ibis( staging_location_endpoint_override: Optional[str] = None, ) -> RetrievalJob: fields = join_key_columns + feature_name_columns + [timestamp_field] - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) table = data_source_reader(data_source) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index cec21c35c1..ed76f830f3 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -1,6 +1,6 @@ import contextlib import uuid -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import ( Any, @@ -21,7 +21,6 @@ import pyarrow as pa from dateutil import parser from pydantic import StrictStr, model_validator -from pytz import utc from feast import OnDemandFeatureView, RedshiftSource from feast.data_source import DataSource @@ -127,8 +126,8 @@ def pull_latest_from_table_or_query( ) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT @@ -174,8 +173,8 @@ def pull_all_from_table_or_query( ) s3_resource = aws_utils.get_s3_resource(config.offline_store.region) - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index ada6c99c98..9418171a96 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -3,7 +3,7 @@ import os import uuid import warnings -from datetime import datetime +from datetime import datetime, timezone from functools import reduce from pathlib import Path from typing import ( @@ -25,7 +25,6 @@ import pandas as pd import pyarrow from pydantic import ConfigDict, Field, StrictStr -from pytz import utc from feast import OnDemandFeatureView from feast.data_source import DataSource @@ -196,8 +195,8 @@ def pull_latest_from_table_or_query( with GetSnowflakeConnection(config.offline_store) as conn: snowflake_conn = conn - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT @@ -248,8 +247,8 @@ def pull_all_from_table_or_query( with GetSnowflakeConnection(config.offline_store) as conn: snowflake_conn = conn - start_date = start_date.astimezone(tz=utc) - end_date = end_date.astimezone(tz=utc) + start_date = start_date.astimezone(tz=timezone.utc) + end_date = end_date.astimezone(tz=timezone.utc) query = f""" SELECT {field_string} diff --git a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py index 429327e651..c26b4199ae 100644 --- a/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py +++ b/sdk/python/feast/infra/online_stores/contrib/elasticsearch.py @@ -6,7 +6,6 @@ from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -import pytz from elasticsearch import Elasticsearch, helpers from feast import Entity, FeatureView, RepoConfig @@ -15,6 +14,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +from feast.utils import to_naive_utc class ElasticSearchOnlineStoreConfig(FeastConfigBaseModel): @@ -96,9 +96,9 @@ def online_write_batch( entity_key_serialization_version=config.entity_key_serialization_version, ) encoded_entity_key = base64.b64encode(entity_key_bin).decode("utf-8") - timestamp = _to_naive_utc(timestamp) + timestamp = to_naive_utc(timestamp) if created_ts is not None: - created_ts = _to_naive_utc(created_ts) + created_ts = to_naive_utc(created_ts) for feature_name, value in values.items(): encoded_value = base64.b64encode(value.SerializeToString()).decode( "utf-8" @@ -267,10 +267,3 @@ def retrieve_online_documents( ) ) return result - - -def _to_naive_utc(ts: datetime): - if ts.tzinfo is None: - return ts - else: - return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py index 497d8909af..c56d394c21 100644 --- a/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/hazelcast_online_store/hazelcast_online_store.py @@ -23,7 +23,6 @@ from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple -import pytz from hazelcast.client import HazelcastClient from hazelcast.core import HazelcastJsonValue from hazelcast.discovery import HazelcastCloudDiscovery @@ -167,10 +166,10 @@ def online_write_batch( entity_key_serialization_version=2, ) ).decode("utf-8") - event_ts_utc = pytz.utc.localize(event_ts, is_dst=None).timestamp() + event_ts_utc = event_ts.astimezone(tz=timezone.utc).timestamp() created_ts_utc = 0.0 if created_ts is not None: - created_ts_utc = pytz.utc.localize(created_ts, is_dst=None).timestamp() + created_ts_utc = created_ts.astimezone(tz=timezone.utc).timestamp() for feature_name, value in values.items(): feature_value = base64.b64encode(value.SerializeToString()).decode( "utf-8" diff --git a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py index 6b721bddf8..c8f0ad65c9 100644 --- a/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py +++ b/sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timezone from typing import ( Any, Callable, @@ -11,7 +11,6 @@ Tuple, ) -import pytz from google.protobuf.timestamp_pb2 import Timestamp from ikvpy.client import IKVReader, IKVWriter from ikvpy.clientoptions import ClientOptions, ClientOptionsBuilder @@ -163,7 +162,7 @@ def _decode_fields_for_primary_key( if dt_bytes: proto_timestamp = Timestamp() proto_timestamp.ParseFromString(dt_bytes) - dt = datetime.fromtimestamp(proto_timestamp.seconds, tz=pytz.utc) + dt = datetime.fromtimestamp(proto_timestamp.seconds, tz=timezone.utc) # decode other features features = {} diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py index 26916a9fcb..64111ca42c 100644 --- a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py @@ -4,7 +4,6 @@ from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple import pymysql -import pytz from pydantic import StrictStr from pymysql.connections import Connection from pymysql.cursors import Cursor @@ -15,6 +14,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +from feast.utils import to_naive_utc class MySQLOnlineStoreConfig(FeastConfigBaseModel): @@ -74,9 +74,9 @@ def online_write_batch( entity_key, entity_key_serialization_version=2, ).hex() - timestamp = _to_naive_utc(timestamp) + timestamp = to_naive_utc(timestamp) if created_ts is not None: - created_ts = _to_naive_utc(created_ts) + created_ts = to_naive_utc(created_ts) for feature_name, val in values.items(): self.write_to_table( @@ -223,10 +223,3 @@ def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None def _table_id(project: str, table: FeatureView) -> str: return f"{project}_{table.name}" - - -def _to_naive_utc(ts: datetime) -> datetime: - if ts.tzinfo is None: - return ts - else: - return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index ff73a4a347..8c6d3e0b99 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -16,7 +16,6 @@ Union, ) -import pytz from psycopg import AsyncConnection, sql from psycopg.connection import Connection from psycopg_pool import AsyncConnectionPool, ConnectionPool @@ -24,6 +23,9 @@ from feast import Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import get_list_val_str, serialize_entity_key +from feast.infra.online_stores.contrib.singlestore_online_store.singlestore import ( + _to_naive_utc, +) from feast.infra.online_stores.online_store import OnlineStore from feast.infra.utils.postgres.connection_utils import ( _get_conn, @@ -472,10 +474,3 @@ def _drop_table_and_index(table_name): sql.Identifier(table_name), sql.Identifier(f"{table_name}_ek"), ) - - -def _to_naive_utc(ts: datetime): - if ts.tzinfo is None: - return ts - else: - return ts.astimezone(pytz.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py b/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py index e17a059c1a..3e921afcea 100644 --- a/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py +++ b/sdk/python/feast/infra/online_stores/contrib/singlestore_online_store/singlestore.py @@ -1,10 +1,9 @@ from __future__ import absolute_import from collections import defaultdict -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple -import pytz import singlestoredb from pydantic import StrictStr from singlestoredb.connection import Connection, Cursor @@ -232,4 +231,4 @@ def _to_naive_utc(ts: datetime) -> datetime: if ts.tzinfo is None: return ts else: - return ts.astimezone(pytz.utc).replace(tzinfo=None) + return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 5f0156f620..59892fcbe0 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -13,7 +13,7 @@ # limitations under the License. import json import logging -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import ( Any, @@ -28,7 +28,6 @@ Union, ) -import pytz from google.protobuf.timestamp_pb2 import Timestamp from pydantic import StrictStr @@ -457,5 +456,5 @@ def _get_features_for_entity( if not res: return None, None else: - timestamp = datetime.fromtimestamp(res_ts.seconds, tz=pytz.utc) + timestamp = datetime.fromtimestamp(res_ts.seconds, tz=timezone.utc) return timestamp, res diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 4a96ba76a8..53acb9f625 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -1,9 +1,8 @@ from concurrent import futures -from datetime import datetime +from datetime import datetime, timezone import grpc from google.protobuf.empty_pb2 import Empty -from pytz import utc from feast import FeatureStore from feast.data_source import DataSource @@ -314,10 +313,11 @@ def ApplyMaterialization( feature_view=FeatureView.from_proto(request.feature_view), project=request.project, start_date=datetime.fromtimestamp( - request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc + request.start_date.seconds + request.start_date.nanos / 1e9, + tz=timezone.utc, ), end_date=datetime.fromtimestamp( - request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc + request.end_date.seconds + request.end_date.nanos / 1e9, tz=timezone.utc ), commit=request.commit, ) diff --git a/sdk/python/feast/templates/aws/feature_repo/test_workflow.py b/sdk/python/feast/templates/aws/feature_repo/test_workflow.py index 59ac1f0ee7..092399e03c 100644 --- a/sdk/python/feast/templates/aws/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/aws/feature_repo/test_workflow.py @@ -1,9 +1,8 @@ import random import subprocess -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pandas as pd -from pytz import utc from feast import FeatureStore from feast.data_source import PushMode @@ -71,9 +70,11 @@ def run_demo(): def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring): end_date = ( - datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc) + datetime.now() + .replace(microsecond=0, second=0, minute=0) + .astimezone(tz=timezone.utc) ) - start_date = (end_date - timedelta(days=60)).astimezone(tz=utc) + start_date = (end_date - timedelta(days=60)).astimezone(tz=timezone.utc) # For batch scoring, we want the latest timestamps if for_batch_scoring: print( diff --git a/sdk/python/feast/templates/snowflake/test_workflow.py b/sdk/python/feast/templates/snowflake/test_workflow.py index 3c44342881..f60b014874 100644 --- a/sdk/python/feast/templates/snowflake/test_workflow.py +++ b/sdk/python/feast/templates/snowflake/test_workflow.py @@ -1,10 +1,9 @@ import random import subprocess -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pandas as pd import yaml -from pytz import utc from feast import FeatureStore from feast.data_source import PushMode @@ -75,9 +74,11 @@ def run_demo(): def fetch_historical_features_entity_sql(store: FeatureStore, for_batch_scoring): end_date = ( - datetime.now().replace(microsecond=0, second=0, minute=0).astimezone(tz=utc) + datetime.now() + .replace(microsecond=0, second=0, minute=0) + .astimezone(tz=timezone.utc) ) - start_date = (end_date - timedelta(days=60)).astimezone(tz=utc) + start_date = (end_date - timedelta(days=60)).astimezone(tz=timezone.utc) project_name = yaml.safe_load(open("feature_repo/feature_store.yaml"))["project"] table_name = f"{project_name}_feast_driver_hourly_stats" diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 0467393aa2..5862cd4630 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -25,7 +25,6 @@ import pyarrow from dateutil.tz import tzlocal from google.protobuf.timestamp_pb2 import Timestamp -from pytz import utc from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME from feast.entity import Entity @@ -63,7 +62,7 @@ def get_user_agent(): def make_tzaware(t: datetime) -> datetime: """We assume tz-naive datetimes are UTC""" if t.tzinfo is None: - return t.replace(tzinfo=utc) + return t.replace(tzinfo=timezone.utc) else: return t @@ -81,7 +80,7 @@ def to_naive_utc(ts: datetime) -> datetime: if ts.tzinfo is None: return ts else: - return ts.astimezone(utc).replace(tzinfo=None) + return ts.astimezone(timezone.utc).replace(tzinfo=None) def maybe_local_tz(t: datetime) -> datetime: diff --git a/sdk/python/tests/data/data_creator.py b/sdk/python/tests/data/data_creator.py index 15d09c5a40..5d6cffeb9d 100644 --- a/sdk/python/tests/data/data_creator.py +++ b/sdk/python/tests/data/data_creator.py @@ -1,8 +1,8 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional import pandas as pd -from pytz import timezone, utc +from zoneinfo import ZoneInfo from feast.types import FeastType, Float32, Int32, Int64, String from feast.utils import _utc_now @@ -27,11 +27,11 @@ def create_basic_driver_dataset( ts - timedelta(hours=3), # Use different time zones to test tz-naive -> tz-aware conversion (ts - timedelta(hours=4)) - .replace(tzinfo=utc) - .astimezone(tz=timezone("Europe/Berlin")), + .replace(tzinfo=timezone.utc) + .astimezone(tz=ZoneInfo("Europe/Berlin")), (ts - timedelta(hours=1)) - .replace(tzinfo=utc) - .astimezone(tz=timezone("US/Pacific")), + .replace(tzinfo=timezone.utc) + .astimezone(tz=ZoneInfo("US/Pacific")), ], "created_ts": [ts, ts, ts, ts, ts], } diff --git a/sdk/python/tests/integration/materialization/test_snowflake.py b/sdk/python/tests/integration/materialization/test_snowflake.py index f12191363b..f53c3ca753 100644 --- a/sdk/python/tests/integration/materialization/test_snowflake.py +++ b/sdk/python/tests/integration/materialization/test_snowflake.py @@ -1,8 +1,7 @@ import os -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pytest -from pytz import utc from feast import Field from feast.entity import Entity @@ -150,7 +149,7 @@ def test_snowflake_materialization_consistency_internal_with_lists( now = _utc_now() full_feature_names = True - start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + start_date = (now - timedelta(hours=5)).replace(tzinfo=timezone.utc) end_date = split_dt fs.materialize( feature_views=[driver_stats_fv.name], @@ -165,7 +164,7 @@ def test_snowflake_materialization_consistency_internal_with_lists( "string": ["3"] * 2, "bytes": [b"3"] * 2, "bool": [False] * 2, - "datetime": [datetime(1981, 1, 1, tzinfo=utc)] * 2, + "datetime": [datetime(1981, 1, 1, tzinfo=timezone.utc)] * 2, } expected_value = [] if feature_is_empty_list else expected_values[feature_dtype] @@ -234,7 +233,7 @@ def test_snowflake_materialization_entityless_fv(): now = _utc_now() - start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + start_date = (now - timedelta(hours=5)).replace(tzinfo=timezone.utc) end_date = split_dt fs.materialize( feature_views=[overall_stats_fv.name], diff --git a/sdk/python/tests/integration/registration/test_universal_registry.py b/sdk/python/tests/integration/registration/test_universal_registry.py index b0738c8419..9dcd1b5b91 100644 --- a/sdk/python/tests/integration/registration/test_universal_registry.py +++ b/sdk/python/tests/integration/registration/test_universal_registry.py @@ -14,7 +14,7 @@ import logging import os import time -from datetime import timedelta +from datetime import timedelta, timezone from tempfile import mkstemp from unittest import mock @@ -22,7 +22,6 @@ import pandas as pd import pytest from pytest_lazyfixture import lazy_fixture -from pytz import utc from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.minio import MinioContainer @@ -802,8 +801,8 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: # Simulate materialization current_date = _utc_now() - end_date = current_date.replace(tzinfo=utc) - start_date = (current_date - timedelta(days=1)).replace(tzinfo=utc) + end_date = current_date.replace(tzinfo=timezone.utc) + start_date = (current_date - timedelta(days=1)).replace(tzinfo=timezone.utc) test_registry.apply_materialization(feature_view, project, start_date, end_date) materialized_feature_view = test_registry.get_feature_view( "my_feature_view_1", project @@ -871,8 +870,8 @@ def odfv1(feature_df: pd.DataFrame) -> pd.DataFrame: # Simulate materialization a second time current_date = _utc_now() - end_date_1 = current_date.replace(tzinfo=utc) - start_date_1 = (current_date - timedelta(days=1)).replace(tzinfo=utc) + end_date_1 = current_date.replace(tzinfo=timezone.utc) + start_date_1 = (current_date - timedelta(days=1)).replace(tzinfo=timezone.utc) test_registry.apply_materialization( updated_feature_view, project, start_date_1, end_date_1 ) diff --git a/sdk/python/tests/utils/e2e_test_validation.py b/sdk/python/tests/utils/e2e_test_validation.py index 1a8bedc796..a08e8fef42 100644 --- a/sdk/python/tests/utils/e2e_test_validation.py +++ b/sdk/python/tests/utils/e2e_test_validation.py @@ -1,13 +1,12 @@ import math import os import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Dict, List, Optional, Union import pandas as pd import yaml -from pytz import utc from feast import FeatureStore, FeatureView, RepoConfig from feast.utils import _utc_now @@ -39,7 +38,7 @@ def validate_offline_online_store_consistency( # Run materialize() # use both tz-naive & tz-aware timestamps to test that they're both correctly handled - start_date = (now - timedelta(hours=5)).replace(tzinfo=utc) + start_date = (now - timedelta(hours=5)).replace(tzinfo=timezone.utc) end_date = split_dt fs.materialize(feature_views=[fv.name], start_date=start_date, end_date=end_date) @@ -87,7 +86,8 @@ def validate_offline_online_store_consistency( and updated_fv.materialization_intervals[0][0] == start_date and updated_fv.materialization_intervals[0][1] == end_date and updated_fv.materialization_intervals[1][0] == end_date - and updated_fv.materialization_intervals[1][1] == now.replace(tzinfo=utc) + and updated_fv.materialization_intervals[1][1] + == now.replace(tzinfo=timezone.utc) ) # check result of materialize_incremental() diff --git a/sdk/python/tests/utils/feature_records.py b/sdk/python/tests/utils/feature_records.py index bd3567c9ee..e81666eaa5 100644 --- a/sdk/python/tests/utils/feature_records.py +++ b/sdk/python/tests/utils/feature_records.py @@ -5,7 +5,6 @@ import pandas as pd import pytest from pandas.testing import assert_frame_equal as pd_assert_frame_equal -from pytz import utc from feast import FeatureService, FeatureStore, utils from feast.errors import FeatureNameCollisionError @@ -16,7 +15,7 @@ def convert_timestamp_records_to_utc( records: List[Dict[str, Any]], column: str ) -> List[Dict[str, Any]]: for record in records: - record[column] = utils.make_tzaware(record[column]).astimezone(utc) + record[column] = utils.make_tzaware(record[column]).astimezone(timezone.utc) return records diff --git a/sdk/python/tests/utils/test_log_creator.py b/sdk/python/tests/utils/test_log_creator.py index 987c8d77ef..3e432e11bf 100644 --- a/sdk/python/tests/utils/test_log_creator.py +++ b/sdk/python/tests/utils/test_log_creator.py @@ -1,7 +1,7 @@ import contextlib -import datetime import tempfile import uuid +from datetime import timedelta from pathlib import Path from typing import Iterator, List, Union @@ -80,7 +80,7 @@ def prepare_logs( logs_df[REQUEST_ID_FIELD] = [str(uuid.uuid4()) for _ in range(num_rows)] logs_df[LOG_TIMESTAMP_FIELD] = pd.Series( np.random.randint(0, 7 * 24 * 3600, num_rows) - ).map(lambda secs: pd.Timestamp.utcnow() - datetime.timedelta(seconds=secs)) + ).map(lambda secs: pd.Timestamp.utcnow() - timedelta(seconds=secs)) logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date for projection in feature_service.feature_view_projections: