From 9b227d7d44f30d28d1faadc8015f25dc4a6f56b5 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Tue, 16 May 2023 13:50:59 +0900 Subject: [PATCH] fix: Fix timestamp consistency in push api (#3614) Signed-off-by: Jiwon Park --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- sdk/python/feast/infra/offline_stores/offline_utils.py | 5 +++-- sdk/python/feast/type_map.py | 8 +++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 770477f251..47335c411f 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -363,7 +363,7 @@ def offline_write_batch( assert isinstance(feature_view.batch_source, BigQuerySource) pa_schema, column_names = offline_utils.get_pyarrow_schema_from_batch_source( - config, feature_view.batch_source + config, feature_view.batch_source, timestamp_unit="ns" ) if column_names != table.column_names: raise ValueError( diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 42b8f8497a..2d4fa268e4 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -232,7 +232,7 @@ def get_offline_store_from_config(offline_store_config: Any) -> OfflineStore: def get_pyarrow_schema_from_batch_source( - config: RepoConfig, batch_source: DataSource + config: RepoConfig, batch_source: DataSource, timestamp_unit: str = "us" ) -> Tuple[pa.Schema, List[str]]: """Returns the pyarrow schema and column names for the given batch source.""" column_names_and_types = batch_source.get_table_column_names_and_types(config) @@ -244,7 +244,8 @@ def get_pyarrow_schema_from_batch_source( ( column_name, feast_value_type_to_pa( - batch_source.source_datatype_to_feast_value_type()(column_type) + batch_source.source_datatype_to_feast_value_type()(column_type), + timestamp_unit=timestamp_unit, ), ) ) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 0188c58021..689f0268d2 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -844,7 +844,9 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType: return value -def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType": +def feast_value_type_to_pa( + feast_type: ValueType, timestamp_unit: str = "us" +) -> "pyarrow.DataType": import pyarrow type_map = { @@ -855,7 +857,7 @@ def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType": ValueType.STRING: pyarrow.string(), ValueType.BYTES: pyarrow.binary(), ValueType.BOOL: pyarrow.bool_(), - ValueType.UNIX_TIMESTAMP: pyarrow.timestamp("us"), + ValueType.UNIX_TIMESTAMP: pyarrow.timestamp(timestamp_unit), ValueType.INT32_LIST: pyarrow.list_(pyarrow.int32()), ValueType.INT64_LIST: pyarrow.list_(pyarrow.int64()), ValueType.DOUBLE_LIST: pyarrow.list_(pyarrow.float64()), @@ -863,7 +865,7 @@ def feast_value_type_to_pa(feast_type: ValueType) -> "pyarrow.DataType": ValueType.STRING_LIST: pyarrow.list_(pyarrow.string()), ValueType.BYTES_LIST: pyarrow.list_(pyarrow.binary()), ValueType.BOOL_LIST: pyarrow.list_(pyarrow.bool_()), - ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp("us")), + ValueType.UNIX_TIMESTAMP_LIST: pyarrow.list_(pyarrow.timestamp(timestamp_unit)), ValueType.NULL: pyarrow.null(), } return type_map[feast_type]