Update job config, add logging, improve materialization memory usage (#18)

* Update job config, add logging, improve materialization memory usage

* Use lazy formatting for logging
mjurkus authored Nov 13, 2023
1 parent e735eb7 commit 0ae8b14
Showing 4 changed files with 42 additions and 49 deletions.
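The "lazy formatting" bullet refers to passing arguments to the logging call instead of pre-building an f-string: the %-style template is interpolated only if the record actually passes the level check. A minimal sketch of the difference (the path value is made up):

import logging

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

path = "part-0000.parquet"  # hypothetical value

# Eager: the f-string is built even though INFO is filtered out here.
logger.info(f"Processing path {path}")

# Lazy: logging interpolates %s only when the record is emitted, so
# suppressed messages cost almost nothing.
logger.info("Processing path %s", path)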
File 1 of 4
@@ -8,7 +8,6 @@
 from bytewax.execution import cluster_main
 from bytewax.inputs import ManualInputConfig
 from bytewax.outputs import ManualOutputConfig
-from tqdm import tqdm
 
 from feast import FeatureStore, FeatureView, RepoConfig
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
@@ -31,14 +30,18 @@ def __init__(
         self.feature_view = feature_view
         self.worker_index = worker_index
         self.paths = paths
+        self.mini_batch_size = int(
+            os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
+        )
 
         self._run_dataflow()
 
     def process_path(self, path):
+        logger.info("Processing path %s", path)
         dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
         batches = []
         for fragment in dataset.fragments:
-            for batch in fragment.to_table().to_batches():
+            for batch in fragment.to_table().to_batches(max_chunksize=self.mini_batch_size):
                 batches.append(batch)
 
         return batches
@@ -47,13 +50,8 @@ def input_builder(self, worker_index, worker_count, _state):
         return [(None, self.paths[self.worker_index])]
 
     def output_builder(self, worker_index, worker_count):
-        def yield_batch(iterable, batch_size):
-            """Yield mini-batches from an iterable."""
-            for i in range(0, len(iterable), batch_size):
-                yield iterable[i : i + batch_size]
-
-        def output_fn(batch):
-            table = pa.Table.from_batches([batch])
+        def output_fn(mini_batch):
+            table = pa.Table.from_batches([mini_batch])
 
             if self.feature_view.batch_source.field_mapping is not None:
                 table = _run_pyarrow_field_mapping(
@@ -68,19 +66,13 @@ def output_fn(batch):
             rows_to_write = _convert_arrow_to_proto(
                 table, self.feature_view, join_key_to_value_type
             )
-            provider = self.feature_store._get_provider()
-            with tqdm(total=len(rows_to_write)) as progress:
-                # break rows_to_write to mini-batches
-                batch_size = int(
-                    os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
-                )
-                for mini_batch in yield_batch(rows_to_write, batch_size):
-                    provider.online_write_batch(
-                        config=self.config,
-                        table=self.feature_view,
-                        data=mini_batch,
-                        progress=progress.update,
-                    )
+
+            self.feature_store._get_provider().online_write_batch(
+                config=self.config,
+                table=self.feature_view,
+                data=rows_to_write,
+                progress=None,
+            )
 
         return output_fn
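For context on the memory change in this file: pyarrow's Table.to_batches(max_chunksize=...) caps the rows per RecordBatch, so output_fn now converts and writes one small slice at a time instead of chunking a fully materialized rows_to_write list after the fact. A minimal sketch under assumed values (the file name and DEFAULT_BATCH_SIZE stand-in are made up):

import os

import pyarrow.parquet as pq

DEFAULT_BATCH_SIZE = 10_000  # assumption; stands in for the module constant

mini_batch_size = int(os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE))

dataset = pq.ParquetDataset("data/example.parquet", use_legacy_dataset=False)
for fragment in dataset.fragments:
    # max_chunksize bounds the rows per RecordBatch, so downstream proto
    # conversion and online writes touch one small slice at a time.
    for batch in fragment.to_table().to_batches(max_chunksize=mini_batch_size):
        assert batch.num_rows <= mini_batch_size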
File 2 of 4
@@ -28,7 +28,6 @@
 from feast.repo_config import FeastConfigBaseModel
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _get_column_names, get_default_yaml_file_path
-
 from .bytewax_materialization_job import BytewaxMaterializationJob
 
 logger = logging.getLogger(__name__)
@@ -235,35 +234,30 @@ def _await_path_materialization(
                 MaterializationJobStatus.WAITING,
                 MaterializationJobStatus.RUNNING,
             ):
-                logger.info(
-                    f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                    f"(of {total_pods}) running..."
-                )
+                logger.info("%s materialization for pods %d-%d (of %d) running...",
+                            feature_view.name, batch_start, batch_end, total_pods)
                 sleep(30)
-            logger.info(
-                f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                f"(of {total_pods}) complete with status {job.status()}"
-            )
+
+            logger.info("%s materialization for pods %d-%d (of %d) complete with status %s",
+                        feature_view.name, batch_start, batch_end, total_pods, job.status())
         except BaseException as e:
             if self.batch_engine_config.print_pod_logs_on_failure:
                 self._print_pod_logs(job.job_id(), feature_view, batch_start)
 
-            logger.info(f"Deleting job {job.job_id()}")
+            logger.info("Deleting job %s", job.job_id())
             try:
                 self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
             except ApiException as ae:
-                logger.warning(f"Could not delete job due to API Error: {ae.body}")
+                logger.warning("Could not delete job due to API Error: %s", ae.body)
             raise e
         finally:
-            logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
+            logger.info("Deleting configmap %s", self._configmap_name(job_id))
             try:
                 self.v1.delete_namespaced_config_map(
                     self._configmap_name(job_id), self.namespace
                 )
             except ApiException as ae:
-                logger.warning(
-                    f"Could not delete configmap due to API Error: {ae.body}"
-                )
+                logger.warning("Could not delete configmap due to API Error: %s", ae.body)
 
         return job
@@ -273,13 +267,13 @@ def _print_pod_logs(self, job_id, feature_view, offset=0):
             label_selector=f"job-name={job_id}",
         ).items
         for i, pod in enumerate(pods_list):
-            logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
+            logger.info("Logging output for %s pod %d", feature_view.name, offset + i)
             try:
                 logger.info(
                     self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
                 )
             except ApiException as e:
-                logger.warning(f"Could not retrieve pod logs due to: {e.body}")
+                logger.warning("Could not retrieve pod logs due to: %s", e.body)
 
     def _create_kubernetes_job(self, job_id, paths, feature_view):
         try:
@@ -293,6 +287,8 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
                 len(paths),  # Create a pod for each parquet file
                 self.batch_engine_config.env,
             )
+
+            logger.info("Created job `dataflow-%s` in namespace `%s`", job_id, self.namespace)
         except FailToCreateError as failures:
             return BytewaxMaterializationJob(job_id, self.namespace, error=failures)
 
@@ -355,7 +351,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
                 },
                 {
                     "name": "BYTEWAX_REPLICAS",
-                    "value": f"{pods}",
+                    "value": "1",
                 },
                 {
                     "name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
@@ -416,7 +412,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
                             }
                         ],
                         "image": "busybox",
-                        "imagePullPolicy": "Always",
+                        "imagePullPolicy": "ifNotPresent",
                         "name": "init-hostfile",
                         "resources": {},
                         "securityContext": {
@@ -444,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
                         "command": ["sh", "-c", "sh ./entrypoint.sh"],
                         "env": job_env,
                         "image": self.batch_engine_config.image,
-                        "imagePullPolicy": "Always",
+                        "imagePullPolicy": "ifNotPresent",
                         "name": "process",
                         "ports": [
                             {
File 3 of 4
@@ -35,13 +35,16 @@ def status(self):
         if job_status.active is not None:
             if job_status.completion_time is None:
                 return MaterializationJobStatus.RUNNING
-        elif job_status.failed is not None:
-            self._error = Exception(f"Job {self.job_id()} failed")
-            return MaterializationJobStatus.ERROR
-        elif job_status.active is None:
-            if job_status.completion_time is not None:
-                if job_status.conditions[0].type == "Complete":
-                    return MaterializationJobStatus.SUCCEEDED
+        else:
+            if job_status.completion_time is not None and job_status.conditions[0].type == "Complete":
+                return MaterializationJobStatus.SUCCEEDED
+
+            if job_status.conditions is not None and job_status.conditions[0].type == "Failed":
+                self._error = Exception(
+                    f"Job {self.job_id()} failed with reason: "
+                    f"{job_status.conditions[0].message}"
+                )
+                return MaterializationJobStatus.ERROR
         return MaterializationJobStatus.WAITING
 
     def should_be_retried(self):
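The reworked status() keys off the job's conditions list instead of the bare failed counter, which also lets the surfaced error carry the condition's message. A standalone sketch of the same decision table against the kubernetes client's V1JobStatus (sample values are made up):

from kubernetes.client import V1JobCondition, V1JobStatus

def classify(job_status: V1JobStatus) -> str:
    # Mirrors the branching in the diff, reduced to plain strings.
    if job_status.active is not None:
        if job_status.completion_time is None:
            return "RUNNING"
    else:
        if (
            job_status.completion_time is not None
            and job_status.conditions[0].type == "Complete"
        ):
            return "SUCCEEDED"
        if job_status.conditions is not None and job_status.conditions[0].type == "Failed":
            return f"ERROR: {job_status.conditions[0].message}"
    return "WAITING"

# A failed job typically has no completion_time but a Failed condition:
failed = V1JobStatus(
    conditions=[
        V1JobCondition(type="Failed", status="True", message="BackoffLimitExceeded")
    ]
)
print(classify(failed))  # ERROR: BackoffLimitExceeded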
File 4 of 4
@@ -1,3 +1,4 @@
+import logging
 import os
 
 import yaml
@@ -8,6 +9,7 @@
 )
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
     with open("/var/feast/feature_store.yaml") as f:
         feast_config = yaml.safe_load(f)
 
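The basicConfig call matters because, with no handler configured, the root logger's last-resort handler drops records below WARNING, so none of the logger.info calls added in this commit would reach the pod's stdout. A minimal sketch of the same pattern (module layout is illustrative):

import logging
import os

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    # Install a stream handler at INFO before the dataflow starts, so the
    # lazy logger.info calls elsewhere in the job become visible.
    logging.basicConfig(level=logging.INFO)
    mini_batch = int(os.getenv("BYTEWAX_MINI_BATCH_SIZE", 1000))
    logger.info("Starting dataflow with mini-batch size %d", mini_batch)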
