From 948c7fbccd1672018553f89a8a87f77027d3ec4c Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Tue, 12 Sep 2023 15:53:23 -0500 Subject: [PATCH 01/10] SAASMLOPS-767 wait for jobs to complete Signed-off-by: James Crabtree --- .../bytewax/bytewax_materialization_engine.py | 48 ++++++++++++++++++- .../bytewax/bytewax_materialization_job.py | 9 ++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 787dd585ff..dfa2f75003 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -1,3 +1,4 @@ +import sys import uuid from datetime import datetime from typing import Callable, List, Literal, Sequence, Union @@ -7,14 +8,17 @@ from kubernetes import config as k8s_config from kubernetes import utils from kubernetes.utils import FailToCreateError +from kubernetes.client.exceptions import ApiException from pydantic import StrictStr from tqdm import tqdm +import logging from feast import FeatureView, RepoConfig from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity from feast.infra.materialization.batch_materialization_engine import ( BatchMaterializationEngine, + MaterializationJobStatus, MaterializationJob, MaterializationTask, ) @@ -24,9 +28,16 @@ from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView from feast.utils import _get_column_names, get_default_yaml_file_path +from time import sleep +import signal from .bytewax_materialization_job import BytewaxMaterializationJob +logger = logging.getLogger(__name__) + +def term_handler(signum, frame): + logger.info("Received SIGTERM. Shutting down") + sys.exit(0) class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for Bytewax""" @@ -65,7 +76,13 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """ (optional) additional labels to append to kubernetes objects """ max_parallelism: int = 10 - """ (optional) Maximum number of pods (default 10) allowed to run in parallel per job""" + """ (optional) Maximum number of pods allowed to run in parallel per job""" + + synchronous: bool = False + """ (optional) If true, wait for materialization for one feature to complete before moving to the next """ + + retry_limit: int = 2 + """ (optional) Maximum number of times to retry a materialization worker pod""" mini_batch_size: int = 1000 """ (optional) Number of rows to process per write operation (default 1000)""" @@ -99,6 +116,8 @@ def __init__( self.batch_engine_config = repo_config.batch_engine self.namespace = self.batch_engine_config.namespace + signal.signal(signal.SIGTERM, term_handler) + def update( self, project: str, @@ -174,7 +193,31 @@ def _materialize_one( paths = offline_job.to_remote_storage() job_id = str(uuid.uuid4()) - return self._create_kubernetes_job(job_id, paths, feature_view) + job = self._create_kubernetes_job(job_id, paths, feature_view) + if self.batch_engine_config.synchronous: + try: + while job.status() in (MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING): + logger.info(f"{feature_view.name} materialization still running...") + sleep(30) + logger.info(f"{feature_view.name} materialization complete with status {job.status()}") + except (KeyboardInterrupt, BaseException) as e: + logger.info(f"Killing job {job.job_id()}") + self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) + raise e + self._print_pod_logs(job.job_id(), feature_view) + return job + + def _print_pod_logs(self, job_id, feature_view): + pods_list = self.v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=f"job-name={job_id}", + ).items + for i, pod in enumerate(pods_list): + logger.info(f"Logging output for {feature_view.name} pod {i}") + try: + logger.info(self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)) + except ApiException as e: + logger.warning(f"Could not retrieve pod logs due to: {e.body}") def _create_kubernetes_job(self, job_id, paths, feature_view): try: @@ -284,6 +327,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, "spec": { "ttlSecondsAfterFinished": 3600, + "backoffLimit": self.batch_engine_config.retry_limit, "completions": pods, "parallelism": min(pods, self.batch_engine_config.max_parallelism), "completionMode": "Indexed", diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py index 77d2149eb5..4105be90ee 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -36,10 +36,13 @@ def status(self): if job_status.completion_time is None: return MaterializationJobStatus.RUNNING elif job_status.failed is not None: + self._error = Exception(f"Job {self.job_id()} failed") return MaterializationJobStatus.ERROR - elif job_status.active is None and job_status.succeeded is not None: - if job_status.conditions[0].type == "Complete": - return MaterializationJobStatus.SUCCEEDED + elif job_status.active is None: + if job_status.completion_time is not None: + if job_status.conditions[0].type == "Complete": + return MaterializationJobStatus.SUCCEEDED + return MaterializationJobStatus.WAITING def should_be_retried(self): return False From 0ec030fb79f3ba8f0275a582865409c20d6592fa Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Tue, 19 Sep 2023 13:20:23 -0500 Subject: [PATCH 02/10] SAASMLOPS-805 Stopgap change to fix duplicate materialization of data Signed-off-by: James Crabtree --- .../infra/materialization/contrib/bytewax/Dockerfile | 2 +- .../contrib/bytewax/bytewax_materialization_engine.py | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile b/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile index 963924f38d..a26661ead3 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/Dockerfile @@ -25,5 +25,5 @@ COPY README.md README.md # git dir to infer the version of feast we're installing. # https://github.com/pypa/setuptools_scm#usage-from-docker # I think it also assumes that this dockerfile is being built from the root of the directory. -RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,gcp,bytewax]' +RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir '.[aws,gcp,bytewax,snowflake]' diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index dfa2f75003..e2be9ce3ea 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -200,9 +200,12 @@ def _materialize_one( logger.info(f"{feature_view.name} materialization still running...") sleep(30) logger.info(f"{feature_view.name} materialization complete with status {job.status()}") - except (KeyboardInterrupt, BaseException) as e: + except BaseException as e: logger.info(f"Killing job {job.job_id()}") - self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) + try: + self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) + except ApiException as de: + logger.warning(f"Could not delete job due to API Error: {ae.body}") raise e self._print_pod_logs(job.job_id(), feature_view) return job @@ -328,8 +331,8 @@ def _create_job_definition(self, job_id, namespace, pods, env): "spec": { "ttlSecondsAfterFinished": 3600, "backoffLimit": self.batch_engine_config.retry_limit, - "completions": pods, - "parallelism": min(pods, self.batch_engine_config.max_parallelism), + "completions": 1, + "parallelism": 1, "completionMode": "Indexed", "template": { "metadata": { From 71c02f0930338a06ed99ef1c4e4a54cb7cb896c1 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Tue, 19 Sep 2023 13:26:32 -0500 Subject: [PATCH 03/10] SAASMLOPS-805 save BYTEWAX_REPLICAS=1 Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index e2be9ce3ea..f5e3902e99 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -293,7 +293,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, { "name": "BYTEWAX_REPLICAS", - "value": f"{pods}", + "value": "1", }, { "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", @@ -349,7 +349,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): "env": [ { "name": "BYTEWAX_REPLICAS", - "value": f"{pods}", + "value": "1", } ], "image": "busybox", From 2313c1dc5bfd4c7f9ec3f41843ab37be6132df9b Mon Sep 17 00:00:00 2001 From: James Crabtree <77012363+james-crabtree-sp@users.noreply.github.com> Date: Wed, 20 Sep 2023 10:14:45 -0500 Subject: [PATCH 04/10] SAASMLOPS-809 fix bytewax workers so they only process a single file (#6) * SAASMLOPS-809 fix bytewax workers so they only process a single file * SAASMLOPS-809 fix newlines Signed-off-by: James Crabtree --- .../bytewax/bytewax_materialization_dataflow.py | 16 ++++++++++------ .../bytewax/bytewax_materialization_engine.py | 10 +++++----- .../materialization/contrib/bytewax/dataflow.py | 2 ++ 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index e9d6a756b2..c75eb7c2a1 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -5,7 +5,7 @@ import pyarrow.parquet as pq from bytewax.dataflow import Dataflow # type: ignore from bytewax.execution import cluster_main -from bytewax.inputs import ManualInputConfig, distribute +from bytewax.inputs import ManualInputConfig from bytewax.outputs import ManualOutputConfig from tqdm import tqdm @@ -21,17 +21,25 @@ def __init__( config: RepoConfig, feature_view: FeatureView, paths: List[str], + worker_index: int ): self.config = config self.feature_store = FeatureStore(config=config) self.feature_view = feature_view + self.worker_index = worker_index self.paths = paths self._run_dataflow() def process_path(self, path): +<<<<<<< HEAD dataset = pq.ParquetDataset(path, use_legacy_dataset=False) +======= + fs = s3fs.S3FileSystem() + logger.info(f"Processing path {path}") + dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False) +>>>>>>> 15c523a2 (SAASMLOPS-809 fix bytewax workers so they only process a single file (#6)) batches = [] for fragment in dataset.fragments: for batch in fragment.to_table().to_batches(): @@ -40,11 +48,7 @@ def process_path(self, path): return batches def input_builder(self, worker_index, worker_count, _state): - worker_paths = distribute(self.paths, worker_index, worker_count) - for path in worker_paths: - yield None, path - - return + return [(None, self.paths[self.worker_index])] def output_builder(self, worker_index, worker_count): def yield_batch(iterable, batch_size): diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index f5e3902e99..1e7a1df9fe 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -205,7 +205,7 @@ def _materialize_one( try: self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) except ApiException as de: - logger.warning(f"Could not delete job due to API Error: {ae.body}") + logger.warning(f"Could not delete job due to API Error: {de.body}") raise e self._print_pod_logs(job.job_id(), feature_view) return job @@ -293,7 +293,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, { "name": "BYTEWAX_REPLICAS", - "value": "1", + "value": f"{pods}", }, { "name": "BYTEWAX_KEEP_CONTAINER_ALIVE", @@ -331,8 +331,8 @@ def _create_job_definition(self, job_id, namespace, pods, env): "spec": { "ttlSecondsAfterFinished": 3600, "backoffLimit": self.batch_engine_config.retry_limit, - "completions": 1, - "parallelism": 1, + "completions": pods, + "parallelism": min(pods, self.batch_engine_config.max_parallelism), "completionMode": "Indexed", "template": { "metadata": { @@ -349,7 +349,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): "env": [ { "name": "BYTEWAX_REPLICAS", - "value": "1", + "value": f"{pods}", } ], "image": "busybox", diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py index e3d95e2a75..1013ee0014 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py @@ -1,3 +1,4 @@ +import os import yaml from feast import FeatureStore, RepoConfig @@ -19,4 +20,5 @@ config, store.get_feature_view(bytewax_config["feature_view"]), bytewax_config["paths"], + int(os.environ["JOB_COMPLETION_INDEX"]) ) From 5f97ed089f08fb529780c984955cc383ceee1350 Mon Sep 17 00:00:00 2001 From: James Crabtree <77012363+james-crabtree-sp@users.noreply.github.com> Date: Fri, 29 Sep 2023 10:16:14 -0500 Subject: [PATCH 05/10] SAASMLOPS-833 add configurable job timeout (#7) * SAASMLOPS-833 add configurable job timeout * SAASMLOPS-833 fix whitespace Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 1e7a1df9fe..3c3993b09e 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -87,6 +87,9 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): mini_batch_size: int = 1000 """ (optional) Number of rows to process per write operation (default 1000)""" + active_deadline_seconds: int = 86400 + """ (optional) Maximum amount of time a materialization job is allowed to run""" + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -333,6 +336,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): "backoffLimit": self.batch_engine_config.retry_limit, "completions": pods, "parallelism": min(pods, self.batch_engine_config.max_parallelism), + "activeDeadlineSeconds": self.batch_engine_config.active_deadline_seconds, "completionMode": "Indexed", "template": { "metadata": { From 65c039adde784ab8a16e218418e7af9836c9d469 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Fri, 6 Oct 2023 16:35:51 -0500 Subject: [PATCH 06/10] develop Run large materializations in batches of pods Signed-off-by: James Crabtree --- .../bytewax_materialization_dataflow.py | 8 +- .../bytewax/bytewax_materialization_engine.py | 122 +++++++++++++----- .../contrib/bytewax/dataflow.py | 3 +- 3 files changed, 91 insertions(+), 42 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index c75eb7c2a1..31be7a6b89 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -21,7 +21,7 @@ def __init__( config: RepoConfig, feature_view: FeatureView, paths: List[str], - worker_index: int + worker_index: int, ): self.config = config self.feature_store = FeatureStore(config=config) @@ -33,13 +33,7 @@ def __init__( self._run_dataflow() def process_path(self, path): -<<<<<<< HEAD dataset = pq.ParquetDataset(path, use_legacy_dataset=False) -======= - fs = s3fs.S3FileSystem() - logger.info(f"Processing path {path}") - dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False) ->>>>>>> 15c523a2 (SAASMLOPS-809 fix bytewax workers so they only process a single file (#6)) batches = [] for fragment in dataset.fragments: for batch in fragment.to_table().to_batches(): diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 3c3993b09e..2f9e68381e 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -1,25 +1,25 @@ -import sys +import logging import uuid from datetime import datetime +from time import sleep from typing import Callable, List, Literal, Sequence, Union import yaml from kubernetes import client from kubernetes import config as k8s_config from kubernetes import utils -from kubernetes.utils import FailToCreateError from kubernetes.client.exceptions import ApiException +from kubernetes.utils import FailToCreateError from pydantic import StrictStr from tqdm import tqdm -import logging from feast import FeatureView, RepoConfig from feast.batch_feature_view import BatchFeatureView from feast.entity import Entity from feast.infra.materialization.batch_materialization_engine import ( BatchMaterializationEngine, - MaterializationJobStatus, MaterializationJob, + MaterializationJobStatus, MaterializationTask, ) from feast.infra.offline_stores.offline_store import OfflineStore @@ -28,16 +28,11 @@ from feast.repo_config import FeastConfigBaseModel from feast.stream_feature_view import StreamFeatureView from feast.utils import _get_column_names, get_default_yaml_file_path -from time import sleep -import signal from .bytewax_materialization_job import BytewaxMaterializationJob logger = logging.getLogger(__name__) -def term_handler(signum, frame): - logger.info("Received SIGTERM. Shutting down") - sys.exit(0) class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """Batch Materialization Engine config for Bytewax""" @@ -90,6 +85,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): active_deadline_seconds: int = 86400 """ (optional) Maximum amount of time a materialization job is allowed to run""" + job_batch_size: int = 100 + """ (optional) Maximum number of pods to process per job. Only applies to synchronous materialization""" + + print_pod_logs_on_failure: bool = True + """(optional) Print pod logs on job failure. Only applies to synchronous materialization""" + class BytewaxMaterializationEngine(BatchMaterializationEngine): def __init__( @@ -119,8 +120,6 @@ def __init__( self.batch_engine_config = repo_config.batch_engine self.namespace = self.batch_engine_config.namespace - signal.signal(signal.SIGTERM, term_handler) - def update( self, project: str, @@ -195,33 +194,85 @@ def _materialize_one( ) paths = offline_job.to_remote_storage() + if self.batch_engine_config.synchronous: + offset = 0 + total_pods = len(paths) + batch_size = self.batch_engine_config.job_batch_size + if batch_size < 1: + raise ValueError("job_batch_size must be a value greater than 0") + + while True: + next_offset = min(offset + batch_size, total_pods) + job = self._await_path_materialization( + paths[offset:next_offset], + feature_view, + offset, + next_offset, + total_pods, + ) + offset += batch_size + if offset >= total_pods: + break + else: + job_id = str(uuid.uuid4()) + job = self._create_kubernetes_job(job_id, paths, feature_view) + + return job + + def _await_path_materialization( + self, paths, feature_view, batch_start, batch_end, total_pods + ): job_id = str(uuid.uuid4()) job = self._create_kubernetes_job(job_id, paths, feature_view) - if self.batch_engine_config.synchronous: + + try: + while job.status() in ( + MaterializationJobStatus.WAITING, + MaterializationJobStatus.RUNNING, + ): + logger.info( + f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " + f"(of {total_pods}) running..." + ) + sleep(30) + logger.info( + f"{feature_view.name} materialization for pods {batch_start}-{batch_end} " + f"(of {total_pods}) complete with status {job.status()}" + ) + except BaseException as e: + if self.batch_engine_config.print_pod_logs_on_failure: + self._print_pod_logs(job.job_id(), feature_view, batch_start) + + logger.info(f"Deleting job {job.job_id()}") try: - while job.status() in (MaterializationJobStatus.WAITING, MaterializationJobStatus.RUNNING): - logger.info(f"{feature_view.name} materialization still running...") - sleep(30) - logger.info(f"{feature_view.name} materialization complete with status {job.status()}") - except BaseException as e: - logger.info(f"Killing job {job.job_id()}") - try: - self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) - except ApiException as de: - logger.warning(f"Could not delete job due to API Error: {de.body}") - raise e - self._print_pod_logs(job.job_id(), feature_view) + self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) + except ApiException as ae: + logger.warning(f"Could not delete job due to API Error: {ae.body}") + raise e + finally: + logger.info(f"Deleting configmap {self._configmap_name(job_id)}") + try: + self.v1.delete_namespaced_config_map( + self._configmap_name(job_id), self.namespace + ) + except ApiException as ae: + logger.warning( + f"Could not delete configmap due to API Error: {ae.body}" + ) + return job - def _print_pod_logs(self, job_id, feature_view): + def _print_pod_logs(self, job_id, feature_view, offset=0): pods_list = self.v1.list_namespaced_pod( namespace=self.namespace, label_selector=f"job-name={job_id}", ).items for i, pod in enumerate(pods_list): - logger.info(f"Logging output for {feature_view.name} pod {i}") + logger.info(f"Logging output for {feature_view.name} pod {offset+i}") try: - logger.info(self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)) + logger.info( + self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace) + ) except ApiException as e: logger.warning(f"Could not retrieve pod logs due to: {e.body}") @@ -259,7 +310,7 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): "kind": "ConfigMap", "apiVersion": "v1", "metadata": { - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), "labels": {**labels, **self.batch_engine_config.labels}, }, "data": { @@ -272,7 +323,10 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace): body=configmap_manifest, ) - def _create_job_definition(self, job_id, namespace, pods, env): + def _configmap_name(self, job_id): + return f"feast-{job_id}" + + def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0): """Create a kubernetes job definition.""" job_env = [ {"name": "RUST_BACKTRACE", "value": "full"}, @@ -375,7 +429,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): }, { "mountPath": "/var/feast/", - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, ], } @@ -406,7 +460,7 @@ def _create_job_definition(self, job_id, namespace, pods, env): {"mountPath": "/etc/bytewax", "name": "hostfile"}, { "mountPath": "/var/feast/", - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, ], } @@ -416,13 +470,13 @@ def _create_job_definition(self, job_id, namespace, pods, env): { "configMap": { "defaultMode": 420, - "name": f"feast-{job_id}", + "name": self._configmap_name(job_id), }, "name": "python-files", }, { - "configMap": {"name": f"feast-{job_id}"}, - "name": f"feast-{job_id}", + "configMap": {"name": self._configmap_name(job_id)}, + "name": self._configmap_name(job_id), }, ], }, diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py index 1013ee0014..23cdc20ef3 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/dataflow.py @@ -1,4 +1,5 @@ import os + import yaml from feast import FeatureStore, RepoConfig @@ -20,5 +21,5 @@ config, store.get_feature_view(bytewax_config["feature_view"]), bytewax_config["paths"], - int(os.environ["JOB_COMPLETION_INDEX"]) + int(os.environ["JOB_COMPLETION_INDEX"]), ) From cbda2bfbefb5b02a39f1340ee3629d77b1c2b315 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Thu, 12 Oct 2023 11:31:24 -0500 Subject: [PATCH 07/10] master Set job_batch_size at least equal to max_parallelism Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 2f9e68381e..fda8453d68 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -200,6 +200,11 @@ def _materialize_one( batch_size = self.batch_engine_config.job_batch_size if batch_size < 1: raise ValueError("job_batch_size must be a value greater than 0") + if batch_size < self.batch_engine_config.max_parallelism: + logger.warning( + "job_batch_size is less than max_parallelism. Setting job_batch_size = max_parallelism" + ) + batch_size = self.batch_engine_config.max_parallelism while True: next_offset = min(offset + batch_size, total_pods) From c35b0c144d3902e9b445a8e5e5145803e878466f Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Thu, 12 Oct 2023 11:41:31 -0500 Subject: [PATCH 08/10] master clarity max_parallelism description Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index fda8453d68..29916b47db 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -71,7 +71,7 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): """ (optional) additional labels to append to kubernetes objects """ max_parallelism: int = 10 - """ (optional) Maximum number of pods allowed to run in parallel per job""" + """ (optional) Maximum number of pods allowed to run in parallel""" synchronous: bool = False """ (optional) If true, wait for materialization for one feature to complete before moving to the next """ From 5a8018d40dca3f66e1cadc7650614e0a43db8e97 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Tue, 24 Oct 2023 20:15:07 -0500 Subject: [PATCH 09/10] master resolve bug that causes materialization to continue after job error Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 29916b47db..5f85a1f001 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -216,7 +216,10 @@ def _materialize_one( total_pods, ) offset += batch_size - if offset >= total_pods: + if ( + offset >= total_pods + or job.status() == MaterializationJobStatus.ERROR + ): break else: job_id = str(uuid.uuid4()) From 938b9030e3e78078e587c0261cde01600a146495 Mon Sep 17 00:00:00 2001 From: James Crabtree Date: Tue, 24 Oct 2023 20:21:09 -0500 Subject: [PATCH 10/10] master resolve bug causing pod logs to not be printed Signed-off-by: James Crabtree --- .../contrib/bytewax/bytewax_materialization_engine.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 5f85a1f001..1c9dc6a6be 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -248,9 +248,6 @@ def _await_path_materialization( f"(of {total_pods}) complete with status {job.status()}" ) except BaseException as e: - if self.batch_engine_config.print_pod_logs_on_failure: - self._print_pod_logs(job.job_id(), feature_view, batch_start) - logger.info(f"Deleting job {job.job_id()}") try: self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace) @@ -268,6 +265,12 @@ def _await_path_materialization( f"Could not delete configmap due to API Error: {ae.body}" ) + if ( + job.status() == MaterializationJobStatus.ERROR + and self.batch_engine_config.print_pod_logs_on_failure + ): + self._print_pod_logs(job.job_id(), feature_view, batch_start) + return job def _print_pod_logs(self, job_id, feature_view, offset=0):