diff --git a/sdk/python/feast/loaders/abstract_producer.py b/sdk/python/feast/loaders/abstract_producer.py
index 6030d14ecc..14d9bc42b7 100644
--- a/sdk/python/feast/loaders/abstract_producer.py
+++ b/sdk/python/feast/loaders/abstract_producer.py
@@ -25,8 +25,6 @@ class AbstractProducer:
     def __init__(self, brokers: str, row_count: int, disable_progress_bar: bool):
         self.brokers = brokers
         self.row_count = row_count
-        self.error_count = 0
-        self.last_exception = ""
 
         # Progress bar will always display average rate
         self.pbar = tqdm(
@@ -45,8 +43,7 @@ def _inc_pbar(self, meta):
         self.pbar.update(1)
 
     def _set_error(self, exception: str):
-        self.error_count += 1
-        self.last_exception = exception
+        raise Exception(exception)
 
     def print_results(self) -> None:
         """
@@ -62,24 +59,7 @@ def print_results(self) -> None:
 
         print("Ingestion complete!")
 
-        failed_message = (
-            ""
-            if self.error_count == 0
-            else f"\nFail: {self.error_count / self.row_count}"
-        )
-
-        last_exception_message = (
-            ""
-            if self.last_exception == ""
-            else f"\nLast exception:\n{self.last_exception}"
-        )
-
-        print(
-            f"\nIngestion statistics:"
-            f"\nSuccess: {self.pbar.n}/{self.row_count}"
-            f"{failed_message}"
-            f"{last_exception_message}"
-        )
+        print(f"\nIngestion statistics:" f"\nSuccess: {self.pbar.n}/{self.row_count}")
 
         return None
 
@@ -129,7 +109,10 @@ def flush(self, timeout: Optional[int]):
         Returns:
             int: Number of messages still in queue.
         """
-        return self.producer.flush(timeout=timeout)
+        messages = self.producer.flush(timeout=timeout)
+        if messages:
+            raise Exception("Not all Kafka messages are successfully delivered.")
+        return messages
 
     def _delivery_callback(self, err: str, msg) -> None:
         """
@@ -200,7 +183,10 @@ def flush(self, timeout: Optional[int]):
             KafkaTimeoutError: failure to flush buffered records within the
                 provided timeout
         """
-        return self.producer.flush(timeout=timeout)
+        messages = self.producer.flush(timeout=timeout)
+        if messages:
+            raise Exception("Not all Kafka messages are successfully delivered.")
+        return messages
 
 
 def get_producer(
diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py
index e87c857335..a39c3a3381 100644
--- a/sdk/python/tests/test_client.py
+++ b/sdk/python/tests/test_client.py
@@ -601,6 +601,38 @@ def test_feature_set_ingest_success(self, dataframe, test_client, mocker):
         # Ingest data into Feast
         test_client.ingest("driver-feature-set", dataframe)
 
+    @pytest.mark.parametrize(
+        "dataframe,test_client,exception",
+        [(dataframes.GOOD, pytest.lazy_fixture("client"), Exception)],
+    )
+    def test_feature_set_ingest_throws_exception_if_kafka_down(
+        self, dataframe, test_client, exception, mocker
+    ):
+
+        test_client.set_project("project1")
+        driver_fs = FeatureSet(
+            "driver-feature-set",
+            source=KafkaSource(brokers="localhost:4412", topic="test"),
+        )
+        driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT))
+        driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING))
+        driver_fs.add(Feature(name="feature_3", dtype=ValueType.INT64))
+        driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64))
+
+        # Register with Feast core
+        test_client.apply(driver_fs)
+        driver_fs = driver_fs.to_proto()
+        driver_fs.meta.status = FeatureSetStatusProto.STATUS_READY
+
+        mocker.patch.object(
+            test_client._core_service_stub,
+            "GetFeatureSet",
+            return_value=GetFeatureSetResponse(feature_set=driver_fs),
+        )
+
+        with pytest.raises(exception):
+            test_client.ingest("driver-feature-set", dataframe)
+
     @pytest.mark.parametrize(
         "dataframe,exception,test_client",
         [