Skip to content

Commit

Permalink
feat: Add templating for dynamodb table name (#2394)
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Boland <[email protected]>
  • Loading branch information
dannyboland authored Mar 10, 2022
1 parent 9aa96b9 commit f591088
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
region: StrictStr
""" AWS Region Name """

table_name_template: StrictStr = "{project}.{table_name}"
""" DynamoDB table name template """


class DynamoDBOnlineStore(OnlineStore):
"""
Expand Down Expand Up @@ -91,7 +94,7 @@ def update(
for table_instance in tables_to_keep:
try:
dynamodb_resource.create_table(
TableName=_get_table_name(config, table_instance),
TableName=_get_table_name(online_config, config, table_instance),
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "entity_id", "AttributeType": "S"}
Expand All @@ -107,12 +110,13 @@ def update(

for table_instance in tables_to_keep:
dynamodb_client.get_waiter("table_exists").wait(
TableName=_get_table_name(config, table_instance)
TableName=_get_table_name(online_config, config, table_instance)
)

for table_to_delete in tables_to_delete:
_delete_table_idempotent(
dynamodb_resource, _get_table_name(config, table_to_delete)
dynamodb_resource,
_get_table_name(online_config, config, table_to_delete),
)

def teardown(
Expand All @@ -133,7 +137,9 @@ def teardown(
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

for table in tables:
_delete_table_idempotent(dynamodb_resource, _get_table_name(config, table))
_delete_table_idempotent(
dynamodb_resource, _get_table_name(online_config, config, table)
)

@log_exceptions_and_usage(online_store="dynamodb")
def online_write_batch(
Expand Down Expand Up @@ -164,7 +170,9 @@ def online_write_batch(
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

table_instance = dynamodb_resource.Table(_get_table_name(config, table))
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
with table_instance.batch_writer() as batch:
for entity_key, features, timestamp, created_ts in data:
entity_id = compute_entity_id(entity_key)
Expand Down Expand Up @@ -206,7 +214,9 @@ def online_read(

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
table_instance = dynamodb_resource.Table(_get_table_name(config, table))
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
entity_id = compute_entity_id(entity_key)
with tracing_span(name="remote_call"):
response = table_instance.get_item(Key={"entity_id": entity_id})
Expand Down Expand Up @@ -242,8 +252,12 @@ def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)


def _get_table_name(config: RepoConfig, table: FeatureView) -> str:
return f"{config.project}.{table.name}"
def _get_table_name(
online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView
) -> str:
return online_config.table_name_template.format(
project=config.project, table_name=table.name
)


def _delete_table_idempotent(
Expand Down

0 comments on commit f591088

Please sign in to comment.