Skip to content

Commit

Permalink
Dynamodb infra object (#2131)
Browse files Browse the repository at this point in the history
* Rename files

Signed-off-by: Felix Wang <[email protected]>

* Add DynamoDB table as an InfraObject

Signed-off-by: Felix Wang <[email protected]>
  • Loading branch information
felixwang9817 authored Dec 13, 2021
1 parent 7302eb5 commit 192d3ad
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 39 deletions.
31 changes: 31 additions & 0 deletions protos/feast/core/DynamoDBTable.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//
// * Copyright 2021 The Feast Authors
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * https://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
//

syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "DynamoDBTableProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

// Represents a DynamoDB table
message DynamoDBTable {
// Name of the table
string name = 1;

// Region of the table
string region = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ syntax = "proto3";

package feast.core;
option java_package = "feast.proto.core";
option java_outer_classname = "InfraObjectsProto";
option java_outer_classname = "InfraObjectProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "feast/core/DynamoDBTable.proto";

// Represents a set of infrastructure objects managed by Feast
message Infra {
// List of infrastructure objects managed by Feast
Expand All @@ -34,6 +36,7 @@ message InfraObject {

// The infrastructure object
oneof infra_object {
DynamoDBTable dynamodb_table = 2;
CustomInfra custom_infra = 100;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from typing import Any, List

from feast.importer import get_class_from_type
from feast.protos.feast.core.InfraObjects_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObjects_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto


class InfraObject(ABC):
Expand Down
129 changes: 93 additions & 36 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, utils
from feast.infra.infra_object import InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
Expand Down Expand Up @@ -65,7 +70,7 @@ def update(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client, dynamodb_resource = self._initialize_dynamodb(online_config)
dynamodb_client, dynamodb_resource = _initialize_dynamodb(online_config.region)

for table_instance in tables_to_keep:
try:
Expand All @@ -89,7 +94,8 @@ def update(
TableName=f"{config.project}.{table_instance.name}"
)

self._delete_tables_idempotent(dynamodb_resource, config, tables_to_delete)
for table_to_delete in tables_to_delete:
_delete_table_idempotent(dynamodb_resource, table_to_delete.name)

def teardown(
self,
Expand All @@ -99,9 +105,10 @@ def teardown(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

self._delete_tables_idempotent(dynamodb_resource, config, tables)
for table in tables:
_delete_table_idempotent(dynamodb_resource, table.name)

@log_exceptions_and_usage(online_store="dynamodb")
def online_write_batch(
Expand All @@ -115,7 +122,7 @@ def online_write_batch(
) -> None:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
with table_instance.batch_writer() as batch:
Expand Down Expand Up @@ -144,7 +151,7 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = self._initialize_dynamodb(online_config)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
Expand All @@ -165,35 +172,85 @@ def online_read(
result.append((None, None))
return result

def _initialize_dynamodb(self, online_config: DynamoDBOnlineStoreConfig):
return (
boto3.client("dynamodb", region_name=online_config.region),
boto3.resource("dynamodb", region_name=online_config.region),

def _initialize_dynamodb(region: str):
return (
boto3.client("dynamodb", region_name=region),
boto3.resource("dynamodb", region_name=region),
)


def _delete_table_idempotent(
dynamodb_resource, table_name: str,
):
try:
table = dynamodb_resource.Table(table_name)
table.delete()
logger.info(f"Dynamo table {table_name} was deleted")
except ClientError as ce:
# If the table deletion fails with ResourceNotFoundException,
# it means the table has already been deleted.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceNotFoundException":
raise
else:
logger.warning(f"Trying to delete table that doesn't exist: {table_name}")


class DynamoDBTable(InfraObject):
"""
A DynamoDB table managed by Feast.
Attributes:
name: The name of the table.
region: The region of the table.
"""

name: str
region: str

def __init__(self, name: str, region: str):
self.name = name
self.region = region

def to_proto(self) -> InfraObjectProto:
dynamodb_table_proto = DynamoDBTableProto()
dynamodb_table_proto.name = self.name
dynamodb_table_proto.region = self.region

return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.dynamodb.DynamoDBTable",
dynamodb_table=dynamodb_table_proto,
)

def _delete_tables_idempotent(
self,
dynamodb_resource,
config: RepoConfig,
tables: Sequence[Union[FeatureTable, FeatureView]],
):
for table_instance in tables:
try:
table = dynamodb_resource.Table(
f"{config.project}.{table_instance.name}"
)
table.delete()
logger.info(
f"Dynamo table {config.project}.{table_instance.name} was deleted"
)
except ClientError as ce:
# If the table deletion fails with ResourceNotFoundException,
# it means the table has already been deleted.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceNotFoundException":
raise
else:
logger.warning(
f"Trying to delete table that doesn't exist:"
f" {config.project}.{table_instance.name}"
)
@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
return DynamoDBTable(
name=infra_object_proto.dynamodb_table.name,
region=infra_object_proto.dynamodb_table.region,
)

def update(self):
dynamodb_client, dynamodb_resource = _initialize_dynamodb(self.region)

try:
dynamodb_resource.create_table(
TableName=f"{self.name}",
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "entity_id", "AttributeType": "S"}
],
BillingMode="PAY_PER_REQUEST",
)
except ClientError as ce:
# If the table creation fails with ResourceInUseException,
# it means the table already exists or is being created.
# Otherwise, re-raise the exception
if ce.response["Error"]["Code"] != "ResourceInUseException":
raise

dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
_, dynamodb_resource = _initialize_dynamodb(self.region)
_delete_table_idempotent(dynamodb_resource, self.name)

0 comments on commit 192d3ad

Please sign in to comment.