feat: Initial Bytewax materialization engine (#2974)
* feat: Initial Bytewax materialization engine

Signed-off-by: Dan Herrera <[email protected]>

* Respond to PR feedback

- Add integration test, by factoring out shared consistency test.
- Make the number of Pods dynamic, based on the number of .parquet
  file paths.
- Add instructions for creating a bytewax test cluster for
  integration testing.

Signed-off-by: Dan Herrera <[email protected]>

* Mark bytewax test to be skipped.

Signed-off-by: Dan Herrera <[email protected]>

* Remove unused offline store reference.

Signed-off-by: Dan Herrera <[email protected]>

Signed-off-by: Dan Herrera <[email protected]>
whoahbot authored Aug 15, 2022
1 parent 41851be commit 55c61f9
Showing 16 changed files with 784 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/reference/batch-materialization/README.md
@@ -0,0 +1,5 @@
# Batch materialization

Please see [Batch Materialization Engine](../../getting-started/architecture-and-components/batch-materialization-engine.md) for an explanation of batch materialization engines.

{% page-ref page="bytewax.md" %}
59 changes: 59 additions & 0 deletions docs/reference/batch-materialization/bytewax.md
@@ -0,0 +1,59 @@
# Bytewax

## Description

The [Bytewax](https://bytewax.io) batch materialization engine provides an execution
engine for batch materialization operations (`materialize` and `materialize-incremental`).

### Guide

To use the Bytewax materialization engine, you will need a [Kubernetes](https://kubernetes.io/) cluster running version 1.22.10 or later.

#### Kubernetes Authentication

The Bytewax materialization engine loads authentication and cluster information from the [kubeconfig file](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/). By default, `kubectl` looks for a file named `config` in the `$HOME/.kube` directory. You can specify other kubeconfig files by setting the `KUBECONFIG` environment variable.
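For example, to point the engine at a cluster-specific kubeconfig for the current shell session (the file path here is purely illustrative):

``` shell
# Use a non-default kubeconfig for this session; the path is illustrative.
export KUBECONFIG="$HOME/.kube/feast-cluster"
```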

#### Resource Authentication

Bytewax jobs can be configured to read [Kubernetes secrets](https://kubernetes.io/docs/concepts/configuration/secret/) as environment variables, allowing them to access online and offline stores during job runs.

To configure secrets, first create them using `kubectl`:

``` shell
kubectl create secret generic -n bytewax aws-credentials \
  --from-literal=aws-access-key-id='<access key id>' \
  --from-literal=aws-secret-access-key='<secret access key>'
```

Then configure them in the `batch_engine` section of `feature_store.yaml`:

``` yaml
batch_engine:
  type: bytewax
  namespace: bytewax
  env:
    - name: AWS_ACCESS_KEY_ID
      valueFrom:
        secretKeyRef:
          name: aws-credentials
          key: aws-access-key-id
    - name: AWS_SECRET_ACCESS_KEY
      valueFrom:
        secretKeyRef:
          name: aws-credentials
          key: aws-secret-access-key
```

#### Configuration

The Bytewax materialization engine is configured through the `feature_store.yaml` configuration file:

``` yaml
batch_engine:
  type: bytewax
  namespace: bytewax
  image: bytewax/bytewax-feast:latest
```

The `namespace` configuration directive specifies the Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) in which jobs, services, and configuration maps will be created.

The `image` parameter specifies which container image to use when running the materialization job. To create a custom image based on this container, please see the [GitHub repository](https://github.com/bytewax/bytewax-feast) for this image.
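As a sketch of what such a customization might look like (assumptions: the base tag matches the `image` value above, and the added dependency is purely illustrative):

``` dockerfile
# Hypothetical custom image: start from the published engine image and
# layer in an extra Python dependency needed by your online store.
FROM bytewax/bytewax-feast:latest
RUN pip install --no-cache-dir redis
```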

10 changes: 9 additions & 1 deletion sdk/python/docs/index.rst
@@ -310,6 +310,14 @@ Local Engine
(Alpha) Lambda Based Engine
---------------------------

.. autoclass:: feast.infra.materialization.lambda.lambda_engine
.. automodule:: feast.infra.materialization.lambda.lambda_engine
   :members:
   :noindex:


Bytewax Engine
---------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :noindex:
@@ -0,0 +1,29 @@
feast.infra.materialization.contrib.bytewax package
===================================================

Submodules
----------

feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_engine
--------------------------------------------------------------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine
   :members:
   :undoc-members:
   :show-inheritance:

feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_job
--------------------------------------------------------------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_job
   :members:
   :undoc-members:
   :show-inheritance:

Module contents
---------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :undoc-members:
   :show-inheritance:
10 changes: 10 additions & 0 deletions sdk/python/docs/source/feast.infra.materialization.contrib.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
feast.infra.materialization.contrib package
===========================================

Subpackages
-----------

.. toctree::
   :maxdepth: 4

   feast.infra.materialization.contrib.bytewax
8 changes: 8 additions & 0 deletions sdk/python/docs/source/index.rst
@@ -313,3 +313,11 @@ Local Engine
.. autoclass:: feast.infra.materialization.lambda.lambda_engine
   :members:
   :noindex:


Bytewax Engine
---------------------------

.. automodule:: feast.infra.materialization.contrib.bytewax
   :members:
   :noindex:
15 changes: 15 additions & 0 deletions sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py
@@ -0,0 +1,15 @@
from .bytewax_materialization_dataflow import BytewaxMaterializationDataflow
from .bytewax_materialization_engine import (
    BytewaxMaterializationEngine,
    BytewaxMaterializationEngineConfig,
)
from .bytewax_materialization_job import BytewaxMaterializationJob
from .bytewax_materialization_task import BytewaxMaterializationTask

__all__ = [
    "BytewaxMaterializationTask",
    "BytewaxMaterializationJob",
    "BytewaxMaterializationDataflow",
    "BytewaxMaterializationEngine",
    "BytewaxMaterializationEngineConfig",
]
@@ -0,0 +1,89 @@
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax import Dataflow, cluster_main  # type: ignore
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
from bytewax.parse import proc_env
from tqdm import tqdm

from feast import FeatureStore, FeatureView, RepoConfig
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping


class BytewaxMaterializationDataflow:
    def __init__(
        self,
        config: RepoConfig,
        feature_view: FeatureView,
        paths: List[str],
    ):
        self.config = config
        self.feature_store = FeatureStore(config=config)

        self.feature_view = feature_view
        self.paths = paths

        self._run_dataflow()

    def process_path(self, path):
        fs = s3fs.S3FileSystem()
        dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False)
        batches = []
        for fragment in dataset.fragments:
            for batch in fragment.to_table().to_batches():
                batches.append(batch)

        return batches

    def input_builder(self, worker_index, worker_count, resume_epoch):
        worker_paths = distribute(self.paths, worker_index, worker_count)
        epoch = 0
        for path in worker_paths:
            yield AdvanceTo(epoch)
            yield Emit(path)
            epoch += 1

        return

    def output_builder(self, worker_index, worker_count):
        def output_fn(epoch_batch):
            _, batch = epoch_batch

            table = pa.Table.from_batches([batch])

            if self.feature_view.batch_source.field_mapping is not None:
                table = _run_pyarrow_field_mapping(
                    table, self.feature_view.batch_source.field_mapping
                )

            join_key_to_value_type = {
                entity.name: entity.dtype.to_value_type()
                for entity in self.feature_view.entity_columns
            }

            rows_to_write = _convert_arrow_to_proto(
                table, self.feature_view, join_key_to_value_type
            )
            provider = self.feature_store._get_provider()
            with tqdm(total=len(rows_to_write)) as progress:
                provider.online_write_batch(
                    config=self.config,
                    table=self.feature_view,
                    data=rows_to_write,
                    progress=progress.update,
                )

        return output_fn

    def _run_dataflow(self):
        flow = Dataflow()
        flow.flat_map(self.process_path)
        flow.capture()
        cluster_main(
            flow,
            ManualInputConfig(self.input_builder),
            self.output_builder,
            **proc_env(),
        )
