Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush meter provider at end of lambda function handler #1613

Merged
merged 3 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1553](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1553))
- `opentelemetry/sdk/extension/aws` Implement [`aws.ecs.*`](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/resource/semantic_conventions/cloud_provider/aws/ecs.md) and [`aws.logs.*`](https://opentelemetry.io/docs/reference/specification/resource/semantic_conventions/cloud_provider/aws/logs/) resource attributes in the `AwsEcsResourceDetector` detector when the ECS Metadata v4 is available
([#1212](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1212))
- `opentelemetry-instrumentation-aws-lambda` Flush `MeterProvider` at end of function invocation.
([#1613](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1613))
- Fix aiohttp bug with unset `trace_configs` ([#1592](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1592))

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def lambda_handler(event, context):
The `instrument` method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider
meter_provider (MeterProvider) - an optional meter provider
event_context_extractor (Callable) - a function that returns an OTel Trace
Context given the Lambda Event the AWS Lambda was invoked with
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
Expand All @@ -68,6 +69,7 @@ def custom_event_context_extractor(lambda_event):

import logging
import os
import time
from importlib import import_module
from typing import Any, Callable, Collection
from urllib.parse import urlencode
Expand All @@ -79,6 +81,10 @@ def custom_event_context_extractor(lambda_event):
from opentelemetry.instrumentation.aws_lambda.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.metrics import (
MeterProvider,
get_meter_provider,
)
from opentelemetry.propagate import get_global_textmap
from opentelemetry.propagators.aws.aws_xray_propagator import (
TRACE_HEADER_KEY,
Expand Down Expand Up @@ -274,6 +280,7 @@ def _instrument(
event_context_extractor: Callable[[Any], Context],
tracer_provider: TracerProvider = None,
disable_aws_context_propagation: bool = False,
meter_provider: MeterProvider = None,
):
def _instrumented_lambda_handler_call(
call_wrapped, instance, args, kwargs
Expand Down Expand Up @@ -352,15 +359,33 @@ def _instrumented_lambda_handler_call(
result.get("statusCode"),
)

now = time.time()
_tracer_provider = tracer_provider or get_tracer_provider()
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
# Assumes we are using the OpenTelemetry SDK implementation of the
# `TracerProvider`.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.error(
"TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
if hasattr(_tracer_provider, "force_flush"):
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_tracer_provider.force_flush(flush_timeout)
except Exception: # pylint: disable=broad-except
logger.exception(
f"TracerProvider failed to flush traces"
)
else:
logger.warning("TracerProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation.")

_meter_provider = meter_provider or get_meter_provider()
if hasattr(_meter_provider, "force_flush"):
rem = flush_timeout - (time.time()-now)*1000
if rem > 0:
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_meter_provider.force_flush(rem)
except Exception: # pylint: disable=broad-except
logger.exception(
f"MeterProvider failed to flush metrics"
)
else:
logger.warning(
"MeterProvider was missing `force_flush` method. This is necessary in case of a Lambda freeze and would exist in the OTel SDK implementation."
)

return result
Expand All @@ -385,6 +410,7 @@ def _instrument(self, **kwargs):
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``event_context_extractor``: a method which takes the Lambda
Event as input and extracts an OTel Context from it. By default,
the context is extracted from the HTTP headers of an API Gateway
Expand Down Expand Up @@ -432,6 +458,7 @@ def _instrument(self, **kwargs):
),
tracer_provider=kwargs.get("tracer_provider"),
disable_aws_context_propagation=disable_aws_context_propagation,
meter_provider=kwargs.get("meter_provider"),
)

def _uninstrument(self, **kwargs):
Expand Down