From f6b0089c9a9930728206c1d1f516ad64fcd82231 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Tue, 22 Mar 2022 09:57:23 -0400 Subject: [PATCH] fix(tracing/botocore): fix incorrect context propagation type for SNS (#3404) Making sure msg attributes are encoded by the tracer (by changing the Type) allows our lambda library to decode the message attribute and create inferred spans for SNS and SQS. Check out this dd-lambda-python [PR](https://github.com/DataDog/datadog-lambda-python/pull/211) for more context. ## Checklist - [ ] Added to the correct milestone. - [ ] Tests provided or description of manual testing performed is included in the code or PR. - [ ] Library documentation is updated. - [ ] [Corp site](https://github.com/DataDog/documentation/) documentation is updated (link to the PR). (cherry picked from commit bd8e1762814f8bc56ef7ad0fee75efe68bc5dbf6) --- ddtrace/contrib/botocore/patch.py | 42 ++++++++++++------- docker-compose.yml | 2 +- ...sg-attributes-as-b64-7818aec10f533534.yaml | 4 ++ tests/contrib/botocore/test.py | 8 +++- 4 files changed, 39 insertions(+), 17 deletions(-) create mode 100644 releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index fa3f6caaca9..ec5195d67ba 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception): pass -def inject_trace_data_to_message_attributes(trace_data, entry): - # type: (Dict[str, str], Dict[str, Any]) -> None +def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): + # type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into the an SQS or SNS record's 
MessageAttributes """ if "MessageAttributes" not in entry: entry["MessageAttributes"] = {} - # An Amazon SQS message can contain up to 10 metadata attributes. + # Max of 10 message attributes. if len(entry["MessageAttributes"]) < 10: - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + if endpoint == "sqs": + # Use String since changing this to Binary would be a breaking + # change as other tracers expect this to be a String. + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + elif endpoint == "sns": + # Use Binary since SNS subscription filter policies fail silently + # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # AWS will encode our value if it sees "Binary" + entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} + else: + log.warning("skipping trace injection, endpoint is not SNS or SQS") else: + # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") -def inject_trace_to_sqs_or_sns_batch_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None): + # type: (Any, Span, Optional[str]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch """ @@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span): # or PublishBatchRequestEntries (in case of PublishBatch). 
entries = params.get("Entries", params.get("PublishBatchRequestEntries", [])) for entry in entries: - inject_trace_data_to_message_attributes(trace_data, entry) + inject_trace_data_to_message_attributes(trace_data, entry, endpoint) -def inject_trace_to_sqs_or_sns_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None): + # type: (Any, Span, Optional[str]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for the SQS or SNS record """ trace_data = {} HTTPPropagator.inject(span.context, trace_data) - inject_trace_data_to_message_attributes(trace_data, params) + inject_trace_data_to_message_attributes(trace_data, params, endpoint) def inject_trace_to_eventbridge_detail(params, span): @@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs): if endpoint_name == "lambda" and operation == "Invoke": inject_trace_to_client_context(params, span) if endpoint_name == "sqs" and operation == "SendMessage": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sqs" and operation == "SendMessageBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) if endpoint_name == "events" and operation == "PutEvents": inject_trace_to_eventbridge_detail(params, span) if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"): inject_trace_to_kinesis_stream(params, span) if endpoint_name == "sns" and operation == "Publish": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sns" and operation == "PublishBatch": - 
inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/docker-compose.yml b/docker-compose.yml index c097d40e88d..c17fff6bb02 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -132,7 +132,7 @@ services: - ./.ddriot:/root/project/.riot localstack: - image: localstack/localstack:0.13.1 + image: localstack/localstack:0.14.1 network_mode: bridge ports: - "127.0.0.1:4566:4566" diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml new file mode 100644 index 00000000000..69e39e21c66 --- /dev/null +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 `_ diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index fe1b158fa71..5853b52762c 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1003,7 +1003,9 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + assert msg_attr["_datadog"]["Type"] == "Binary" + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded.decode()) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) @@ -1068,7 +1070,9 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is 
not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + assert msg_attr["_datadog"]["Type"] == "Binary" + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded.decode()) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)