Skip to content

Commit

Permalink
feat: Add boto3 session based auth for dynamodb online store for cros…
Browse files Browse the repository at this point in the history
…s account access (#4606)

* added session auth for dynamodb online store

Signed-off-by: asingh9530 <[email protected]>

* lint

Signed-off-by: asingh9530 <[email protected]>

* Update sdk/python/feast/infra/online_stores/dynamodb.py

Co-authored-by: Francisco Arceo <[email protected]>
Signed-off-by: asingh9530 <[email protected]>

* Update sdk/python/feast/infra/online_stores/dynamodb.py

Co-authored-by: Francisco Arceo <[email protected]>
Signed-off-by: asingh9530 <[email protected]>

* dummy commit

Signed-off-by: asingh9530 <[email protected]>

* dummy commit

Signed-off-by: asingh9530 <[email protected]>

---------

Signed-off-by: asingh9530 <[email protected]>
Co-authored-by: Francisco Arceo <[email protected]>
  • Loading branch information
asingh9530 and franciscojavierarceo authored Oct 10, 2024
1 parent ef9e0bb commit 00eaf74
Showing 1 changed file with 64 additions and 18 deletions.
82 changes: 64 additions & 18 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
tags: Union[Dict[str, str], None] = None
"""AWS resource tags added to each table"""

session_based_auth: bool = False
"""AWS session based client authentication"""


class DynamoDBOnlineStore(OnlineStore):
"""
Expand Down Expand Up @@ -104,10 +107,14 @@ def update(
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client = self._get_dynamodb_client(
online_config.region, online_config.endpoint_url
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)
# Add Tags attribute to creation request only if configured to prevent
# TagResource permission issues, even with an empty Tags array.
Expand Down Expand Up @@ -166,7 +173,9 @@ def teardown(
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)

for table in tables:
Expand Down Expand Up @@ -201,7 +210,9 @@ def online_write_batch(
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)

table_instance = dynamodb_resource.Table(
Expand All @@ -228,7 +239,9 @@ def online_read(
assert isinstance(online_config, DynamoDBOnlineStoreConfig)

dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
online_config.region,
online_config.endpoint_url,
online_config.session_based_auth,
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
Expand Down Expand Up @@ -323,15 +336,27 @@ def _get_aioboto_session(self):
def _get_aiodynamodb_client(self, region: str):
return self._get_aioboto_session().create_client("dynamodb", region_name=region)

def _get_dynamodb_client(self, region: str, endpoint_url: Optional[str] = None):
def _get_dynamodb_client(
self,
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
self._dynamodb_client = _initialize_dynamodb_client(
region, endpoint_url, session_based_auth
)
return self._dynamodb_client

def _get_dynamodb_resource(self, region: str, endpoint_url: Optional[str] = None):
def _get_dynamodb_resource(
self,
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url
region, endpoint_url, session_based_auth
)
return self._dynamodb_resource

Expand Down Expand Up @@ -443,17 +468,38 @@ def _to_client_batch_get_payload(online_config, table_name, batch):
}


def _initialize_dynamodb_client(region: str, endpoint_url: Optional[str] = None):
return boto3.client(
"dynamodb",
region_name=region,
endpoint_url=endpoint_url,
config=Config(user_agent=get_user_agent()),
)
def _initialize_dynamodb_client(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
):
if session_based_auth:
return boto3.Session().client(
"dynamodb",
region_name=region,
endpoint_url=endpoint_url,
config=Config(user_agent=get_user_agent()),
)
else:
return boto3.client(
"dynamodb",
region_name=region,
endpoint_url=endpoint_url,
config=Config(user_agent=get_user_agent()),
)


def _initialize_dynamodb_resource(region: str, endpoint_url: Optional[str] = None):
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)
def _initialize_dynamodb_resource(
region: str,
endpoint_url: Optional[str] = None,
session_based_auth: Optional[bool] = False,
):
if session_based_auth:
return boto3.Session().resource(
"dynamodb", region_name=region, endpoint_url=endpoint_url
)
else:
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)


# TODO(achals): This form of user-facing templating is experimental.
Expand Down

0 comments on commit 00eaf74

Please sign in to comment.