Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): handle lambda timeout scenarios for INPROGRESS records #1387

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c53d504
feat(idempotency): add option to expire inprogress invocations
rubenfonseca Jul 25, 2022
90d21fb
chore(idempotency): make existing tests pass with expire_in_progress
rubenfonseca Jul 26, 2022
9676e13
chore(idempotency): add tests for expires_in_progress
rubenfonseca Jul 26, 2022
550e466
chore(docs): added docs about `expires_in_progress`
rubenfonseca Jul 26, 2022
bf18cc2
chore(idempotency): refactored expire in-progress logic
rubenfonseca Jul 27, 2022
c88482e
chore(idempotency): add tests
rubenfonseca Jul 27, 2022
f47ab1f
chore(idempotency): remove unused fixtures in tests
rubenfonseca Jul 27, 2022
a4f8ce7
chore(idempotency): make mypy happy
rubenfonseca Jul 27, 2022
975933d
chores(documentation): update sample code for `expires_in_progress`
rubenfonseca Jul 27, 2022
cd90fc1
chore(documentation): replace idempotency diagrams with mermaid.js
rubenfonseca Jul 27, 2022
631370d
chore(idempotency): remove param `expires_in_progress`
rubenfonseca Jul 27, 2022
61f94b3
chore(idempotency): remove more of the old code
rubenfonseca Jul 27, 2022
fa80aed
chore(docs): remove bad comment
rubenfonseca Jul 27, 2022
c706b0c
feat(idempotench): add mechanism to register lambda context
rubenfonseca Jul 27, 2022
1a72214
fix(idempotency): typo
rubenfonseca Jul 27, 2022
0ce0bb2
fix(idempotency): capture the lambda context automatically
rubenfonseca Jul 28, 2022
a2b6a34
chore(idempotency): addressed review comments
rubenfonseca Jul 29, 2022
228a76d
docs(idempotency): include register_lambda_context in doc snippets
heitorlessa Jul 29, 2022
3cb7411
chore(idempotency): added tests for handle_for_status
rubenfonseca Jul 29, 2022
5c09a5a
chore(docs): add documentation to method
rubenfonseca Jul 29, 2022
2e1afd3
chore(idempotency): address comments
rubenfonseca Jul 29, 2022
0e9dfd4
chore(idempotency): simplified strings
rubenfonseca Jul 29, 2022
81ed53d
chore(documentation): addressed comments
rubenfonseca Jul 29, 2022
66b62a6
chore(idempotency): no need to update expire on update
rubenfonseca Jul 29, 2022
84ced8e
docs(idempotency): reorder wording, banners to emphasize the need, an…
heitorlessa Jul 29, 2022
1ee5c25
Merge remote-tracking branch 'rubenfonseca/feat/expire-inprogress' in…
heitorlessa Jul 29, 2022
84280af
docs(idempotency): shorten wording to fit new mermaid SVG
heitorlessa Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self.data = deepcopy(_prepare_data(function_payload))
self.fn_args = function_args
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
self.persistence_store = persistence_store
Expand Down Expand Up @@ -101,7 +103,9 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
Expand All @@ -113,6 +117,24 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_millis(self) -> Optional[int]:
"""
Tries to determine the remaining time available for the current lambda invocation.

The value is read from the Lambda context stored on the config (`self.config.lambda_context`), so we need access
to the lambda context: it is available once `config.register_lambda_context` has been called, which the
`idempotent` handler decorator does on every invocation. If no context has been registered, the remaining time
is unknown.

Returns
-------
Optional[int]
Remaining time in millis, or None if the remaining time cannot be determined.
"""

# Only a registered Lambda context can tell us how much time is left
if self.config.lambda_context is not None:
return self.config.lambda_context.get_remaining_time_in_millis()

return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
Expand Down Expand Up @@ -167,6 +189,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
datetime.datetime.now().timestamp()
rubenfonseca marked this conversation as resolved.
Show resolved Hide resolved
):
raise IdempotencyInconsistentStateError(
"item should have been expired in-progress because it already time-outed."
)

