diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index cb5d9d2d82..f65b133189 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -8,7 +8,6 @@ from bytewax.execution import cluster_main from bytewax.inputs import ManualInputConfig from bytewax.outputs import ManualOutputConfig -from tqdm import tqdm from feast import FeatureStore, FeatureView, RepoConfig from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping @@ -31,14 +30,18 @@ def __init__( self.feature_view = feature_view self.worker_index = worker_index self.paths = paths + self.mini_batch_size = int( + os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE) + ) self._run_dataflow() def process_path(self, path): + logger.info("Processing path %s", path) dataset = pq.ParquetDataset(path, use_legacy_dataset=False) batches = [] for fragment in dataset.fragments: - for batch in fragment.to_table().to_batches(): + for batch in fragment.to_table().to_batches(max_chunksize=self.mini_batch_size): batches.append(batch) return batches @@ -47,13 +50,8 @@ def input_builder(self, worker_index, worker_count, _state): return [(None, self.paths[self.worker_index])] def output_builder(self, worker_index, worker_count): - def yield_batch(iterable, batch_size): - """Yield mini-batches from an iterable.""" - for i in range(0, len(iterable), batch_size): - yield iterable[i : i + batch_size] - - def output_fn(batch): - table = pa.Table.from_batches([batch]) + def output_fn(mini_batch): + table = pa.Table.from_batches([mini_batch]) if self.feature_view.batch_source.field_mapping is not None: table = _run_pyarrow_field_mapping( @@ -68,19 +66,13 @@ def output_fn(batch): rows_to_write = _convert_arrow_to_proto( table, 
self.feature_view, join_key_to_value_type ) - provider = self.feature_store._get_provider() - with tqdm(total=len(rows_to_write)) as progress: - # break rows_to_write to mini-batches - batch_size = int( - os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE) - ) - for mini_batch in yield_batch(rows_to_write, batch_size): - provider.online_write_batch( - config=self.config, - table=self.feature_view, - data=mini_batch, - progress=progress.update, - ) + + self.feature_store._get_provider().online_write_batch( + config=self.config, + table=self.feature_view, + data=rows_to_write, + progress=None, + ) return output_fn diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index a579e2baa6..b719ee58a5 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -28,7 +28,6 @@ from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView from feast.utils import _get_column_names, get_default_yaml_file_path - from .bytewax_materialization_job import BytewaxMaterializationJob logger = logging.getLogger(__name__) @@ -235,35 +234,30 @@ def _await_path_materialization( MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING, ): - logger.info( - f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " - f"(of {total_pods}) running..." 
- ) + logger.info("%s materialization for pods %d-%d (of %d) running...", + feature_view.name, batch_start, batch_end, total_pods) sleep(30) - logger.info( - f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " - f"(of {total_pods}) complete with status {job.status()}" - ) + + logger.info("%s materialization for pods %d-%d (of %d) complete with status %s", + feature_view.name, batch_start, batch_end, total_pods, job.status()) except BaseException as e: if self.batch_engine_config.print_pod_logs_on_failure: self._print_pod_logs(job.job_id(), feature_view, batch_start) - logger.info(f"Deleting job {job.job_id()}") + logger.info("Deleting job %s", job.job_id()) try: self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) except ApiException as ae: - logger.warning(f"Could not delete job due to API Error: {ae.body}") + logger.warning("Could not delete job due to API Error: %s", ae.body) raise e finally: - logger.info(f"Deleting configmap {self._configmap_name(job_id)}") + logger.info("Deleting configmap %s", self._configmap_name(job_id)) try: self.v1.delete_namespaced_config_map( self._configmap_name(job_id), self.namespace ) except ApiException as ae: - logger.warning( - f"Could not delete configmap due to API Error: {ae.body}" - ) + logger.warning("Could not delete configmap due to API Error: %s", ae.body) return job @@ -273,13 +267,13 @@ def _print_pod_logs(self, job_id, feature_view, offset=0): label_selector=f"job-name={job_id}", ).items for i, pod in enumerate(pods_list): - logger.info(f"Logging output for {feature_view.name} pod {offset+i}") + logger.info("Logging output for %s pod %d", feature_view.name, offset + i) try: logger.info( self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace) ) except ApiException as e: - logger.warning(f"Could not retrieve pod logs due to: {e.body}") + logger.warning("Could not retrieve pod logs due to: %s", e.body) def _create_kubernetes_job(self, job_id, paths, feature_view): try: @@ 
-293,6 +287,8 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): len(paths), # Create a pod for each parquet file self.batch_engine_config.env, ) + + logger.info("Created job `dataflow-%s` in namespace `%s`", job_id, self.namespace) except FailToCreateError as failures: return BytewaxMaterializationJob(job_id, self.namespace, error=failures) @@ -355,7 +351,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): }, { "name": "BYTEWAX_REPLICAS", - "value": f"{pods}", + "value": "1", }, { "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", @@ -416,7 +412,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): } ], "image": "busybox", - "imagePullPolicy": "Always", + "imagePullPolicy": "IfNotPresent", "name": "init-hostfile", "resources": {}, "securityContext": { @@ -444,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): "command": ["sh", "-c", "sh ./entrypoint.sh"], "env": job_env, "image": self.batch_engine_config.image, - "imagePullPolicy": "Always", + "imagePullPolicy": "IfNotPresent", "name": "process", "ports": [ { diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py index 4105be90ee..ee91a42d81 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -35,13 +35,16 @@ def status(self): if job_status.active is not None: if job_status.completion_time is None: return MaterializationJobStatus.RUNNING - elif job_status.failed is not None: - self._error = Exception(f"Job {self.job_id()} failed") - return MaterializationJobStatus.ERROR - elif job_status.active is None: - if job_status.completion_time is not None: - if job_status.conditions[0].type == "Complete": - return 
MaterializationJobStatus.SUCCEEDED + else: + if job_status.completion_time is not None and job_status.conditions is not None and job_status.conditions[0].type == "Complete": + return MaterializationJobStatus.SUCCEEDED + + if job_status.conditions is not None and job_status.conditions[0].type == "Failed": + self._error = Exception( + f"Job {self.job_id()} failed with reason: " + f"{job_status.conditions[0].message}" + ) + return MaterializationJobStatus.ERROR return MaterializationJobStatus.WAITING def should_be_retried(self): diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py index 23cdc20ef3..76d149c366 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py @@ -1,3 +1,4 @@ +import logging import os import yaml @@ -8,6 +9,7 @@ ) if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) with open("/var/feast/feature_store.yaml") as f: feast_config = yaml.safe_load(f)