
feat: Add an experimental lambda-based materialization engine #2923

Merged · 10 commits · Jul 14, 2022
12 changes: 10 additions & 2 deletions sdk/python/feast/infra/aws.py
@@ -106,6 +106,15 @@ def update_infra(

self._deploy_feature_server(project, image_uri)

if self.batch_engine:
self.batch_engine.update(
project,
tables_to_delete,
tables_to_keep,
entities_to_delete,
entities_to_keep,
)

def _deploy_feature_server(self, project: str, image_uri: str):
_logger.info("Deploying feature server...")

@@ -198,8 +207,7 @@ def _deploy_feature_server(self, project: str, image_uri: str):
def teardown_infra(
self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity],
) -> None:
if self.online_store:
self.online_store.teardown(self.repo_config, tables, entities)
super(AwsProvider, self).teardown_infra(project, tables, entities)

if (
self.repo_config.feature_server is not None
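For context, update_infra runs during `feast apply`, so a repo configured with a batch_engine now has its materialization lambda provisioned alongside the online store tables. A minimal sketch of the trigger path, assuming a feature_store.yaml with the lambda engine is present in the working directory (the entity definition is hypothetical):

from feast import Entity, FeatureStore

store = FeatureStore(repo_path=".")
driver = Entity(name="driver")  # hypothetical entity definition
# apply() drives AwsProvider.update_infra, which in turn calls
# batch_engine.update() to create the materialization lambda.
store.apply([driver])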
25 changes: 25 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/Dockerfile
@@ -0,0 +1,25 @@
FROM public.ecr.aws/lambda/python:3.9

RUN yum install -y git


# Copy app handler code
COPY sdk/python/feast/infra/materialization/lambda/app.py ${LAMBDA_TASK_ROOT}

# Copy necessary parts of the Feast codebase
COPY sdk/python sdk/python
COPY protos protos
COPY go go
COPY setup.py setup.py
COPY pyproject.toml pyproject.toml
COPY README.md README.md

# Install Feast for AWS with Lambda dependencies
# The bind mount gives setuptools_scm access to the .git directory so it can
# infer the version of Feast being installed.
# https://github.com/pypa/setuptools_scm#usage-from-docker
# Note that this also assumes the image is built from the repository root.
RUN --mount=source=.git,target=.git,type=bind pip3 install --no-cache-dir -e '.[aws,redis]'

# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]
11 changes: 11 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/__init__.py
@@ -0,0 +1,11 @@
from .lambda_engine import (
LambdaMaterializationEngine,
LambdaMaterializationEngineConfig,
LambdaMaterializationJob,
)

__all__ = [
"LambdaMaterializationEngineConfig",
"LambdaMaterializationJob",
"LambdaMaterializationEngine",
]
82 changes: 82 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/app.py
@@ -0,0 +1,82 @@
import base64
import json
import sys
import tempfile
import traceback
from pathlib import Path

import pyarrow.parquet as pq

from feast import FeatureStore
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE
from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping


def handler(event, context):
"""Provide an event that contains the following keys:

- operation: one of the operations in the operations dict below
- tableName: required for operations that interact with DynamoDB
- payload: a parameter to pass to the operation being performed
"""
print("Received event: " + json.dumps(event, indent=2), flush=True)

try:

config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]

config_bytes = base64.b64decode(config_base64)

# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())

with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)

# Initialize the feature store
store = FeatureStore(repo_path=str(repo_path.resolve()))

view_name = event["view_name"]
view_type = event["view_type"]
path = event["path"]

bucket = path[len("s3://") :].split("/", 1)[0]
achals marked this conversation as resolved.
Show resolved Hide resolved
key = path[len("s3://") :].split("/", 1)[1]
print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True)

if view_type == "batch":
# TODO: This probably needs to become `store.get_batch_feature_view` at some point.
feature_view = store.get_feature_view(view_name)
else:
feature_view = store.get_stream_feature_view(view_name)

print(f"Got Feature View: `{feature_view}`", flush=True)

table = pq.read_table(path)
if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

written_rows = 0

for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
store._provider.online_write_batch(
store.config, feature_view, rows_to_write, lambda x: None,
)
written_rows += len(rows_to_write)
return {"written_rows": written_rows}
except Exception as e:
print(f"Exception: {e}", flush=True)
print("Traceback:", flush=True)
print(traceback.format_exc(), flush=True)
sys.exit(1)
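To make the handler's contract concrete, here is a hedged sketch of invoking it locally with the payload shape that LambdaMaterializationEngine._materialize_one sends (the feature view name and S3 path are hypothetical):

import base64

from feast.constants import FEATURE_STORE_YAML_ENV_NAME

# Base64-encode the repo's feature_store.yaml, mirroring what the engine does.
with open("feature_store.yaml", "rb") as f:
    config_b64 = base64.b64encode(f.read()).decode("utf-8")

event = {
    FEATURE_STORE_YAML_ENV_NAME: config_b64,
    "view_name": "driver_hourly_stats",  # hypothetical feature view
    "view_type": "batch",
    "path": "s3://my-bucket/materialization/part-0000.parquet",  # hypothetical
}
# handler(event, context=None) then reads the parquet file and writes its
# rows to the online store, returning {"written_rows": <count>}.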
238 changes: 238 additions & 0 deletions sdk/python/feast/infra/materialization/lambda/lambda_engine.py
@@ -0,0 +1,238 @@
import base64
import json
import logging
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union

import boto3
from pydantic import StrictStr
from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.materialization.batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
from feast.version import get_version

DEFAULT_BATCH_SIZE = 10_000

logger = logging.getLogger(__name__)


class LambdaMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for lambda based engine"""

type: Literal["lambda"] = "lambda"
""" Type selector"""

materialization_image: StrictStr
""" The URI of a container image in the Amazon ECR registry, which should be used for materialization. """

lambda_role: StrictStr
""" Role that should be used by the materialization lambda """


@dataclass
class LambdaMaterializationJob(MaterializationJob):
def __init__(self, job_id: str, status: MaterializationJobStatus) -> None:
super().__init__()
self._job_id: str = job_id
self._status = status
self._error = None

def status(self) -> MaterializationJobStatus:
return self._status

def error(self) -> Optional[BaseException]:
return self._error

def should_be_retried(self) -> bool:
return False

def job_id(self) -> str:
return self._job_id

def url(self) -> Optional[str]:
return None


class LambdaMaterializationEngine(BatchMaterializationEngine):
"""
WARNING: This engine should be considered "Alpha" functionality.
"""

def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
# Set up the lambda function that will run materialization.
r = self.lambda_client.create_function(
FunctionName=self.lambda_name,
PackageType="Image",
Role=self.repo_config.batch_engine.lambda_role,
Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
Timeout=600,
Tags={
"feast-owned": "True",
"project": project,
"feast-sdk-version": get_version(),
},
)
logger.info("Creating lambda function %s, %s", self.lambda_name, r)

logger.info("Waiting for function %s to be active", self.lambda_name)
waiter = self.lambda_client.get_waiter("function_active")
waiter.wait(FunctionName=self.lambda_name)

def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
# Tear down the materialization lambda function.
logger.info("Tearing down lambda %s", self.lambda_name)
r = self.lambda_client.delete_function(FunctionName=self.lambda_name)
logger.info("Finished tearing down lambda %s: %s", self.lambda_name, r)

def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)
repo_path = self.repo_config.repo_path
assert repo_path
feature_store_path = repo_path / "feature_store.yaml"
self.feature_store_base64 = str(
base64.b64encode(bytes(feature_store_path.read_text(), "UTF-8")), "UTF-8"
)

self.lambda_name = f"feast-materialize-{self.repo_config.project}"
if len(self.lambda_name) > 64:
self.lambda_name = self.lambda_name[:64]
self.lambda_client = boto3.client("lambda")

def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[MaterializationJob]:
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]

def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
start_date: datetime,
end_date: datetime,
project: str,
tqdm_builder: Callable[[int], tqdm],
):
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

job_id = f"{feature_view.name}-{start_date}-{end_date}"

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

paths = offline_job.to_remote_storage()
max_workers = min(len(paths), 20)
executor = ThreadPoolExecutor(max_workers=max_workers)
futures = []

for path in paths:
payload = {
FEATURE_STORE_YAML_ENV_NAME: self.feature_store_base64,
"view_name": feature_view.name,
"view_type": "batch",
"path": path,
}
# Invoke a lambda to materialize this file.

logger.info("Invoking materialization for %s", path)
futures.append(
executor.submit(
self.lambda_client.invoke,
FunctionName=self.lambda_name,
InvocationType="RequestResponse",
Payload=json.dumps(payload),
)
)

done, not_done = wait(futures)
logger.info("Done: %s Not Done: %s", done, not_done)
for f in done:
response = f.result()
output = json.loads(response["Payload"].read())

logger.info(
f"Ingested task; request id {response['ResponseMetadata']['RequestId']}, "
f"rows written: {output['written_rows']}"
)

for f in not_done:
    # wait() is called without a timeout, so this branch is not expected to
    # run; log the future itself rather than blocking on result().
    logger.error(f"Ingestion failed: {f}")

return LambdaMaterializationJob(
job_id=job_id,
status=MaterializationJobStatus.SUCCEEDED
if not not_done
else MaterializationJobStatus.ERROR,
)
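To opt in, a repo's feature_store.yaml sets batch_engine to type "lambda" along with the materialization_image and lambda_role fields from LambdaMaterializationEngineConfig; the offline store must also support to_remote_storage(). With that configuration in place, materialization runs the usual way. A hedged usage sketch:

from datetime import datetime, timedelta

from feast import FeatureStore

store = FeatureStore(repo_path=".")
# With batch_engine.type set to "lambda", this fans out one lambda
# invocation per parquet file produced by the offline store.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)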
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/online_stores/dynamodb.py
@@ -229,7 +229,8 @@ def online_read(
break
batch_entity_ids = {
table_instance.name: {
"Keys": [{"entity_id": entity_id} for entity_id in batch]
"Keys": [{"entity_id": entity_id} for entity_id in batch],
"ConsistentRead": True,
}
}
with tracing_span(name="remote_call"):
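For reference, the resulting boto3 request has roughly the following shape (table name and keys are hypothetical). Strongly consistent reads ensure that rows written by a just-completed materialization lambda are immediately visible to online_read:

import boto3

dynamodb = boto3.resource("dynamodb")
response = dynamodb.batch_get_item(
    RequestItems={
        "my_project.driver_hourly_stats": {  # hypothetical table name
            "Keys": [{"entity_id": "key-1"}, {"entity_id": "key-2"}],
            "ConsistentRead": True,
        }
    }
)
items = response["Responses"]["my_project.driver_hourly_stats"]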