raise IdempotencyAlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
Expand Down
6 changes: 6 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotencyConfig:
def __init__(
Expand Down Expand Up @@ -41,3 +43,7 @@ def __init__(
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = None
rubenfonseca marked this conversation as resolved.
Show resolved Hide resolved

def register_lambda_context(self, lambda_context: LambdaContext):
"""
Store the Lambda context of the current invocation on this config.

The idempotency handler reads `lambda_context` from the config to obtain the remaining time
of the invocation via `get_remaining_time_in_millis()`.

Parameters
----------
lambda_context: LambdaContext
Lambda context object to keep on this config
"""
self.lambda_context = lambda_context
rubenfonseca marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def idempotent(
return handler(event, context)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

args = event, context
idempotency_handler = IdempotencyHandler(
function=handler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import warnings
from abc import ABC, abstractmethod
from math import ceil
from types import MappingProxyType
from typing import Any, Dict, Optional

Expand Down Expand Up @@ -40,6 +41,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
Expand All @@ -53,6 +55,8 @@ def __init__(
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
Expand All @@ -61,6 +65,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

Expand Down Expand Up @@ -328,14 +333,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress

Parameters
----------
data: Dict[str, Any]
Payload
remaining_time_in_millis: Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
Expand All @@ -344,6 +351,19 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
payload_hash=self._get_hashed_payload(data=data),
)

if remaining_time_in_millis:
now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=remaining_time_in_millis)

# It's very important to use math.ceil here. The timestamp is stored in whole seconds, so rounding down could
# produce an expiry that is earlier than the invocation's real deadline. This would create a scenario where the
# record looks already expired in the store, but the invocation is still running.
timestamp = ceil((now + period).timestamp())

data_record.in_progress_expiry_timestamp = timestamp
else:
warnings.warn("Expires in progress is enabled but we couldn't determine the remaining time left")
rubenfonseca marked this conversation as resolved.
Show resolved Hide resolved

logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +25,7 @@ def __init__(
static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
in_progress_expiry_attr: str = "in_progress_expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
Expand All @@ -47,6 +48,8 @@ def __init__(
DynamoDB attribute name for the sort key
expiry_attr: str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr: str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr: str, optional
DynamoDB attribute name for status, by default "status"
data_attr: str, optional
Expand Down Expand Up @@ -85,6 +88,7 @@ def __init__(
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.in_progress_expiry_attr = in_progress_expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
Expand Down Expand Up @@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
idempotency_key=item[self.key_attr],
status=item[self.status_attr],
expiry_timestamp=item[self.expiry_attr],
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
response_data=item.get(self.data_attr),
payload_hash=item.get(self.validation_key_attr),
)
Expand All @@ -153,34 +158,51 @@ def _put_record(self, data_record: DataRecord) -> None:
self.status_attr: data_record.status,
}

if data_record.in_progress_expiry_timestamp is not None:
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp

if self.payload_validation_enabled:
item[self.validation_key_attr] = data_record.payload_hash

now = datetime.datetime.now()
try:
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

self.table.put_item(
Item=item,
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
ExpressionAttributeValues={":now": int(now.timestamp())},
ConditionExpression=(
"attribute_not_exists(#id) OR #now < :now OR "
"(attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)"
),
ExpressionAttributeNames={
"#id": self.key_attr,
"#now": self.expiry_attr,
"#in_progress_expiry": self.in_progress_expiry_attr,
"#status": self.status_attr,
},
ExpressionAttributeValues={":now": int(now.timestamp()), ":inprogress": STATUS_CONSTANTS["INPROGRESS"]},
)
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
raise IdempotencyItemAlreadyExistsError

def _update_record(self, data_record: DataRecord):
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
update_expression = (
"SET #response_data = :response_data, #expiry = :expiry, "
"#status = :status, #in_progress_expiry = :in_progress_expiry"
rubenfonseca marked this conversation as resolved.
Show resolved Hide resolved
)
expression_attr_values = {
":expiry": data_record.expiry_timestamp,
":response_data": data_record.response_data,
":status": data_record.status,
":in_progress_expiry": data_record.in_progress_expiry_timestamp,
}
expression_attr_names = {
"#response_data": self.data_attr,
"#expiry": self.expiry_attr,
"#status": self.status_attr,
"#in_progress_expiry": self.in_progress_expiry_attr,
}

if self.payload_validation_enabled:
Expand Down
Binary file removed docs/media/idempotent_sequence.png
Binary file not shown.
Binary file removed docs/media/idempotent_sequence_exception.png
Binary file not shown.
Loading