Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…tools-python into develop

* 'develop' of https://github.com/awslabs/aws-lambda-powertools-python:
  feat(idempotency): handle lambda timeout scenarios for INPROGRESS records (#1387)
  chore(deps): bump jsii from 1.57.0 to 1.63.1 (#1390)
  chore(deps): bump constructs from 10.1.1 to 10.1.59 (#1396)
  chore(deps-dev): bump flake8-isort from 4.1.1 to 4.1.2.post0 (#1384)
  docs(examples): enforce and fix all mypy errors (#1393)
  chore(ci): drop 3.6 from workflows (#1395)
  • Loading branch information
heitorlessa committed Jul 29, 2022
2 parents 77fc1bc + 160feae commit 449b183
Show file tree
Hide file tree
Showing 39 changed files with 814 additions and 284 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ changelog:
docker run -v "${PWD}":/workdir quay.io/git-chglog/git-chglog > CHANGELOG.md

mypy:
poetry run mypy --pretty aws_lambda_powertools
poetry run mypy --pretty aws_lambda_powertools examples
9 changes: 8 additions & 1 deletion aws_lambda_powertools/event_handler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@
Event handler decorators for common Lambda events
"""

from .api_gateway import ALBResolver, APIGatewayHttpResolver, ApiGatewayResolver, APIGatewayRestResolver, CORSConfig, Response
from .api_gateway import (
ALBResolver,
APIGatewayHttpResolver,
ApiGatewayResolver,
APIGatewayRestResolver,
CORSConfig,
Response,
)
from .appsync import AppSyncResolver

__all__ = [
Expand Down
1 change: 1 addition & 0 deletions aws_lambda_powertools/event_handler/appsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def lambda_handler(event, context):
ValueError
If we could not find a field resolver
"""
# Maintenance: revisit generics/overload to fix [attr-defined] in mypy usage
BaseRouter.current_event = data_model(event)
BaseRouter.lambda_context = context
resolver = self._get_resolver(BaseRouter.current_event.type_name, BaseRouter.current_event.field_name)
Expand Down
118 changes: 59 additions & 59 deletions aws_lambda_powertools/tracing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,34 @@
import numbers
import traceback
from contextlib import contextmanager
from typing import Any, AsyncContextManager, ContextManager, List, NoReturn, Optional, Set, Union
from typing import Any, Generator, List, NoReturn, Optional, Sequence, Union


class BaseProvider(abc.ABC):
@abc.abstractmethod # type: ignore
@contextmanager
def in_subsegment(self, name=None, **kwargs) -> ContextManager:
"""Return a subsegment context manger.
class BaseSegment(abc.ABC):
"""Holds common properties and methods on segment and subsegment."""

@abc.abstractmethod
def close(self, end_time: Optional[int] = None):
"""Close the trace entity by setting `end_time`
and flip the in progress flag to False.
Parameters
----------
name: str
Subsegment name
kwargs: Optional[dict]
Optional parameters to be propagated to segment
end_time: int
Time in epoch seconds, by default current time will be used.
"""

@abc.abstractmethod # type: ignore
@contextmanager
def in_subsegment_async(self, name=None, **kwargs) -> AsyncContextManager:
"""Return a subsegment async context manger.
@abc.abstractmethod
def add_subsegment(self, subsegment: Any):
"""Add input subsegment as a child subsegment."""

Parameters
----------
name: str
Subsegment name
kwargs: Optional[dict]
Optional parameters to be propagated to segment
"""
@abc.abstractmethod
def remove_subsegment(self, subsegment: Any):
"""Remove input subsegment from child subsegments."""

@abc.abstractmethod
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> NoReturn:
"""Annotate current active trace entity with a key-value pair.
"""Annotate segment or subsegment with a key-value pair.
Note: Annotations will be indexed for later search query.
Expand All @@ -48,9 +43,8 @@ def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> N

@abc.abstractmethod
def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoReturn:
"""Add metadata to the current active trace entity.
Note: Metadata is not indexed but can be later retrieved by BatchGetTraces API.
"""Add metadata to segment or subsegment. Metadata is not indexed
but can be later retrieved by BatchGetTraces API.
Parameters
----------
Expand All @@ -63,45 +57,52 @@ def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoRe
"""

@abc.abstractmethod
def patch(self, modules: Set[str]) -> NoReturn:
"""Instrument a set of supported libraries
def add_exception(self, exception: BaseException, stack: List[traceback.StackSummary], remote: bool = False):
"""Add an exception to trace entities.
Parameters
----------
modules: Set[str]
Set of modules to be patched
"""

@abc.abstractmethod
def patch_all(self) -> NoReturn:
"""Instrument all supported libraries"""
exception: Exception
Caught exception
stack: List[traceback.StackSummary]
List of traceback summaries
Output from `traceback.extract_stack()`.
remote: bool
Whether it's a client error (False) or downstream service error (True), by default False
"""

class BaseSegment(abc.ABC):
"""Holds common properties and methods on segment and subsegment."""

class BaseProvider(abc.ABC):
@abc.abstractmethod
def close(self, end_time: Optional[int] = None):
"""Close the trace entity by setting `end_time`
and flip the in progress flag to False.
@contextmanager
def in_subsegment(self, name=None, **kwargs) -> Generator[BaseSegment, None, None]:
"""Return a subsegment context manger.
Parameters
----------
end_time: int
Time in epoch seconds, by default current time will be used.
name: str
Subsegment name
kwargs: Optional[dict]
Optional parameters to be propagated to segment
"""

@abc.abstractmethod
def add_subsegment(self, subsegment: Any):
"""Add input subsegment as a child subsegment."""
@contextmanager
def in_subsegment_async(self, name=None, **kwargs) -> Generator[BaseSegment, None, None]:
"""Return a subsegment async context manger.
@abc.abstractmethod
def remove_subsegment(self, subsegment: Any):
"""Remove input subsegment from child subsegments."""
Parameters
----------
name: str
Subsegment name
kwargs: Optional[dict]
Optional parameters to be propagated to segment
"""

@abc.abstractmethod
def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> NoReturn:
"""Annotate segment or subsegment with a key-value pair.
"""Annotate current active trace entity with a key-value pair.
Note: Annotations will be indexed for later search query.
Expand All @@ -115,8 +116,9 @@ def put_annotation(self, key: str, value: Union[str, numbers.Number, bool]) -> N

@abc.abstractmethod
def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoReturn:
"""Add metadata to segment or subsegment. Metadata is not indexed
but can be later retrieved by BatchGetTraces API.
"""Add metadata to the current active trace entity.
Note: Metadata is not indexed but can be later retrieved by BatchGetTraces API.
Parameters
----------
Expand All @@ -129,17 +131,15 @@ def put_metadata(self, key: str, value: Any, namespace: str = "default") -> NoRe
"""

@abc.abstractmethod
def add_exception(self, exception: BaseException, stack: List[traceback.StackSummary], remote: bool = False):
"""Add an exception to trace entities.
def patch(self, modules: Sequence[str]) -> NoReturn:
"""Instrument a set of supported libraries
Parameters
----------
exception: Exception
Caught exception
stack: List[traceback.StackSummary]
List of traceback summaries
Output from `traceback.extract_stack()`.
remote: bool
Whether it's a client error (False) or downstream service error (True), by default False
modules: Set[str]
Set of modules to be patched
"""

@abc.abstractmethod
def patch_all(self) -> NoReturn:
"""Instrument all supported libraries"""
2 changes: 1 addition & 1 deletion aws_lambda_powertools/tracing/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def __init__(
self.__build_config(
service=service, disabled=disabled, auto_patch=auto_patch, patch_modules=patch_modules, provider=provider
)
self.provider = self._config["provider"]
self.provider: BaseProvider = self._config["provider"]
self.disabled = self._config["disabled"]
self.service = self._config["service"]
self.auto_patch = self._config["auto_patch"]
Expand Down
32 changes: 31 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from copy import deepcopy
from typing import Any, Callable, Dict, Optional, Tuple
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(
self.data = deepcopy(_prepare_data(function_payload))
self.fn_args = function_args
self.fn_kwargs = function_kwargs
self.config = config

persistence_store.configure(config, self.function.__name__)
self.persistence_store = persistence_store
Expand Down Expand Up @@ -101,7 +103,9 @@ def _process_idempotency(self):
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record
# already exists. If it succeeds, there's no need to call get_record.
self.persistence_store.save_inprogress(data=self.data)
self.persistence_store.save_inprogress(
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
)
except IdempotencyKeyError:
raise
except IdempotencyItemAlreadyExistsError:
Expand All @@ -113,6 +117,25 @@ def _process_idempotency(self):

return self._get_function_response()

def _get_remaining_time_in_millis(self) -> Optional[int]:
"""
Tries to determine the remaining time available for the current lambda invocation.
This only works if the idempotent handler decorator is used, since we need to access the lambda context.
However, this could be improved if we start storing the lambda context globally during the invocation. One
way to do this is to register the lambda context when configuring the IdempotencyConfig object.
Returns
-------
Optional[int]
Remaining time in millis, or None if the remaining time cannot be determined.
"""

if self.config.lambda_context is not None:
return self.config.lambda_context.get_remaining_time_in_millis()

return None

def _get_idempotency_record(self) -> DataRecord:
"""
Retrieve the idempotency record from the persistence layer.
Expand Down Expand Up @@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
datetime.datetime.now().timestamp() * 1000
):
raise IdempotencyInconsistentStateError(
"item should have been expired in-progress because it already time-outed."
)

raise IdempotencyAlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
Expand Down
10 changes: 10 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Dict, Optional

from aws_lambda_powertools.utilities.typing import LambdaContext


class IdempotencyConfig:
def __init__(
Expand All @@ -12,6 +14,7 @@ def __init__(
use_local_cache: bool = False,
local_cache_max_items: int = 256,
hash_function: str = "md5",
lambda_context: Optional[LambdaContext] = None,
):
"""
Initialize the base persistence layer
Expand All @@ -32,6 +35,8 @@ def __init__(
Max number of items to store in local cache, by default 1024
hash_function: str, optional
Function to use for calculating hashes, by default md5.
lambda_context: LambdaContext, optional
Lambda Context containing information about the invocation, function and execution environment.
"""
self.event_key_jmespath = event_key_jmespath
self.payload_validation_jmespath = payload_validation_jmespath
Expand All @@ -41,3 +46,8 @@ def __init__(
self.use_local_cache = use_local_cache
self.local_cache_max_items = local_cache_max_items
self.hash_function = hash_function
self.lambda_context: Optional[LambdaContext] = lambda_context

def register_lambda_context(self, lambda_context: LambdaContext):
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
self.lambda_context = lambda_context
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/idempotency/idempotency.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def idempotent(
return handler(event, context)

config = config or IdempotencyConfig()
config.register_lambda_context(context)

args = event, context
idempotency_handler = IdempotencyHandler(
function=handler,
Expand Down
20 changes: 19 additions & 1 deletion aws_lambda_powertools/utilities/idempotency/persistence/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
idempotency_key,
status: str = "",
expiry_timestamp: Optional[int] = None,
in_progress_expiry_timestamp: Optional[int] = None,
response_data: Optional[str] = "",
payload_hash: Optional[str] = None,
) -> None:
Expand All @@ -53,6 +54,8 @@ def __init__(
status of the idempotent record
expiry_timestamp: int, optional
time before the record should expire, in seconds
in_progress_expiry_timestamp: int, optional
time before the record should expire while in the INPROGRESS state, in seconds
payload_hash: str, optional
hashed representation of payload
response_data: str, optional
Expand All @@ -61,6 +64,7 @@ def __init__(
self.idempotency_key = idempotency_key
self.payload_hash = payload_hash
self.expiry_timestamp = expiry_timestamp
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
self._status = status
self.response_data = response_data

Expand Down Expand Up @@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:

self._save_to_cache(data_record=data_record)

def save_inprogress(self, data: Dict[str, Any]) -> None:
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
"""
Save record of function's execution being in progress
Parameters
----------
data: Dict[str, Any]
Payload
remaining_time_in_millis: Optional[int]
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
"""
data_record = DataRecord(
idempotency_key=self._get_hashed_idempotency_key(data=data),
Expand All @@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
payload_hash=self._get_hashed_payload(data=data),
)

if remaining_time_in_millis:
now = datetime.datetime.now()
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
timestamp = (now + period).timestamp()

data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
else:
warnings.warn(
"Couldn't determine the remaining time left. "
"Did you call register_lambda_context on IdempotencyConfig?"
)

logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")

if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
Expand Down
Loading

0 comments on commit 449b183

Please sign in to comment.