Update job config, add logging, improve materialization memory usage #18

Merged 2 commits on Nov 13, 2023
@@ -8,7 +8,6 @@
 from bytewax.execution import cluster_main
 from bytewax.inputs import ManualInputConfig
 from bytewax.outputs import ManualOutputConfig
-from tqdm import tqdm
 
 from feast import FeatureStore, FeatureView, RepoConfig
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
@@ -31,14 +30,18 @@ def __init__(
         self.feature_view = feature_view
         self.worker_index = worker_index
         self.paths = paths
+        self.mini_batch_size = int(
+            os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
+        )
 
         self._run_dataflow()
 
     def process_path(self, path):
+        logger.info("Processing path %s", path)
         dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
         batches = []
         for fragment in dataset.fragments:
-            for batch in fragment.to_table().to_batches():
+            for batch in fragment.to_table().to_batches(max_chunksize=self.mini_batch_size):
                 batches.append(batch)
 
         return batches
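Note: the memory improvement in this hunk rests on pyarrow's Table.to_batches(max_chunksize=...), which caps the row count of every RecordBatch it yields. A minimal sketch of the behavior, using a hypothetical table rather than the PR's data:

    import pyarrow as pa

    table = pa.table({"driver_id": list(range(10_000)), "trips": [0] * 10_000})

    # Default: the table may come back as a single large batch.
    print([b.num_rows for b in table.to_batches()])  # e.g. [10000]

    # With max_chunksize, no batch exceeds the configured mini-batch size.
    print([b.num_rows for b in table.to_batches(max_chunksize=1_000)])  # [1000] * 10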
@@ -47,13 +50,8 @@ def input_builder(self, worker_index, worker_count, _state):
         return [(None, self.paths[self.worker_index])]
 
     def output_builder(self, worker_index, worker_count):
-        def yield_batch(iterable, batch_size):
-            """Yield mini-batches from an iterable."""
-            for i in range(0, len(iterable), batch_size):
-                yield iterable[i : i + batch_size]
-
-        def output_fn(batch):
-            table = pa.Table.from_batches([batch])
+        def output_fn(mini_batch):
+            table = pa.Table.from_batches([mini_batch])
 
             if self.feature_view.batch_source.field_mapping is not None:
                 table = _run_pyarrow_field_mapping(
@@ -68,19 +66,13 @@ def output_fn(batch):
             rows_to_write = _convert_arrow_to_proto(
                 table, self.feature_view, join_key_to_value_type
             )
-            provider = self.feature_store._get_provider()
-            with tqdm(total=len(rows_to_write)) as progress:
-                # break rows_to_write to mini-batches
-                batch_size = int(
-                    os.getenv("BYTEWAX_MINI_BATCH_SIZE", DEFAULT_BATCH_SIZE)
-                )
-                for mini_batch in yield_batch(rows_to_write, batch_size):
-                    provider.online_write_batch(
-                        config=self.config,
-                        table=self.feature_view,
-                        data=mini_batch,
-                        progress=progress.update,
-                    )
+
+            self.feature_store._get_provider().online_write_batch(
+                config=self.config,
+                table=self.feature_view,
+                data=rows_to_write,
+                progress=None,
+            )
 
         return output_fn
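Taken together, the hunks in this file move batching from write time to read time: previously a whole fragment was converted to protos before being sliced into mini-batches, so peak memory held every converted row at once; now each output_fn call receives a pre-sliced mini-batch and converts only that. A rough sketch of the difference, with hypothetical to_proto/write stand-ins rather than Feast's real helpers:

    def to_proto(row):   # stand-in for _convert_arrow_to_proto
        return {"row": row}

    def write(batch):    # stand-in for provider.online_write_batch
        pass

    def old_flow(rows, batch_size):
        protos = [to_proto(r) for r in rows]        # peak memory: every row at once
        for i in range(0, len(protos), batch_size):
            write(protos[i : i + batch_size])

    def new_flow(mini_batches):                     # batches already capped by max_chunksize
        for mini_batch in mini_batches:
            write([to_proto(r) for r in mini_batch])  # peak memory: one mini-batch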
@@ -28,7 +28,6 @@
 from feast.repo_config import FeastConfigBaseModel
 from feast.stream_feature_view import StreamFeatureView
 from feast.utils import _get_column_names, get_default_yaml_file_path
-
 from .bytewax_materialization_job import BytewaxMaterializationJob
 
 logger = logging.getLogger(__name__)
@@ -235,35 +234,30 @@ def _await_path_materialization(
                 MaterializationJobStatus.WAITING,
                 MaterializationJobStatus.RUNNING,
             ):
-                logger.info(
-                    f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                    f"(of {total_pods}) running..."
-                )
+                logger.info("%s materialization for pods %d-%d (of %d) running...",
+                    feature_view.name, batch_start, batch_end, total_pods)
                 sleep(30)
-            logger.info(
-                f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
-                f"(of {total_pods}) complete with status {job.status()}"
-            )
+
+            logger.info("%s materialization for pods %d-%d (of %d) complete with status %s",
+                feature_view.name, batch_start, batch_end, total_pods, job.status())
         except BaseException as e:
             if self.batch_engine_config.print_pod_logs_on_failure:
                 self._print_pod_logs(job.job_id(), feature_view, batch_start)
 
-            logger.info(f"Deleting job {job.job_id()}")
+            logger.info("Deleting job %s", job.job_id())
             try:
                 self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
             except ApiException as ae:
-                logger.warning(f"Could not delete job due to API Error: {ae.body}")
+                logger.warning("Could not delete job due to API Error: %s", ae.body)
             raise e
         finally:
-            logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
+            logger.info("Deleting configmap %s", self._configmap_name(job_id))
             try:
                 self.v1.delete_namespaced_config_map(
                     self._configmap_name(job_id), self.namespace
                 )
             except ApiException as ae:
-                logger.warning(
-                    f"Could not delete configmap due to API Error: {ae.body}"
-                )
+                logger.warning("Could not delete configmap due to API Error: %s", ae.body)
 
         return job

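The f-string to %-style conversions above are not only stylistic: the logging module formats %-style arguments lazily, only when a handler will actually emit the record, whereas an f-string is built on every call. A small demonstration with a hypothetical Expensive class:

    import logging

    logging.basicConfig(level=logging.WARNING)
    logger = logging.getLogger(__name__)

    class Expensive:
        def __str__(self):
            print("expensive __str__ ran")
            return "value"

    logger.info(f"status: {Expensive()}")   # __str__ runs even though INFO is dropped
    logger.info("status: %s", Expensive())  # __str__ never runs: record is discarded first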
@@ -273,13 +267,13 @@ def _print_pod_logs(self, job_id, feature_view, offset=0):
             label_selector=f"job-name={job_id}",
         ).items
         for i, pod in enumerate(pods_list):
-            logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
+            logger.info("Logging output for %s pod %d", feature_view.name, offset + i)
             try:
                 logger.info(
                     self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
                 )
             except ApiException as e:
-                logger.warning(f"Could not retrieve pod logs due to: {e.body}")
+                logger.warning("Could not retrieve pod logs due to: %s", e.body)
 
     def _create_kubernetes_job(self, job_id, paths, feature_view):
         try:
@@ -293,6 +287,8 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
                 len(paths),  # Create a pod for each parquet file
                 self.batch_engine_config.env,
             )
+
+            logger.info("Created job `dataflow-%s` in namespace `%s`", job_id, self.namespace)
         except FailToCreateError as failures:
             return BytewaxMaterializationJob(job_id, self.namespace, error=failures)

@@ -355,7 +351,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
                     },
                     {
                         "name": "BYTEWAX_REPLICAS",
-                        "value": f"{pods}",
+                        "value": "1",
                     },
                     {
                         "name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
@@ -416,7 +412,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
                             }
                         ],
                         "image": "busybox",
-                        "imagePullPolicy": "Always",
+                        "imagePullPolicy": "IfNotPresent",
                         "name": "init-hostfile",
                         "resources": {},
                         "securityContext": {
@@ -444,7 +440,7 @@ def _create_job_definition(self, job_id, namespace, pods, env, index_offset=0):
"command": ["sh", "-c", "sh ./entrypoint.sh"],
"env": job_env,
"image": self.batch_engine_config.image,
"imagePullPolicy": "Always",
"imagePullPolicy": "ifNotPresent",
"name": "process",
"ports": [
{
@@ -35,13 +35,16 @@ def status(self):
         if job_status.active is not None:
             if job_status.completion_time is None:
                 return MaterializationJobStatus.RUNNING
-        elif job_status.failed is not None:
-            self._error = Exception(f"Job {self.job_id()} failed")
-            return MaterializationJobStatus.ERROR
-        elif job_status.active is None:
-            if job_status.completion_time is not None:
-                if job_status.conditions[0].type == "Complete":
-                    return MaterializationJobStatus.SUCCEEDED
+        else:
+            if job_status.completion_time is not None and job_status.conditions[0].type == "Complete":
+                return MaterializationJobStatus.SUCCEEDED
+
+            if job_status.conditions is not None and job_status.conditions[0].type == "Failed":
+                self._error = Exception(
+                    f"Job {self.job_id()} failed with reason: "
+                    f"{job_status.conditions[0].message}"
+                )
+                return MaterializationJobStatus.ERROR
         return MaterializationJobStatus.WAITING
 
     def should_be_retried(self):
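For reference, the rewritten status() keys off fields of the Kubernetes Python client's V1JobStatus: active for running pods, completion_time plus a Complete condition for success, and a Failed condition (whose message now surfaces in the stored error) for failures. A sketch of the two terminal shapes it distinguishes, with made-up values:

    from datetime import datetime, timezone
    from kubernetes.client import V1JobCondition, V1JobStatus

    succeeded = V1JobStatus(
        active=None,
        completion_time=datetime(2023, 11, 13, tzinfo=timezone.utc),
        conditions=[V1JobCondition(type="Complete", status="True")],
    )
    failed = V1JobStatus(
        active=None,
        completion_time=None,
        conditions=[V1JobCondition(type="Failed", status="True",
                                   message="Job has reached the specified backoff limit")],
    )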
@@ -1,3 +1,4 @@
+import logging
 import os
 
 import yaml
@@ -8,6 +9,7 @@
 )
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
     with open("/var/feast/feature_store.yaml") as f:
         feast_config = yaml.safe_load(f)
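Without this basicConfig call in the pod entrypoint, the logger.info(...) calls added throughout this PR would be invisible: Python's root logger defaults to the WARNING level with only a last-resort handler. A quick illustration (hypothetical logger name):

    import logging

    logger = logging.getLogger("bytewax_materialization")
    logger.info("dropped")    # no output: root level is WARNING

    logging.basicConfig(level=logging.INFO)
    logger.info("emitted")    # INFO:bytewax_materialization:emitted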