Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create & teardown Lambda & API Gateway resources for serverless feature server #1900

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@
# Maximum interval(secs) to wait between retries for retry function
MAX_WAIT_INTERVAL: str = "60"

# Docker image reference that the AWS provider pulls for the Lambda feature server.
# NOTE(review): this name is assigned twice in a row, so only the second value
# ("feastdev/feature-server:aws") takes effect; the first assignment looks like
# the pre-change side of the diff -- confirm and drop it if so.
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server"
AWS_LAMBDA_FEATURE_SERVER_IMAGE = "feastdev/feature-server:aws"

# feature_store.yaml environment variable name for remote feature server
FEATURE_STORE_YAML_ENV_NAME: str = "FEATURE_STORE_YAML_BASE64"

# Environment variable for toggling usage
FEAST_USAGE = "FEAST_USAGE"
15 changes: 15 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,18 @@ def __init__(self, feature_flag_name: str):
f"You are attempting to use an experimental feature that is not enabled. Please run "
f"`feast alpha enable {feature_flag_name}` "
)


class RepoConfigPathDoesNotExist(Exception):
    """Raised when the repo config has no ``repo_path`` to read feature_store.yaml from."""

    def __init__(self) -> None:
        super().__init__("The repo_path attribute does not exist for the repo_config.")


class AwsLambdaDoesNotExist(Exception):
    """Raised when the AWS Lambda function cannot be found right after creating it."""

    def __init__(self) -> None:
        super().__init__("The created AWS Lambda function does not exist.")


class AwsAPIGatewayDoesNotExist(Exception):
    """Raised when creating the AWS API Gateway did not yield an API description."""

    def __init__(self) -> None:
        super().__init__("The created AWS API Gateway does not exist.")
190 changes: 178 additions & 12 deletions sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,194 @@
import base64
import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from typing import Sequence, Union
from urllib.parse import urlparse

from colorama import Fore, Style

import feast
from feast.constants import AWS_LAMBDA_FEATURE_SERVER_IMAGE
from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist
from feast import __version__
from feast.constants import (
AWS_LAMBDA_FEATURE_SERVER_IMAGE,
FEAST_USAGE,
FEATURE_STORE_YAML_ENV_NAME,
)
from feast.entity import Entity
from feast.errors import (
AwsAPIGatewayDoesNotExist,
AwsLambdaDoesNotExist,
RepoConfigPathDoesNotExist,
S3RegistryBucketForbiddenAccess,
S3RegistryBucketNotExist,
)
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.infra.passthrough_provider import PassthroughProvider
from feast.infra.utils import aws_utils
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig

try:
import boto3
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("aws", str(e))


class AwsProvider(PassthroughProvider):
def _upload_docker_image(self) -> None:
def _get_lambda_name(self, project: str) -> str:
    """
    Build the resource name used for the project's feature server on AWS.

    The same name is used for both the Lambda function and the API Gateway.
    The SDK version is embedded with ``+`` and ``.`` replaced by ``_``.

    Args:
        project: Feast project name

    Returns:
        A name of the form ``feast-python-server-<project>-<sanitized version>``.
    """
    sdk_version = __version__.replace("+", "_").replace(".", "_")
    return f"feast-python-server-{project}-{sdk_version}"

