Skip to content

Commit

Permalink
Cache dynamodb client and resource in DynamoDB online store implement…
Browse files Browse the repository at this point in the history
…ation (#2138)

Signed-off-by: Felix Wang <[email protected]>
  • Loading branch information
felixwang9817 authored Dec 14, 2021
1 parent 37c0abe commit 59031ca
Showing 1 changed file with 27 additions and 11 deletions.
38 changes: 27 additions & 11 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class DynamoDBOnlineStore(OnlineStore):
Online feature store for AWS DynamoDB.
"""

_dynamodb_client = None
_dynamodb_resource = None

@log_exceptions_and_usage(online_store="dynamodb")
def update(
self,
Expand All @@ -70,7 +73,8 @@ def update(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_client = self._get_dynamodb_client(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

for table_instance in tables_to_keep:
try:
Expand Down Expand Up @@ -105,7 +109,7 @@ def teardown(
):
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

for table in tables:
_delete_table_idempotent(dynamodb_resource, table.name)
Expand All @@ -122,7 +126,7 @@ def online_write_batch(
) -> None:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}")
with table_instance.batch_writer() as batch:
Expand Down Expand Up @@ -151,7 +155,7 @@ def online_read(
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
_, dynamodb_resource = _initialize_dynamodb(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)

result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
Expand All @@ -172,12 +176,23 @@ def online_read(
result.append((None, None))
return result

def _get_dynamodb_client(self, region: str):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region)
return self._dynamodb_client

def _get_dynamodb_resource(self, region: str):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(region)
return self._dynamodb_resource


def _initialize_dynamodb_client(region: str):
return boto3.client("dynamodb", region_name=region)


def _initialize_dynamodb(region: str):
return (
boto3.client("dynamodb", region_name=region),
boto3.resource("dynamodb", region_name=region),
)
def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)


def _delete_table_idempotent(
Expand Down Expand Up @@ -231,7 +246,8 @@ def from_proto(infra_object_proto: InfraObjectProto) -> Any:
)

def update(self):
dynamodb_client, dynamodb_resource = _initialize_dynamodb(self.region)
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)

try:
dynamodb_resource.create_table(
Expand All @@ -252,5 +268,5 @@ def update(self):
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
_, dynamodb_resource = _initialize_dynamodb(self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
_delete_table_idempotent(dynamodb_resource, self.name)

0 comments on commit 59031ca

Please sign in to comment.