From 6e05cf2e4a81e087f732bd88e6cc36313c933e8c Mon Sep 17 00:00:00 2001
From: Chester Ong
Date: Fri, 16 Feb 2024 09:20:07 +0800
Subject: [PATCH 1/5] fetch_arrow_all returns empty table

Signed-off-by: Chester Ong
---
 Makefile                                           |  2 +-
 sdk/python/feast/infra/offline_stores/snowflake.py | 13 ++-----------
 2 files changed, 3 insertions(+), 12 deletions(-)

diff --git a/Makefile b/Makefile
index 6736e64078..6fe0d62f37 100644
--- a/Makefile
+++ b/Makefile
@@ -310,7 +310,7 @@ format-python:
 	cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests

 lint-python:
-	cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast
+	cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ feast
 	cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
 	cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
 	cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py
index 66e7e78651..ec3506cc4f 100644
--- a/sdk/python/feast/infra/offline_stores/snowflake.py
+++ b/sdk/python/feast/infra/offline_stores/snowflake.py
@@ -470,18 +470,9 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
     def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         pa_table = execute_snowflake_statement(
             self.snowflake_conn, self.to_sql()
-        ).fetch_arrow_all()
+        ).fetch_arrow_all(force_return_table=True)

-        if pa_table:
-            return pa_table
-        else:
-            empty_result = execute_snowflake_statement(
-                self.snowflake_conn, self.to_sql()
-            )
-
-            return pyarrow.Table.from_pandas(
-                pd.DataFrame(columns=[md.name for md in empty_result.description])
-            )
+        return pa_table

     def to_sql(self) -> str:
         """

From 04de64ce7033108d33844442c2561400c2b45f06 Mon Sep 17 00:00:00 2001
From: Chester Ong
Date: Fri, 16 Feb 2024 09:24:21 +0800
Subject: [PATCH 2/5] fix spark_kafka_processor typing errors

Signed-off-by: Chester Ong
---
 .../feast/infra/contrib/spark_kafka_processor.py | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
index fc4a34f17b..7237f40368 100644
--- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py
+++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -5,7 +5,6 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.avro.functions import from_avro
 from pyspark.sql.functions import col, from_json
-from pyspark.sql.streaming import StreamingQuery

 from feast.data_format import AvroFormat, JsonFormat
 from feast.data_source import KafkaSource, PushMode
@@ -68,13 +67,10 @@ def __init__(
         # data_source type has been checked to be an instance of KafkaSource.
         self.data_source: KafkaSource = self.data_source  # type: ignore

-    def ingest_stream_feature_view(
-        self, to: PushMode = PushMode.ONLINE
-    ) -> StreamingQuery:
+    def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
         ingested_stream_df = self._ingest_stream_data()
         transformed_df = self._construct_transformation_plan(ingested_stream_df)
-        online_store_query = self._write_stream_data(transformed_df, to)
-        return online_store_query
+        self._write_stream_data(transformed_df, to)

     # In the line 64 of __init__(), the "data_source" is assigned a stream_source (and has to be KafkaSource as in line 40).
     @no_type_check
@@ -131,7 +127,7 @@ def _ingest_stream_data(self) -> StreamTable:
     def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
         return self.sfv.udf.__call__(df) if self.sfv.udf else df

-    def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
+    def _write_stream_data(self, df: StreamTable, to: PushMode) -> None:
         # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
         def batch_write(row: DataFrame, batch_id: int):
             rows: pd.DataFrame = row.toPandas()
@@ -170,4 +166,3 @@ def batch_write(row: DataFrame, batch_id: int):
             )

         query.awaitTermination(timeout=self.query_timeout)
-        return query

From 696d5d692f396f0329b1200725c57d06a0d6c5df Mon Sep 17 00:00:00 2001
From: Chester Ong
Date: Fri, 16 Feb 2024 11:40:45 +0800
Subject: [PATCH 3/5] fix correct return type

Signed-off-by: Chester Ong
---
 Makefile                 |  2 +-
 .../tests/data_source.py | 27 +++++++++++--------
 2 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/Makefile b/Makefile
index 6fe0d62f37..1598664f83 100644
--- a/Makefile
+++ b/Makefile
@@ -310,7 +310,7 @@ format-python:
 	cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests

 lint-python:
-	cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ feast
+	cd ${ROOT_DIR}/sdk/python; python -m mypy feast
 	cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
 	cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
 	cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py
index 46d5c20e97..f50cdc4c41 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py
+++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/tests/data_source.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Dict, Optional
+from typing import Dict, Literal, Optional

 import pandas as pd
 import pytest
@@ -12,6 +12,7 @@
     PostgreSQLSource,
 )
 from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
+from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
 from tests.integration.feature_repos.universal.data_source_creator import (
     DataSourceCreator,
 )
@@ -26,6 +27,10 @@
 POSTGRES_DB = "test"


+class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
+    type: Literal["postgres"] = "postgres"
+
+
 @pytest.fixture(scope="session")
 def postgres_container():
     container = (
@@ -106,17 +111,17 @@ def create_offline_store_config(self) -> PostgreSQLOfflineStoreConfig:
     def get_prefixed_table_name(self, suffix: str) -> str:
         return f"{self.project_name}_{suffix}"

-    def create_online_store(self) -> Dict[str, str]:
+    def create_online_store(self) -> PostgreSQLOnlineStoreConfig:
         assert self.container
-        return {
-            "type": "postgres",
-            "host": "localhost",
-            "port": self.container.get_exposed_port(5432),
-            "database": POSTGRES_DB,
-            "db_schema": "feature_store",
-            "user": POSTGRES_USER,
-            "password": POSTGRES_PASSWORD,
-        }
+        return PostgreSQLOnlineStoreConfig(
+            type="postgres",
+            host="localhost",
+            port=self.container.get_exposed_port(5432),
+            database=POSTGRES_DB,
+            db_schema="feature_store",
+            user=POSTGRES_USER,
+            password=POSTGRES_PASSWORD,
+        )

     def create_saved_dataset_destination(self):
         # FIXME: ...
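
Patch 3 replaces the untyped Dict[str, str] returned by create_online_store() with a
PostgreSQLOnlineStoreConfig subclass whose `type` field is annotated Literal["postgres"].
The sketch below illustrates that Literal-field pattern in isolation, assuming
PostgreSQLConfig is a pydantic model (the keyword-style construction in the diff suggests
it is); the class name and field values here are illustrative stand-ins, not Feast's code.

    from typing import Literal

    from pydantic import BaseModel, ValidationError


    class ExampleOnlineStoreConfig(BaseModel):
        # Only the exact string "postgres" type-checks and validates for `type`.
        type: Literal["postgres"] = "postgres"
        host: str = "localhost"
        port: int = 5432


    cfg = ExampleOnlineStoreConfig(port=5555)
    print(cfg.type)  # "postgres"

    try:
        ExampleOnlineStoreConfig(type="redis")  # rejected by the Literal constraint
    except ValidationError:
        print("validation error, as expected")

Compared with a plain dict, the typed config gives mypy a concrete return type for
create_online_store() and rejects misspelled store types at construction time.
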
From bbcce21dd01c6bc9fca30bc2b2fea23240b1bfa8 Mon Sep 17 00:00:00 2001
From: Chester Ong
Date: Fri, 16 Feb 2024 15:01:47 +0800
Subject: [PATCH 4/5] revert _to_arrow_internal

Signed-off-by: Chester Ong
---
 sdk/python/feast/infra/offline_stores/snowflake.py | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py
index ec3506cc4f..32cda2d6b6 100644
--- a/sdk/python/feast/infra/offline_stores/snowflake.py
+++ b/sdk/python/feast/infra/offline_stores/snowflake.py
@@ -470,9 +470,18 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
     def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
         pa_table = execute_snowflake_statement(
             self.snowflake_conn, self.to_sql()
-        ).fetch_arrow_all(force_return_table=True)
+        ).fetch_arrow_all(force_return_table=False)

-        return pa_table
+        if pa_table:
+            return pa_table
+        else:
+            empty_result = execute_snowflake_statement(
+                self.snowflake_conn, self.to_sql()
+            )
+
+            return pyarrow.Table.from_pandas(
+                pd.DataFrame(columns=[md.name for md in empty_result.description])
+            )

     def to_sql(self) -> str:
         """

From 1aac1e95622e87003793a716bfc89d15419a3621 Mon Sep 17 00:00:00 2001
From: Chester Ong
Date: Fri, 16 Feb 2024 15:09:57 +0800
Subject: [PATCH 5/5] revert kafkaStreamProcessor changes, change base type instead

Signed-off-by: Chester Ong
---
 .../feast/infra/contrib/spark_kafka_processor.py   | 11 ++++++++---
 sdk/python/feast/infra/contrib/stream_processor.py |  8 +++++---
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
index 7237f40368..fc4a34f17b 100644
--- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py
+++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py
@@ -5,6 +5,7 @@
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.avro.functions import from_avro
 from pyspark.sql.functions import col, from_json
+from pyspark.sql.streaming import StreamingQuery

 from feast.data_format import AvroFormat, JsonFormat
 from feast.data_source import KafkaSource, PushMode
@@ -67,10 +68,13 @@ def __init__(
         # data_source type has been checked to be an instance of KafkaSource.
         self.data_source: KafkaSource = self.data_source  # type: ignore

-    def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
+    def ingest_stream_feature_view(
+        self, to: PushMode = PushMode.ONLINE
+    ) -> StreamingQuery:
         ingested_stream_df = self._ingest_stream_data()
         transformed_df = self._construct_transformation_plan(ingested_stream_df)
-        self._write_stream_data(transformed_df, to)
+        online_store_query = self._write_stream_data(transformed_df, to)
+        return online_store_query

     # In the line 64 of __init__(), the "data_source" is assigned a stream_source (and has to be KafkaSource as in line 40).
     @no_type_check
@@ -127,7 +131,7 @@ def _ingest_stream_data(self) -> StreamTable:
     def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
         return self.sfv.udf.__call__(df) if self.sfv.udf else df

-    def _write_stream_data(self, df: StreamTable, to: PushMode) -> None:
+    def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
         # Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
         def batch_write(row: DataFrame, batch_id: int):
             rows: pd.DataFrame = row.toPandas()
@@ -166,3 +170,4 @@ def batch_write(row: DataFrame, batch_id: int):
             )

         query.awaitTermination(timeout=self.query_timeout)
+        return query
diff --git a/sdk/python/feast/infra/contrib/stream_processor.py b/sdk/python/feast/infra/contrib/stream_processor.py
index c4620f4ca1..3f1fe08510 100644
--- a/sdk/python/feast/infra/contrib/stream_processor.py
+++ b/sdk/python/feast/infra/contrib/stream_processor.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from types import MethodType
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Any, Optional

 from pyspark.sql import DataFrame
 from typing_extensions import TypeAlias
@@ -51,7 +51,9 @@ def __init__(
         self.data_source = data_source

     @abstractmethod
-    def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
+    def ingest_stream_feature_view(
+        self, to: PushMode = PushMode.ONLINE
+    ) -> Optional[Any]:
         """
         Ingests data from the stream source attached to the stream feature view; transforms the data
         and then persists it to the online store and/or offline store, depending on the 'to' parameter.
@@ -75,7 +77,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
         raise NotImplementedError

     @abstractmethod
-    def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
+    def _write_stream_data(self, table: StreamTable, to: PushMode) -> Optional[Any]:
         """
         Launches a job to persist stream data to the online store and/or offline store, depending
         on the 'to' parameter, and returns a handle for the job.
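
Patch 5 restores the StreamingQuery return value in the Spark processor and instead widens
the abstract base class's annotations to Optional[Any], so an implementation may return a
job handle or nothing at all. The sketch below shows that override pattern in isolation,
under the assumption that narrowing the return type in a subclass is the intent; class and
method names are illustrative stand-ins, not Feast's actual classes.

    from abc import ABC, abstractmethod
    from typing import Any, Optional


    class FakeStreamingQuery:
        """Stand-in for pyspark.sql.streaming.StreamingQuery."""


    class ExampleStreamProcessor(ABC):
        @abstractmethod
        def ingest_stream_feature_view(self) -> Optional[Any]:
            """Start ingestion; may return a handle for the running job."""


    class SparkLikeProcessor(ExampleStreamProcessor):
        # Narrowing the declared return type in an override is accepted by mypy.
        def ingest_stream_feature_view(self) -> FakeStreamingQuery:
            return FakeStreamingQuery()


    class FireAndForgetProcessor(ExampleStreamProcessor):
        def ingest_stream_feature_view(self) -> None:
            return None


    print(type(SparkLikeProcessor().ingest_stream_feature_view()).__name__)  # FakeStreamingQuery
    print(FireAndForgetProcessor().ingest_stream_feature_view())             # None

The trade-off is that callers of the base-class method see only Optional[Any] and must
narrow the handle themselves, while callers holding the concrete Spark processor keep the
precise StreamingQuery type.
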