Skip to content

Commit

Permalink
fix(tracing/botocore): fix incorrect context propagation type for SNS (
Browse files Browse the repository at this point in the history
…#3404)

Making sure msg attributes are encoded by the tracer (by changing the Type) allows our lambda library to decode the message attribute and create inferred spans for sns and sqs.
Checkout this dd-lambda-python [PR](DataDog/datadog-lambda-python#211) for more context.
## Checklist
- [ ] Added to the correct milestone.
- [ ] Tests provided or description of manual testing performed is included in the code or PR.
- [ ] Library documentation is updated.
- [ ] [Corp site](https://github.com/DataDog/documentation/) documentation is updated (link to the PR).

(cherry picked from commit bd8e176)
  • Loading branch information
zARODz11z authored and mergify-bot committed Mar 22, 2022
1 parent 6cbc05b commit f6b0089
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
42 changes: 28 additions & 14 deletions ddtrace/contrib/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,41 @@ class TraceInjectionDecodingError(Exception):
pass


def inject_trace_data_to_message_attributes(trace_data, entry):
# type: (Dict[str, str], Dict[str, Any]) -> None
def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None):
# type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None
"""
:trace_data: trace headers to be stored in the entry's MessageAttributes
:entry: an SQS or SNS record
:endpoint: endpoint of message, "sqs" or "sns"
Inject trace headers into the an SQS or SNS record's MessageAttributes
"""
if "MessageAttributes" not in entry:
entry["MessageAttributes"] = {}
# An Amazon SQS message can contain up to 10 metadata attributes.
# Max of 10 message attributes.
if len(entry["MessageAttributes"]) < 10:
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
if endpoint == "sqs":
# Use String since changing this to Binary would be a breaking
# change as other tracers expect this to be a String.
entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)}
elif endpoint == "sns":
# Use Binary since SNS subscription filter policies fail silently
# with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269
# AWS will encode our value if it sees "Binary"
entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)}
else:
log.warning("skipping trace injection, endpoint is not SNS or SQS")
else:
# In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute
log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded")


def inject_trace_to_sqs_or_sns_batch_message(params, span):
# type: (Any, Span) -> None
def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None):
# type: (Any, Span, Optional[str]) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
:endpoint: endpoint of message, "sqs" or "sns"
Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch
"""
Expand All @@ -94,21 +107,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span):
# or PublishBatchRequestEntries (in case of PublishBatch).
entries = params.get("Entries", params.get("PublishBatchRequestEntries", []))
for entry in entries:
inject_trace_data_to_message_attributes(trace_data, entry)
inject_trace_data_to_message_attributes(trace_data, entry, endpoint)


def inject_trace_to_sqs_or_sns_message(params, span):
# type: (Any, Span) -> None
def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None):
# type: (Any, Span, Optional[str]) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
:endpoint: endpoint of message, "sqs" or "sns"
Inject trace headers into MessageAttributes for the SQS or SNS record
"""
trace_data = {}
HTTPPropagator.inject(span.context, trace_data)

inject_trace_data_to_message_attributes(trace_data, params)
inject_trace_data_to_message_attributes(trace_data, params, endpoint)


def inject_trace_to_eventbridge_detail(params, span):
Expand Down Expand Up @@ -293,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs):
if endpoint_name == "lambda" and operation == "Invoke":
inject_trace_to_client_context(params, span)
if endpoint_name == "sqs" and operation == "SendMessage":
inject_trace_to_sqs_or_sns_message(params, span)
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
if endpoint_name == "sqs" and operation == "SendMessageBatch":
inject_trace_to_sqs_or_sns_batch_message(params, span)
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
if endpoint_name == "events" and operation == "PutEvents":
inject_trace_to_eventbridge_detail(params, span)
if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"):
inject_trace_to_kinesis_stream(params, span)
if endpoint_name == "sns" and operation == "Publish":
inject_trace_to_sqs_or_sns_message(params, span)
inject_trace_to_sqs_or_sns_message(params, span, endpoint_name)
if endpoint_name == "sns" and operation == "PublishBatch":
inject_trace_to_sqs_or_sns_batch_message(params, span)
inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ services:
- ./.ddriot:/root/project/.riot

localstack:
image: localstack/localstack:0.13.1
image: localstack/localstack:0.14.1
network_mode: bridge
ports:
- "127.0.0.1:4566:4566"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 <https://github.com/DataDog/serverless-plugin-datadog/issues/232>`_
8 changes: 6 additions & 2 deletions tests/contrib/botocore/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,9 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self):
assert msg_str == "test"
msg_attr = msg_body["MessageAttributes"]
assert msg_attr.get("_datadog") is not None
headers = json.loads(msg_attr["_datadog"]["Value"])
assert msg_attr["_datadog"]["Type"] == "Binary"
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
headers = json.loads(datadog_value_decoded.decode())
assert headers is not None
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
Expand Down Expand Up @@ -1068,7 +1070,9 @@ def test_sns_send_message_trace_injection_with_message_attributes(self):
assert msg_str == "test"
msg_attr = msg_body["MessageAttributes"]
assert msg_attr.get("_datadog") is not None
headers = json.loads(msg_attr["_datadog"]["Value"])
assert msg_attr["_datadog"]["Type"] == "Binary"
datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"])
headers = json.loads(datadog_value_decoded.decode())
assert headers is not None
assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id)
assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)
Expand Down

0 comments on commit f6b0089

Please sign in to comment.