def update_infra(
    self,
    project: str,
    tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
    tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
    entities_to_delete: Sequence[Entity],
    entities_to_keep: Sequence[Entity],
    partial: bool,
):
    """
    Update the online store and, when the feature server is enabled, deploy it.

    Deployment uploads the docker image, then creates the AWS Lambda function
    if it does not exist yet (or refreshes its feature_store.yaml environment
    variable if the config changed), and finally creates an HTTP API Gateway
    in front of it if none exists.

    Args:
        project: Feast project name
        tables_to_delete: Tables passed to the online store for deletion
        tables_to_keep: Tables passed to the online store to keep
        entities_to_delete: Entities passed to the online store for deletion
        entities_to_keep: Entities passed to the online store to keep
        partial: Forwarded unchanged to the online store update

    Raises:
        RepoConfigPathDoesNotExist: The repo config has no ``repo_path``.
        AwsLambdaDoesNotExist: The Lambda function is not found after creation.
        AwsAPIGatewayDoesNotExist: ``create_api`` returned nothing.
    """
    self.online_store.update(
        config=self.repo_config,
        tables_to_delete=tables_to_delete,
        tables_to_keep=tables_to_keep,
        entities_to_keep=entities_to_keep,
        entities_to_delete=entities_to_delete,
        partial=partial,
    )

    if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
        image_uri = self._upload_docker_image(project)
        print("Deploying feature server...")

        # Raise explicitly instead of asserting: asserts are stripped under
        # `python -O` and would otherwise mask the dedicated exception.
        if not self.repo_config.repo_path:
            raise RepoConfigPathDoesNotExist()
        with open(self.repo_config.repo_path / "feature_store.yaml", "rb") as f:
            config_bytes = f.read()
        # The config is shipped to the Lambda as a base64-encoded env variable.
        config_base64 = base64.b64encode(config_bytes).decode()

        resource_name = self._get_lambda_name(project)
        lambda_client = boto3.client("lambda")
        api_gateway_client = boto3.client("apigatewayv2")
        function = aws_utils.get_lambda_function(lambda_client, resource_name)

        # Same tags are attached to the Lambda function and the API Gateway.
        resource_tags = {
            "feast-owned": "True",
            "project": project,
            "feast-sdk-version": __version__.replace("+", "_").replace(".", "_"),
        }

        if function is None:
            # If the Lambda function does not exist, create it.
            print(" Creating AWS Lambda...")
            lambda_client.create_function(
                FunctionName=resource_name,
                Role=self.repo_config.feature_server.execution_role_name,
                Code={"ImageUri": image_uri},
                PackageType="Image",
                MemorySize=1769,
                Environment={
                    "Variables": {
                        FEATURE_STORE_YAML_ENV_NAME: config_base64,
                        FEAST_USAGE: "False",
                    }
                },
                Tags=resource_tags,
            )
            # Re-read the function so its ARN is available below.
            function = aws_utils.get_lambda_function(lambda_client, resource_name)
            if not function:
                raise AwsLambdaDoesNotExist()
        else:
            # If the feature_store.yaml has changed, need to update the environment variable.
            env = function.get("Environment", {}).get("Variables", {})
            if env.get(FEATURE_STORE_YAML_ENV_NAME) != config_base64:
                # Note that this does not update Lambda gracefully (e.g. no rolling deployment).
                # It's expected that feature_store.yaml is not regularly updated while the lambda
                # is serving production traffic. However, an update in the registry (e.g. modifying
                # feature views, feature services, and other definitions) does not update the lambda.
                print(" Updating AWS Lambda...")

                # The update replaces the whole variable map, so carry over the
                # existing variables (e.g. FEAST_USAGE set at creation time)
                # instead of sending only the changed one.
                lambda_client.update_function_configuration(
                    FunctionName=resource_name,
                    Environment={
                        "Variables": {
                            **env,
                            FEATURE_STORE_YAML_ENV_NAME: config_base64,
                        }
                    },
                )

        api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
        if not api:
            # If the API Gateway doesn't exist, create it
            print(" Creating AWS API Gateway...")
            api = api_gateway_client.create_api(
                Name=resource_name,
                ProtocolType="HTTP",
                Target=function["FunctionArn"],
                RouteKey="POST /get-online-features",
                Tags=resource_tags,
            )
            if not api:
                raise AwsAPIGatewayDoesNotExist()
            # Make sure to give AWS Lambda a permission to be invoked by the newly created API Gateway
            api_id = api["ApiId"]
            region = lambda_client.meta.region_name
            account_id = aws_utils.get_account_id()
            lambda_client.add_permission(
                FunctionName=function["FunctionArn"],
                StatementId=str(uuid.uuid4()),
                Action="lambda:InvokeFunction",
                Principal="apigateway.amazonaws.com",
                SourceArn=f"arn:aws:execute-api:{region}:{account_id}:{api_id}/*/*/get-online-features",
            )

def teardown_infra(
    self,
    project: str,
    tables: Sequence[Union[FeatureTable, FeatureView]],
    entities: Sequence[Entity],
) -> None:
    """
    Tear down the online store and, when the feature server is enabled, its AWS resources.

    The Lambda function and the API Gateway are looked up by the project's
    resource name and each is deleted only if it is found.

    Args:
        project: Feast project name
        tables: Tables passed to the online store teardown
        entities: Entities passed to the online store teardown
    """
    self.online_store.teardown(self.repo_config, tables, entities)

    if (
        self.repo_config.feature_server is not None
        and self.repo_config.feature_server.enabled
    ):
        print("Tearing down feature server...")
        resource_name = self._get_lambda_name(project)
        lambda_client = boto3.client("lambda")
        api_gateway_client = boto3.client("apigatewayv2")

        function = aws_utils.get_lambda_function(lambda_client, resource_name)

        if function is not None:
            print(" Tearing down AWS Lambda...")
            aws_utils.delete_lambda_function(lambda_client, resource_name)

        api = aws_utils.get_first_api_gateway(api_gateway_client, resource_name)
        if api is not None:
            print(" Tearing down AWS API Gateway...")
            aws_utils.delete_api_gateway(api_gateway_client, api["ApiId"])

def _upload_docker_image(self, project: str) -> str:
"""
Pulls the AWS Lambda docker image from Dockerhub and uploads it to AWS ECR.

Args:
project: Feast project name

Returns:
The URI of the uploaded docker image.
"""
import base64

try:
Expand Down Expand Up @@ -47,8 +218,8 @@ def _upload_docker_image(self) -> None:
)
docker_client.images.pull(AWS_LAMBDA_FEATURE_SERVER_IMAGE)

version = ".".join(feast.__version__.split(".")[:3])
repository_name = f"feast-python-server-{version}"
version = __version__.replace("+", "_").replace(".", "_")
repository_name = f"feast-python-server-{project}-{version}"
ecr_client = boto3.client("ecr")
try:
print(
Expand Down Expand Up @@ -77,17 +248,12 @@ def _upload_docker_image(self) -> None:
)
image.tag(image_remote_name)
docker_client.api.push(repository_uri, tag=version)
return image_remote_name


class S3RegistryStore(RegistryStore):
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
uri = registry_config.path
try:
import boto3
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("aws", str(e))
self._uri = urlparse(uri)
self._bucket = self._uri.hostname
self._key = self._uri.path.lstrip("/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import tempfile
from pathlib import Path

import yaml
from mangum import Mangum

from feast import FeatureStore
Expand All @@ -13,24 +12,12 @@
config_base64 = os.environ["FEAST_CONFIG_BASE64"]
config_bytes = base64.b64decode(config_base64)

# Override the registry path
config_yaml = yaml.safe_load(config_bytes)
config_yaml["registry"] = "registry.db"
config_bytes = yaml.safe_dump(config_yaml).encode()

# Load Registry
registry_base64 = os.environ["FEAST_REGISTRY_BASE64"]
registry_bytes = base64.b64decode(registry_base64)

# Create a new unique directory for writing feature_store.yaml and registry.db files
# Create a new unique directory for writing feature_store.yaml
repo_path = Path(tempfile.mkdtemp())

with open(repo_path / "feature_store.yaml", "wb") as f:
f.write(config_bytes)

with open(repo_path / "registry.db", "wb") as f:
f.write(registry_bytes)

# Initialize the feature store
store = FeatureStore(repo_path=str(repo_path.resolve()))

Expand Down
7 changes: 0 additions & 7 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ def update_infra(
partial=partial,
)

if self.repo_config.feature_server and self.repo_config.feature_server.enabled:
self._upload_docker_image()

def teardown_infra(
self,
project: str,
Expand Down Expand Up @@ -150,7 +147,3 @@ def get_historical_features(
full_feature_names=full_feature_names,
)
return job

def _upload_docker_image(self) -> None:
"""Upload the docker image for the feature server to the cloud."""
pass
Loading