Add support for DynamoDB and S3 registry #1483

Merged
merged 38 commits into from
Jul 3, 2021
Changes from 16 commits
9e0e7c7
Add support for DynamoDB and S3 registry
leonid133 Apr 27, 2021
44aadd8
rcu and wcu as a parameter of dynamodb online store
leonid133 Apr 27, 2021
6383791
fix linter
leonid133 May 4, 2021
73ff67a
aws dependency to extras
leonid133 May 18, 2021
aa6d0da
FEAST_S3_ENDPOINT_URL
leonid133 May 18, 2021
0a87050
tests
leonid133 May 18, 2021
3b8bb31
merge from master
leonid133 May 18, 2021
00e8675
fix signature, after merge
leonid133 May 18, 2021
6a99cd9
aws default region name configurable
leonid133 May 18, 2021
32dc799
merge from master
leonid133 Jun 11, 2021
db616c4
add offlinestore config type to test
leonid133 Jun 11, 2021
8dcbd5a
review changes
leonid133 Jun 11, 2021
fee93dd
merge from master
leonid133 Jun 18, 2021
2bbe268
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
5d33a79
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 18, 2021
24c44ee
merge latest from master
leonid133 Jun 23, 2021
7b99cde
review requested changes
leonid133 Jun 23, 2021
3a985b0
integration test for Dynamo
leonid133 Jun 23, 2021
6973581
change the rest of table_name to table_instance (where table_name is …
leonid133 Jun 28, 2021
e928424
fix DynamoDBOnlineStore commit
leonid133 Jun 28, 2021
59d7e4c
move client to _initialize_dynamodb
leonid133 Jun 28, 2021
594b932
rename document_id to entity_id and Row to entity_id
leonid133 Jun 28, 2021
15a787c
The default value is None
leonid133 Jun 28, 2021
7eaa654
Remove Datastore from the docstring.
leonid133 Jun 28, 2021
1468117
get rid of the return call from S3RegistryStore
leonid133 Jun 28, 2021
5dbe429
merge two exceptions
leonid133 Jun 29, 2021
986d45e
For ci requirement
leonid133 Jun 29, 2021
79d85c7
remove configuration from test
leonid133 Jun 29, 2021
f50b2fb
feast-integration-tests for tests
leonid133 Jun 29, 2021
509c521
change test path
leonid133 Jun 29, 2021
cd67973
add fixture feature_store_with_s3_registry to test
leonid133 Jun 29, 2021
5466d20
merge from master
leonid133 Jun 29, 2021
3d1b78c
region required
leonid133 Jun 29, 2021
ff8d635
Merge branch 'master' of https://github.com/feast-dev/feast into feat…
leonid133 Jun 29, 2021
57a607c
Address the rest of the comments
Jul 2, 2021
e9422ea
Merge branch 'master' into feature/online_dynamodb
Jul 2, 2021
3cd9597
Update to_table to to_arrow
Jul 2, 2021
124b337
Merge branch 'master' into feature/online_dynamodb
Jul 3, 2021
3 changes: 3 additions & 0 deletions Makefile
@@ -41,6 +41,9 @@ install-ci-dependencies: install-python-ci-dependencies install-go-ci-dependenci
 install-python-ci-dependencies:
 	pip install -e "sdk/python[ci]"

+install-python-aws-dependencies:
+	pip install -e "sdk/python[aws]"
+
 package-protos:
 	cp -r ${ROOT_DIR}/protos ${ROOT_DIR}/sdk/python/feast/protos

Binary file added docs/specs/dynamodb_online_example.monopic
Binary file not shown.
Binary file added docs/specs/dynamodb_online_example.png
2 changes: 1 addition & 1 deletion sdk/python/feast/cli.py
@@ -280,7 +280,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List
 @click.option(
     "--template",
     "-t",
-    type=click.Choice(["local", "gcp"], case_sensitive=False),
+    type=click.Choice(["local", "gcp", "aws"], case_sensitive=False),
     help="Specify a template for the created project",
     default="local",
 )
20 changes: 20 additions & 0 deletions sdk/python/feast/errors.py
@@ -40,6 +40,26 @@ def __init__(self, name, project=None):
             super().__init__(f"Feature table {name} does not exist")


+class FeatureBucketNotExist(FeastObjectNotFoundException):
+    def __init__(self, bucket, project=None):
+        if project:
+            super().__init__(
+                f"Feature bucket {bucket} does not exist in project {project}"
+            )
+        else:
+            super().__init__(f"Feature bucket {bucket} does not exist")
+
+
+class FeatureBucketForbiddenAccess(FeastObjectNotFoundException):
+    def __init__(self, bucket, project=None):
+        if project:
+            super().__init__(
+                f"Private Registry Bucket {bucket} forbidden Access in project {project}"
+            )
+        else:
+            super().__init__(f"Private Registry Bucket {bucket} forbidden Access")
+
+
 class FeastProviderLoginError(Exception):
     """Error class that indicates a user has not authenticated with their provider."""

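The errors.py hunk adds the two bucket exceptions, but the code that raises them is not part of the diff shown here. A minimal sketch of how an S3-backed registry store might pick between them, assuming it inspects the error code of a failed bucket lookup (S3 bucket checks typically report these as the strings "404" and "403"). The helper name `bucket_error` and the stand-in base class are assumptions for illustration, not code from this PR.

```python
from typing import Optional


class FeastObjectNotFoundException(Exception):
    """Stand-in for the base class defined in feast.errors."""


class FeatureBucketNotExist(FeastObjectNotFoundException):
    def __init__(self, bucket, project=None):
        if project:
            super().__init__(
                f"Feature bucket {bucket} does not exist in project {project}"
            )
        else:
            super().__init__(f"Feature bucket {bucket} does not exist")


class FeatureBucketForbiddenAccess(FeastObjectNotFoundException):
    def __init__(self, bucket, project=None):
        if project:
            super().__init__(
                f"Private Registry Bucket {bucket} forbidden Access in project {project}"
            )
        else:
            super().__init__(f"Private Registry Bucket {bucket} forbidden Access")


def bucket_error(
    code: str, bucket: str, project: Optional[str] = None
) -> Optional[Exception]:
    """Hypothetical helper: map an S3 error code to one of the new exceptions."""
    if code == "404":
        return FeatureBucketNotExist(bucket, project)
    if code == "403":
        return FeatureBucketForbiddenAccess(bucket, project)
    return None  # unknown codes are left for the caller to re-raise
```

Note that `FeatureBucketForbiddenAccess` derives from `FeastObjectNotFoundException` even though it reports a permissions problem, so a caller catching the base class will see both cases.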
136 changes: 136 additions & 0 deletions sdk/python/feast/infra/aws_provider.py
@@ -0,0 +1,136 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas
from tqdm import tqdm

from feast import FeatureTable, utils
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.helpers import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import (
    Provider,
    RetrievalJob,
    _convert_arrow_to_proto,
    _get_column_names,
    _run_field_mapping,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig


class AwsProvider(Provider):
    def __init__(self, config: RepoConfig):
        self.repo_config = config
        self.offline_store = get_offline_store_from_config(config.offline_store)
        self.online_store = get_online_store_from_config(config.online_store)

    def update_infra(
        self,
        project: str,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        self.online_store.update(
            config=self.repo_config,
            tables_to_delete=tables_to_delete,
            tables_to_keep=tables_to_keep,
            entities_to_keep=entities_to_keep,
            entities_to_delete=entities_to_delete,
            partial=partial,
        )

    def teardown_infra(
        self,
        project: str,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ) -> None:
        self.online_store.teardown(self.repo_config, tables, entities)

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        self.online_store.online_write_batch(config, table, data, progress)

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: List[str] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        result = self.online_store.online_read(config, table, entity_keys)

        return result

    def materialize_single_feature_view(
        self,
        feature_view: FeatureView,
        start_date: datetime,
        end_date: datetime,
        registry: Registry,
        project: str,
        tqdm_builder: Callable[[int], tqdm],
    ) -> None:
        entities = []
        for entity_name in feature_view.entities:
            entities.append(registry.get_entity(entity_name, project))

        (
            join_key_columns,
            feature_name_columns,
            event_timestamp_column,
            created_timestamp_column,
        ) = _get_column_names(feature_view, entities)

        start_date = utils.make_tzaware(start_date)
        end_date = utils.make_tzaware(end_date)

        table = self.offline_store.pull_latest_from_table_or_query(
            data_source=feature_view.input,
            join_key_columns=join_key_columns,
            feature_name_columns=feature_name_columns,
            event_timestamp_column=event_timestamp_column,
            created_timestamp_column=created_timestamp_column,
            start_date=start_date,
            end_date=end_date,
        )

        if feature_view.input.field_mapping is not None:
            table = _run_field_mapping(table, feature_view.input.field_mapping)

        join_keys = [entity.join_key for entity in entities]
        rows_to_write = _convert_arrow_to_proto(table, feature_view, join_keys)

        with tqdm_builder(len(rows_to_write)) as pbar:
            self.online_write_batch(
                self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
            )

        feature_view.materialization_intervals.append((start_date, end_date))
        registry.apply_feature_view(feature_view, project)

    @staticmethod
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pandas.DataFrame, str],
        registry: Registry,
        project: str,
    ) -> RetrievalJob:
        # TODO implement me
        pass
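
In this revision `AwsProvider.online_read` accepts `requested_features` but does not forward it to the online store, so every stored feature for a row comes back. One way a caller could narrow the result afterwards is sketched below. `filter_features` is a hypothetical helper, and plain Python values stand in for `ValueProto` messages.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# Same row shape as the online_read return type, with Any in place of ValueProto.
Row = Tuple[Optional[datetime], Optional[Dict[str, Any]]]


def filter_features(
    rows: List[Row], requested_features: Optional[List[str]]
) -> List[Row]:
    """Keep only the requested feature names in each (timestamp, features) row."""
    if requested_features is None:
        return rows
    wanted = set(requested_features)
    filtered: List[Row] = []
    for event_ts, features in rows:
        if features is None:
            # Entities that were not found are passed through as (None, None).
            filtered.append((event_ts, None))
        else:
            filtered.append(
                (event_ts, {k: v for k, v in features.items() if k in wanted})
            )
    return filtered
```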
186 changes: 186 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
@@ -0,0 +1,186 @@
# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, utils
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig

try:
    import boto3
    from botocore.exceptions import ClientError
except ImportError as e:
    from feast.errors import FeastExtrasDependencyImportError

    raise FeastExtrasDependencyImportError("aws", str(e))


class DynamoDbOnlineStoreConfig(FeastConfigBaseModel):
    """Online store config for DynamoDB store"""

    type: Literal["dynamodb"] = "dynamodb"
    """Online store type selector"""

    rcu: Optional[PositiveInt] = 5
    """ Read capacity unit """

    wcu: Optional[PositiveInt] = 5
    """ Write capacity unit """

    region_name: Optional[StrictStr] = None
    """ AWS Region Name """


class DynamoDbOnlineStore(OnlineStore):
    def _initialize_dynamodb(self, online_config: DynamoDbOnlineStoreConfig):
        return boto3.resource("dynamodb", region_name=online_config.region_name)

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[Union[FeatureTable, FeatureView]],
        tables_to_keep: Sequence[Union[FeatureTable, FeatureView]],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        online_config = config.online_store
        assert isinstance(online_config, DynamoDbOnlineStoreConfig)
        dynamodb = self._initialize_dynamodb(online_config)

        for table_name in tables_to_keep:
            table = None
            try:
                table = dynamodb.create_table(
                    TableName=table_name.name,
                    KeySchema=[
                        {"AttributeName": "Row", "KeyType": "HASH"},
                        {"AttributeName": "Project", "KeyType": "RANGE"},
                    ],
                    AttributeDefinitions=[
                        {"AttributeName": "Row", "AttributeType": "S"},
                        {"AttributeName": "Project", "AttributeType": "S"},
                    ],
                    ProvisionedThroughput={
                        "ReadCapacityUnits": online_config.rcu,
                        "WriteCapacityUnits": online_config.wcu,
                    },
                )
                table.meta.client.get_waiter("table_exists").wait(
                    TableName=table_name.name
                )
            except ClientError as ce:
                print(ce)
                if ce.response["Error"]["Code"] == "ResourceNotFoundException":
                    table = dynamodb.Table(table_name.name)

        for table_name in tables_to_delete:
            table = dynamodb.Table(table_name.name)
            table.delete()

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[Union[FeatureTable, FeatureView]],
        entities: Sequence[Entity],
    ):
        online_config = config.online_store
        assert isinstance(online_config, DynamoDbOnlineStoreConfig)
        dynamodb = self._initialize_dynamodb(online_config)

        for table_name in tables:
            try:
                table = dynamodb.Table(table_name)
                table.delete()
            except Exception as e:
                print(str(e))

    def online_write_batch(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        online_config = config.online_store
        assert isinstance(online_config, DynamoDbOnlineStoreConfig)
        dynamodb = self._initialize_dynamodb(online_config)

        table_instance = dynamodb.Table(table.name)
        with table_instance.batch_writer() as batch:
            for entity_key, features, timestamp, created_ts in data:
                document_id = compute_datastore_entity_id(entity_key)  # TODO check id
                # TODO compression encoding
                batch.put_item(
                    Item={
                        "Row": document_id,  # PartitionKey
                        "Project": config.project,  # SortKey
                        "event_ts": str(utils.make_tzaware(timestamp)),
                        "values": {
                            k: v.SerializeToString()
                            for k, v in features.items()  # Serialized Features
                        },
                    }
                )

    def online_read(
        self,
        config: RepoConfig,
        table: Union[FeatureTable, FeatureView],
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        online_config = config.online_store
        assert isinstance(online_config, DynamoDbOnlineStoreConfig)
        dynamodb = self._initialize_dynamodb(online_config)

        result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
        for entity_key in entity_keys:
            table_instace = dynamodb.Table(table.name)
            document_id = compute_datastore_entity_id(entity_key)  # TODO check id
            response = table_instace.get_item(
                Key={"Row": document_id, "Project": config.project}
            )
            value = response["Item"]

            if value is not None:
                res = {}
                for feature_name, value_bin in value["values"].items():
                    val = ValueProto()
                    val.ParseFromString(value_bin.value)
                    res[feature_name] = val
                result.append((value["event_ts"], res))
            else:
                result.append((None, None))
        return result


def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
    """
    Compute Datastore Entity id given Feast Entity Key.

    Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
    do with the Entity concept we have in Feast.
    """
    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
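
Two behaviours of the store code at this commit are worth illustrating. First, DynamoDB reports an attempt to create a table that already exists as `ResourceInUseException`, whereas the `except` branch in `update` tests for `ResourceNotFoundException`. Second, a `get_item` response carries no `"Item"` key when nothing matches, so `response["Item"]` raises `KeyError` rather than yielding `None`, and the stored `event_ts` comes back as a string. The sketch below shows one way to handle both using plain dictionaries shaped like the boto3 responses. The function names are hypothetical, no AWS call is made, and feature values are left as raw bytes instead of being parsed into `ValueProto`.

```python
from datetime import datetime
from typing import Any, Dict, Optional, Tuple


def table_already_exists(error_response: Dict[str, Any]) -> bool:
    """True if a failed CreateTable call failed because the table exists.

    `error_response` has the shape of botocore's ClientError.response.
    """
    return error_response.get("Error", {}).get("Code") == "ResourceInUseException"


def parse_get_item(
    response: Dict[str, Any],
) -> Tuple[Optional[datetime], Optional[Dict[str, bytes]]]:
    """Turn a get_item-style response into (event_ts, raw feature bytes)."""
    item = response.get("Item")  # the key is absent when nothing matched
    if item is None:
        return None, None
    # boto3 wraps binary attributes in an object exposing `.value`;
    # plain bytes are accepted too so the sketch runs without boto3.
    values = {
        name: getattr(raw, "value", raw) for name, raw in item["values"].items()
    }
    # event_ts was written with str(datetime), which fromisoformat reads back.
    return datetime.fromisoformat(item["event_ts"]), values
```

With a helper like `parse_get_item`, the read path would append `(None, None)` for a missing entity and a timezone-aware `datetime` otherwise, which matches the declared return type of `online_read`.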
4 changes: 4 additions & 0 deletions sdk/python/feast/infra/provider.py
@@ -144,6 +144,10 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
             from feast.infra.gcp import GcpProvider

             return GcpProvider(config)
+        elif config.provider == "aws":
+            from feast.infra.aws_provider import AwsProvider
+
+            return AwsProvider(config)
         elif config.provider == "local":
             from feast.infra.local import LocalProvider

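Importing `AwsProvider` inside the `elif` branch defers loading the AWS-specific module until the `aws` provider is actually selected, which fits the PR's move of the AWS dependency into an `aws` extra. The same idea can be sketched generically with `importlib`. `load_optional` and its error message are hypothetical and are not part of Feast.

```python
import importlib
from types import ModuleType


def load_optional(module_name: str, extra: str) -> ModuleType:
    """Import a module on demand, pointing at the pip extra if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(
            f"{module_name} is required for this provider; "
            f"try installing the '{extra}' extra ({e})"
        ) from e
```

Users of the `local` or `gcp` provider never reach the AWS import, so they are not forced to install the AWS packages.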