Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(idempotency): handle lambda timeout scenarios for INPROGRESS records #1387

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c53d504
feat(idempotency): add option to expire inprogress invocations
rubenfonseca Jul 25, 2022
90d21fb
chore(idempotency): make existing tests pass with expire_in_progress
rubenfonseca Jul 26, 2022
9676e13
chore(idempotency): add tests for expires_in_progress
rubenfonseca Jul 26, 2022
550e466
chore(docs): added docs about `expires_in_progress`
rubenfonseca Jul 26, 2022
bf18cc2
chore(idempotency): refactored expire in-progress logic
rubenfonseca Jul 27, 2022
c88482e
chore(idempotency): add tests
rubenfonseca Jul 27, 2022
f47ab1f
chore(idempotency): remove unused fixtures in tests
rubenfonseca Jul 27, 2022
a4f8ce7
chore(idempotency): make mypy happy
rubenfonseca Jul 27, 2022
975933d
chores(documentation): update sample code for `expires_in_progress`
rubenfonseca Jul 27, 2022
cd90fc1
chore(documentation): replace idempotency diagrams with mermaid.js
rubenfonseca Jul 27, 2022
631370d
chore(idempotency): remove param `expires_in_progress`
rubenfonseca Jul 27, 2022
61f94b3
chore(idempotency): remove more of the old code
rubenfonseca Jul 27, 2022
fa80aed
chore(docs): remove bad comment
rubenfonseca Jul 27, 2022
c706b0c
feat(idempotench): add mechanism to register lambda context
rubenfonseca Jul 27, 2022
1a72214
fix(idempotency): typo
rubenfonseca Jul 27, 2022
0ce0bb2
fix(idempotency): capture the lambda context automatically
rubenfonseca Jul 28, 2022
a2b6a34
chore(idempotency): addressed review comments
rubenfonseca Jul 29, 2022
228a76d
docs(idempotency): include register_lambda_context in doc snippets
heitorlessa Jul 29, 2022
3cb7411
chore(idempotency): added tests for handle_for_status
rubenfonseca Jul 29, 2022
5c09a5a
chore(docs): add documentation to method
rubenfonseca Jul 29, 2022
2e1afd3
chore(idempotency): address comments
rubenfonseca Jul 29, 2022
0e9dfd4
chore(idempotency): simplified strings
rubenfonseca Jul 29, 2022
81ed53d
chore(documentation): addressed comments
rubenfonseca Jul 29, 2022
66b62a6
chore(idempotency): no need to update expire on update
rubenfonseca Jul 29, 2022
84ced8e
docs(idempotency): reorder wording, banners to emphasize the need, an…
heitorlessa Jul 29, 2022
1ee5c25
Merge remote-tracking branch 'rubenfonseca/feat/expire-inprogress' in…
heitorlessa Jul 29, 2022
84280af
docs(idempotency): shorten wording to fit new mermaid SVG
heitorlessa Jul 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self.data = deepcopy(_prepare_data(function_payload))
self.fn_args = function_args
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
self.persistence_store = persistence_store
Expand Down Expand Up @@ -101,7 +103,9 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
Expand All @@ -113,6 +117,25 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_millis(self) -> Optional[int]:
"""
Tries to determine the remaining time available for the current lambda invocation.
This only works if the idempotent handler decorator is used, since we need to access the lambda context.
However, this could be improved if we start storing the lambda context globally during the invocation. One
way to do this is to register the lambda context when configuring the IdempotencyConfig object.
Returns
-------
Optional[int]
Remaining time in millis, or None if the remaining time cannot be determined.
"""

if self.config.lambda_context is not None:
return self.config.lambda_context.get_remaining_time_in_millis()

return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
Expand Down Expand Up @@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
datetime.datetime.now().timestamp() * 1000
):
raise IdempotencyInconsistentStateError(
"item should have been expired in-progress because it already time-outed."
)

