From dc1cc5e33a75e30ee05c57cc3d6c6b469811ada0 Mon Sep 17 00:00:00 2001 From: Povilas Versockas Date: Mon, 25 Sep 2023 11:06:25 +0300 Subject: [PATCH] feat: add eventbridge integration (#4) --- .../instrumentation/aws_lambda/__init__.py | 43 +++++++++++++++++++ .../instrumentation/botocore/__init__.py | 22 +++++++++- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py index 8434cae78f..8b322b2a94 100644 --- a/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aws-lambda/src/opentelemetry/instrumentation/aws_lambda/__init__.py @@ -495,6 +495,31 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches except Exception as ex: pass + eventBridgeTriggerSpan = None + try: + if type(lambda_event) is dict and lambda_event.get("source") is not None and type(lambda_event.get("source")) is str: + span_name = 'EventBridge event' + if lambda_event.get("detail-type") is not None: + span_name = lambda_event.get("detail-type") + + links = [] + if lambda_event.get("detail") is not None and lambda_event["detail"].get("_context") is not None: + ctx = get_global_textmap().extract(carrier=lambda_event["detail"].get("_context")) + links.append(Link(get_current_span(ctx).get_span_context())) + + eventBridgeTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links) + eventBridgeTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub") + eventBridgeTriggerSpan.set_attribute("faas.trigger.type", "EventBridge") + eventBridgeTriggerSpan.set_attribute("aws.event.bridge.trigger.source", lambda_event.get("source")) + parent_context = set_span_in_context(eventBridgeTriggerSpan) + + eventBridgeTriggerSpan.set_attribute( + "rpc.request.body", + json.dumps(lambda_event), + ) + except Exception as ex: + pass + try: with tracer.start_as_current_span( name=orig_handler_name, @@ -651,6 +676,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches except Exception: pass cognitoTriggerSpan.end() + + if lambda_event and eventBridgeTriggerSpan is not None: + try: + if isinstance(result, dict) and result.get("statusCode"): + eventBridgeTriggerSpan.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, + result.get("statusCode"), + ) + if isinstance(result, dict) and result.get("body"): + eventBridgeTriggerSpan.set_attribute( + "rpc.response.body", + result.get("body"), + ) + except Exception: + pass + eventBridgeTriggerSpan.end() now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() @@ -678,6 +719,8 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches dynamoTriggerSpan.end() if cognitoTriggerSpan is not None: cognitoTriggerSpan.end() + if eventBridgeTriggerSpan is not None: + eventBridgeTriggerSpan.end() now = time.time() _tracer_provider = tracer_provider or get_tracer_provider() diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index eeeb7d5190..f80f13915e 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -108,7 +108,7 @@ def response_hook(span, service_name, operation_name, result): from opentelemetry.propagate import inject from opentelemetry.propagators import textmap from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import get_tracer +from opentelemetry.trace import get_tracer, SpanKind from opentelemetry.trace.span import Span import base64 import typing @@ -218,6 +218,8 @@ def _patched_api_call(self, original_func, instance, args, kwargs): body = call_context.params.get("Message") if body is not None: attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit,json.dumps(body, default=str)) + elif call_context.service == "events" and call_context.operation == "PutEvents": + call_context.span_kind = SpanKind.PRODUCER else: attributes["rpc.request.payload"] = limit_string_size(self.payload_size_limit, json.dumps(call_context.params, default=str)) except Exception as ex: @@ -274,7 +276,23 @@ def _patched_api_call(self, original_func, instance, args, kwargs): inject(carrier = entry.get("MessageAttributes"), setter=SQSSetter()) except Exception as ex: - pass + pass + + try: + if call_context.service == "events" and call_context.operation == "PutEvents": + if args[1].get("Entries") is not None: + for entry in args[1].get("Entries"): + if entry.get("Detail") is not None: + detailJson = json.loads(entry.get("Detail")) + detailJson['_context'] = {} + inject(carrier = detailJson['_context']) + entry['Detail'] = json.dumps(detailJson) + else: + detailJson = {'_context': {}} + inject(carrier = detailJson['_context']) + entry['Detail'] = json.dumps(detailJson) + except Exception as ex: + pass result = None try: