diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile b/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile index 963924f38d..a26661ead3 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile @@ -25,5 +25,5 @@ COPY README.md README.md # git dir to infer the version of feast we're installing. # https://github.com/pypa/setuptools_scm#usage-from-docker # I think it also assumes that this dockerfile is being built from the root of the directory. -RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,gcp,bytewax]' +RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake]' diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index e9d6a756b2..31be7a6b89 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -5,7 +5,7 @@ import pyarrow.parquet as pq from bytewax.dataflow import Dataflow # type: ignore from bytewax.execution import cluster_main -from bytewax.inputs import ManualInputConfig, distribute +from bytewax.inputs import ManualInputConfig from bytewax.outputs import ManualOutputConfig from tqdm import tqdm @@ -21,11 +21,13 @@ def __init__( config: RepoConfig, feature_view: FeatureView, paths: List[str], + worker_index: int, ): self.config = config self.feature_store = FeatureStore(config=config) self.feature_view = feature_view + self.worker_index = worker_index self.paths = paths self._run_dataflow() @@ -40,11 +42,7 @@ def process_path(self, path): return batches def input_builder(self, worker_index, worker_count, _state): - worker_paths = distribute(self.paths, worker_index, worker_count) - for path in worker_paths: - yield None, path - - return + return [(None, self.paths[self.worker_index])] def output_builder(self, worker_index, worker_count): def yield_batch(iterable, batch_size): diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 787dd585ff..1c9dc6a6be 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -1,11 +1,14 @@ +import logging import uuid from datetime import datetime +from time import sleep from typing import Callable, List, Literal, Sequence, Union import yaml from kubernetes import client from kubernetes import config as k8s_config from kubernetes import utils +from kubernetes.client.exceptions import ApiException from kubernetes.utils import FailToCreateError from pydantic import StrictStr from tqdm import tqdm @@ -16,6 +19,7 @@ from feast.infra.materialization.batch_materialization_engine import ( BatchMaterializationEngine, MaterializationJob, + MaterializationJobStatus, MaterializationTask, ) from feast.infra.offline_stores.offline_store import OfflineStore @@ -27,6 +31,8 @@ from .bytewax_materialization_job import BytewaxMaterializationJob +logger = logging.getLogger(__name__) + class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for Bytewax""" @@ -65,11 +71,26 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """ (optional) additional labels to append to kubernetes objects """ max_parallelism: int = 10 - """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" + """ (optional) Maximum number of pods allowed to run in parallel""" + + synchronous: bool = False + """ (optional) If true, wait for materialization for one feature to complete before moving to the next """ + + retry_limit: int = 2 + """ (optional) Maximum number of times to retry a materialization worker pod""" mini_batch_size: int = 1000 """ (optional) Number of rows to process per write operation (default 1000)""" + active_deadline_seconds: int = 86400 + """ (optional) Maximum amount of time a materialization job is allowed to run""" + + job_batch_size: int = 100 + """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization""" + + print_pod_logs_on_failure: bool = True + """(optional) Print pod logs on job failure. Only applies to synchronous materialization""" + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -173,8 +194,98 @@ def _materialize_one( ) paths = offline_job.to_remote_storage() + if self.batch_engine_config.synchronous: + offset = 0 + total_pods = len(paths) + batch_size = self.batch_engine_config.job_batch_size + if batch_size < 1: + raise ValueError("job_batch_size must be a value greater than 0") + if batch_size < self.batch_engine_config.max_parallelism: + logger.warning( + "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism" + ) + batch_size = self.batch_engine_config.max_parallelism + + while True: + next_offset = min(offset + batch_size, total_pods) + job = self._await_path_materialization( + paths[offset:next_offset], + feature_view, + offset, + next_offset, + total_pods, + ) + offset += batch_size + if ( + offset >= total_pods + or job.status() == MaterializationJobStatus.ERROR + ): + break + else: + job_id = str(uuid.uuid4()) + job = self._create_kubernetes_job(job_id, paths, feature_view) + + return job + + def _await_path_materialization( + self, paths, feature_view, batch_start, batch_end, total_pods + ): job_id = str(uuid.uuid4()) - return self._create_kubernetes_job(job_id, paths, feature_view) + job = self._create_kubernetes_job(job_id, paths, feature_view) + + try: + while job.status() in ( + MaterializationJobStatus.WAITING, + MaterializationJobStatus.RUNNING, + ): + logger.info( + f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " + f"(of {total_pods}) running..." + ) + sleep(30) + logger.info( + f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " + f"(of {total_pods}) complete with status {job.status()}" + ) + except BaseException as e: + logger.info(f"Deleting job {job.job_id()}") + try: + self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) + except ApiException as ae: + logger.warning(f"Could not delete job due to API Error: {ae.body}") + raise e + finally: + logger.info(f"Deleting configmap {self._configmap_name(job_id)}") + try: + self.v1.delete_namespaced_config_map( + self._configmap_name(job_id), self.namespace + ) + except ApiException as ae: + logger.warning( + f"Could not delete configmap due to API Error: {ae.body}" + ) + + if ( + job.status() == MaterializationJobStatus.ERROR + and self.batch_engine_config.print_pod_logs_on_failure + ): + self._print_pod_logs(job.job_id(), feature_view, batch_start) + + return job + + def _print_pod_logs(self, job_id, feature_view, offset=0): + pods_list = self.v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=f"job-name={job_id}", + ).items + for i, pod in enumerate(pods_list): + logger.info(f"Logging output for {feature_view.name} pod {offset+i}") + try: + logger.info( + self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace) + ) + except ApiException as e: + logger.warning(f"Could not retrieve pod logs due to: {e.body}") def _create_kubernetes_job(self, job_id, paths, feature_view): try: @@ -210,7 +321,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): "kind": "ConfigMap", "apiVersion": "v1", "metadata": { - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), "labels": {**labels, **self.batch_engine_config.labels}, }, "data": { @@ -223,7 +334,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): body=configmap_manifest, ) - def _create_job_definition(self, job_id, namespace, pods, env): + def _configmap_name(self, job_id): + return f"feast-{job_id}" + + def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): """Create a kubernetes job definition.""" job_env = [ {"name": "RUST_BACKTRACE", "value": "full"}, @@ -284,8 +398,10 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, "spec": { "ttlSecondsAfterFinished": 3600, + "backoffLimit": self.batch_engine_config.retry_limit, "completions": pods, "parallelism": min(pods, self.batch_engine_config.max_parallelism), + "activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds, "completionMode": "Indexed", "template": { "metadata": { @@ -324,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, { "mountPath": "/var/feast/", - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, ], } @@ -355,7 +471,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): {"mountPath": "/etc/bytewax", "name": "hostfile"}, { "mountPath": "/var/feast/", - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, ], } @@ -365,13 +481,13 @@ def _create_job_definition(self, job_id, namespace, pods, env): { "configMap": { "defaultMode": 420, - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, "name": "python-files", }, { - "configMap": {"name": f"feast-{job_id}"}, - "name": f"feast-{job_id}", + "configMap": {"name": self._configmap_name(job_id)}, + "name": self._configmap_name(job_id), }, ], }, diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py index 77d2149eb5..4105be90ee 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -36,10 +36,13 @@ def status(self): if job_status.completion_time is None: return MaterializationJobStatus.RUNNING elif job_status.failed is not None: + self._error = Exception(f"Job {self.job_id()} failed") return MaterializationJobStatus.ERROR - elif job_status.active is None and job_status.succeeded is not None: - if job_status.conditions[0].type == "Complete": - return MaterializationJobStatus.SUCCEEDED + elif job_status.active is None: + if job_status.completion_time is not None: + if job_status.conditions[0].type == "Complete": + return MaterializationJobStatus.SUCCEEDED + return MaterializationJobStatus.WAITING def should_be_retried(self): return False diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py index e3d95e2a75..23cdc20ef3 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py @@ -1,3 +1,5 @@ +import os + import yaml from feast import FeatureStore, RepoConfig @@ -19,4 +21,5 @@ config, store.get_feature_view(bytewax_config["feature_view"]), bytewax_config["paths"], + int(os.environ["JOB_COMPLETION_INDEX"]), )