raise IdempotencyAlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
Expand Down
10 changes: 10 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotencyConfig:
def __init__(
Expand All @@ -12,6 +14,7 @@ def __init__(
use_local_cache: bool = False,
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -32,6 +35,8 @@ def __init__(
Max number of items to store in local cache, by default 1024
hash_function: str, optional
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -41,3 +46,8 @@ def __init__(
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
self.lambda_context = lambda_context
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def idempotent(
return handler(event, context)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

args = event, context
idempotency_handler = IdempotencyHandler(
function=handler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
Expand All @@ -53,6 +54,8 @@ def __init__(
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
Expand All @@ -61,6 +64,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

Expand Down Expand Up @@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress
Parameters
----------
data: Dict[str, Any]
Payload
remaining_time_in_millis: Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
Expand All @@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
payload_hash=self._get_hashed_payload(data=data),
)

if remaining_time_in_millis:
now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
timestamp = (now + period).timestamp()

data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
else:
warnings.warn(
"Couldn't determine the remaining time left. "
"Did you call register_lambda_context on IdempotencyConfig?"
)

logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord

logger = logging.getLogger(__name__)

Expand All @@ -25,6 +25,7 @@ def __init__(
static_pk_value: Optional[str] = None,
sort_key_attr: Optional[str] = None,
expiry_attr: str = "expiration",
in_progress_expiry_attr: str = "in_progress_expiration",
status_attr: str = "status",
data_attr: str = "data",
validation_key_attr: str = "validation",
Expand All @@ -47,6 +48,8 @@ def __init__(
DynamoDB attribute name for the sort key
expiry_attr: str, optional
DynamoDB attribute name for expiry timestamp, by default "expiration"
in_progress_expiry_attr: str, optional
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
status_attr: str, optional
DynamoDB attribute name for status, by default "status"
data_attr: str, optional
Expand Down Expand Up @@ -85,6 +88,7 @@ def __init__(
self.static_pk_value = static_pk_value
self.sort_key_attr = sort_key_attr
self.expiry_attr = expiry_attr
self.in_progress_expiry_attr = in_progress_expiry_attr
self.status_attr = status_attr
self.data_attr = data_attr
self.validation_key_attr = validation_key_attr
Expand Down Expand Up @@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
idempotency_key=item[self.key_attr],
status=item[self.status_attr],
expiry_timestamp=item[self.expiry_attr],
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
response_data=item.get(self.data_attr),
payload_hash=item.get(self.validation_key_attr),
)
Expand All @@ -153,33 +158,75 @@ def _put_record(self, data_record: DataRecord) -> None:
self.status_attr: data_record.status,
}

if data_record.in_progress_expiry_timestamp is not None:
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp

if self.payload_validation_enabled:
item[self.validation_key_attr] = data_record.payload_hash

now = datetime.datetime.now()
try:
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")

# | LOCKED | RETRY if status = "INPROGRESS" | RETRY
# |----------------|-------------------------------------------------------|-------------> .... (time)
# | Lambda Idempotency Record
# | Timeout Timeout
# | (in_progress_expiry) (expiry)

# Conditions to successfully save a record:

# The idempotency key does not exist:
# - first time that this invocation key is used
# - previous invocation with the same key was deleted due to TTL
idempotency_key_not_exist = "attribute_not_exists(#id)"

# The idempotency record exists but it's expired:
idempotency_expiry_expired = "#expiry < :now"

# The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
inprogress_expiry_expired = " AND ".join(
[
"#status = :inprogress",
"attribute_exists(#in_progress_expiry)",
"#in_progress_expiry < :now_in_millis",
]
)

condition_expression = (
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
)

self.table.put_item(
Item=item,
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
ExpressionAttributeValues={":now": int(now.timestamp())},
ConditionExpression=condition_expression,
ExpressionAttributeNames={
"#id": self.key_attr,
"#expiry": self.expiry_attr,
"#in_progress_expiry": self.in_progress_expiry_attr,
"#status": self.status_attr,
},
ExpressionAttributeValues={
":now": int(now.timestamp()),
":now_in_millis": int(now.timestamp() * 1000),
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
},
)
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
raise IdempotencyItemAlreadyExistsError

def _update_record(self, data_record: DataRecord):
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
expression_attr_values = {
":expiry": data_record.expiry_timestamp,
":response_data": data_record.response_data,
":status": data_record.status,
}
expression_attr_names = {
"#response_data": self.data_attr,
"#expiry": self.expiry_attr,
"#response_data": self.data_attr,
"#status": self.status_attr,
}

Expand Down
Binary file removed docs/media/idempotent_sequence.png
Binary file not shown.
Binary file removed docs/media/idempotent_sequence_exception.png
Binary file not shown.
Loading