Skip to content

Commit

Permalink
feat(tracer): implement AWS payload tagging for request/response (#10642
Browse files Browse the repository at this point in the history
)

## Overview

This pull request adds the ability to expand AWS request/response
payloads as span tags.
This matches our lambda offerings and provides useful information to
developers when debugging communication between various AWS services.
This is based on the AWS Payload Tagging RFC and this implementation in
[dd-trace-node](DataDog/dd-trace-js#4309) and
this implementation in
[dd-trace-java](DataDog/dd-trace-java#7312).

This feature is _disabled_ by default.

When activated this will produce span tags such as:

```
 "aws.request.body.PublishBatchRequestEntries.0.Id": "1",
 "aws.request.body.PublishBatchRequestEntries.0.Message": "ironmaiden",
 "aws.request.body.PublishBatchRequestEntries.1.Id": "2",
 "aws.request.body.PublishBatchRequestEntries.1.Message": "megadeth"
 "aws.response.body.HTTPStatusCode": "200",
```

## Configuration

There are five new configuration options:

- `DD_TRACE_CLOUD_REQUEST_PAYLOAD_TAGGING`:
- `""` by default to indicate that AWS request payload expansion is
**disabled** for _requests_.
- `"all"` to define that AWS request payload expansion is **enabled**
for _requests_ using the default `JSONPath`s for redaction logic.
- a comma-separated list of user-supplied `JSONPath`s to define that AWS
request payload expansion is **enabled** for _requests_ using the
default `JSONPath`s and the user-supplied `JSONPath`s for redaction
logic.
- `DD_TRACE_CLOUD_RESPONSE_PAYLOAD_TAGGING`:
- `""` by default to indicate that AWS response payload expansion is
**disabled** for _responses_.
- `"all"` to define that AWS response payload expansion is **enabled**
for _responses_ using the default `JSONPath`s for redaction logic.
- a comma-separated list of user-supplied `JSONPath`s to define that AWS
request payload expansion is **enabled** for _responses_ using the
default `JSONPath`s and the user-supplied `JSONPath`s for redaction
logic.
- `DD_TRACE_CLOUD_PAYLOAD_TAGGING_MAX_DEPTH` (not defined in RFC but
done to match NodeJS):
  - sets the depth after which we stop creating tags from a payload
  - defaults to a value of `10`
- `DD_TRACE_CLOUD_PAYLOAD_TAGGING_MAX_TAGS` (to match Java
implementation)
  - sets the maximum number of tags allowed to be expanded
  - defaults to a value of `758`
- `DD_TRACE_CLOUD_PAYLOAD_TAGGING_SERVICES` (to match Java
implementation)
  - a comma-separated list of supported AWS services
  - defaults to ` s3,sns,sqs,kinesis,eventbridge`

## Other

- [`jsonpath-ng` has been
vendored](https://github.com/h2non/jsonpath-ng/blob/master/jsonpath_ng/jsonpath.py)
- [`ply` has been vendored (v3.11) (dependency of
`jsonpath-ng`)](https://github.com/dabeaz/ply/releases/tag/3.11)

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: erikayasuda <[email protected]>
  • Loading branch information
2 people authored and quinna-h committed Nov 13, 2024
1 parent 8057c09 commit cfffb02
Show file tree
Hide file tree
Showing 26 changed files with 8,544 additions and 3 deletions.
242 changes: 242 additions & 0 deletions ddtrace/_trace/utils_botocore/aws_payload_tagging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
import copy
from decimal import Decimal
import json
from typing import Any
from typing import Dict
from typing import Optional

from ddtrace import Span
from ddtrace import config
from ddtrace.vendor.jsonpath_ng import parse


_MAX_TAG_VALUE_LENGTH = 5000


class AWSPayloadTagging:
_INCOMPLETE_TAG = "_dd.payload_tags_incomplete" # Set to True if MAX_TAGS is reached

_REDACTION_PATHS_DEFAULTS = [
# SNS
"$..Attributes.KmsMasterKeyId",
"$..Attributes.Token",
# EventBridge
"$..AuthParameters.OAuthParameters.OAuthHttpParameters.HeaderParameters[*].Value",
"$..AuthParameters.OAuthParameters.OAuthHttpParameters.QueryStringParameters[*].Value",
"$..AuthParameters.OAuthParameters.OAuthHttpParameters.BodyParameters[*].Value",
"$..AuthParameters.InvocationHttpParameters.HeaderParameters[*].Value",
"$..AuthParameters.InvocationHttpParameters.QueryStringParameters[*].Value",
"$..AuthParameters.InvocationHttpParameters.BodyParameters[*].Value",
"$..Targets[*].RedshiftDataParameters.Sql",
"$..Targets[*].RedshiftDataParameters.Sqls",
"$..Targets[*].AppSyncParameters.GraphQLOperation",
# // S3
"$..SSEKMSKeyId",
"$..SSEKMSEncryptionContext",
]
_REQUEST_REDACTION_PATHS_DEFAULTS = [
# Sns
"$..Attributes.PlatformCredential",
"$..Attributes.PlatformPrincipal",
"$..AWSAccountId",
"$..Endpoint",
"$..Token",
"$..OneTimePassword",
"$..phoneNumber",
"$..PhoneNumber",
# EventBridge
"$..AuthParameters.BasicAuthParameters.Password",
"$..AuthParameters.OAuthParameters.ClientParameters.ClientSecret",
"$..AuthParameters.ApiKeyAuthParameters.ApiKeyValue",
# S3
"$..SSECustomerKey",
"$..CopySourceSSECustomerKey",
"$..RestoreRequest.OutputLocation.S3.Encryption.KMSKeyId",
]

_RESPONSE_REDACTION_PATHS_DEFAULTS = [
# // Sns
"$..Endpoints.*.Token",
"$..PlatformApplication.*.PlatformCredential",
"$..PlatformApplication.*.PlatformPrincipal",
"$..Subscriptions.*.Endpoint",
"$..PhoneNumbers[*].PhoneNumber",
"$..phoneNumbers[*]",
# // S3
"$..Credentials.SecretAccessKey",
"$..Credentials.SessionToken",
]

def __init__(self):
self.current_tag_count = 0
self.validated = False
self.request_redaction_paths = None
self.response_redaction_paths = None

def expand_payload_as_tags(self, span: Span, result: Dict[str, Any], key: str) -> None:
"""
Expands the JSON payload from various AWS services into tags and sets them on the Span.
"""
if not self.validated:
self.request_redaction_paths = self._get_redaction_paths_request()
self.response_redaction_paths = self._get_redaction_paths_response()
self.validated = True

if not self.request_redaction_paths and not self.response_redaction_paths:
return

if not result:
return

# we will be redacting at least one of request/response
redacted_dict = copy.deepcopy(result)
self.current_tag_count = 0
if self.request_redaction_paths:
self._redact_json(redacted_dict, span, self.request_redaction_paths)
if self.response_redaction_paths:
self._redact_json(redacted_dict, span, self.response_redaction_paths)

# flatten the payload into span tags
for key2, value in redacted_dict.items():
escaped_sub_key = key2.replace(".", "\\.")
self._tag_object(span, f"{key}.{escaped_sub_key}", value)
if self.current_tag_count >= config.botocore.get("payload_tagging_max_tags"):
return

def _should_json_parse(self, obj: Any) -> bool:
if isinstance(obj, (str, bytes)):
return True
return False

def _validate_json_paths(self, paths: Optional[str]) -> bool:
"""
Checks whether paths is "all" or all valid JSONPaths
"""
if not paths:
return False # not enabled

if paths == "all":
return True # enabled, use the defaults

# otherwise validate that we have valid JSONPaths
for path in paths.split(","):
if path:
try:
parse(path)
except Exception:
return False
else:
return False

return True

def _redact_json(self, data: Dict[str, Any], span: Span, paths: list) -> None:
"""
Redact sensitive data in the JSON payload based on default and user-provided JSONPath expressions
"""
for path in paths:
expression = parse(path)
for match in expression.find(data):
match.context.value[match.path.fields[0]] = "redacted"

def _get_redaction_paths_response(self) -> list:
"""
Get the list of redaction paths, combining defaults with any user-provided JSONPaths.
"""
if not config.botocore.get("payload_tagging_response"):
return []

response_redaction = config.botocore.get("payload_tagging_response")
if self._validate_json_paths(response_redaction):
if response_redaction == "all":
return self._RESPONSE_REDACTION_PATHS_DEFAULTS + self._REDACTION_PATHS_DEFAULTS
return (
self._RESPONSE_REDACTION_PATHS_DEFAULTS + self._REDACTION_PATHS_DEFAULTS + response_redaction.split(",")
)

return []

def _get_redaction_paths_request(self) -> list:
"""
Get the list of redaction paths, combining defaults with any user-provided JSONPaths.
"""
if not config.botocore.get("payload_tagging_request"):
return []

request_redaction = config.botocore.get("payload_tagging_request")
if self._validate_json_paths(request_redaction):
if request_redaction == "all":
return self._REQUEST_REDACTION_PATHS_DEFAULTS + self._REDACTION_PATHS_DEFAULTS
return (
self._REQUEST_REDACTION_PATHS_DEFAULTS + self._REDACTION_PATHS_DEFAULTS + request_redaction.split(",")
)

return []

def _tag_object(self, span: Span, key: str, obj: Any, depth: int = 0) -> None:
"""
Recursively expands the given AWS payload object and adds the values as flattened Span tags.
It is not expected that AWS Payloads will be deeply nested so the number of recursive calls should be low.
For example, the following (shortened payload object) becomes:
{
"ResponseMetadata": {
"RequestId": "SOMEID",
"HTTPHeaders": {
"x-amz-request-id": "SOMEID",
"content-length": "5",
}
}
=>
"aws.response.body.RequestId": "SOMEID"
"aws.response.body.HTTPHeaders.x-amz-request-id": "SOMEID"
"aws.response.body.HTTPHeaders.content-length": "5"
"""
# if we've hit the maximum allowed tags, mark the expansion as incomplete
if self.current_tag_count >= config.botocore.get("payload_tagging_max_tags"):
span.set_tag(self._INCOMPLETE_TAG, True)
return
if obj is None:
self.current_tag_count += 1
span.set_tag(key, obj)
return
if depth >= config.botocore.get("payload_tagging_max_depth"):
self.current_tag_count += 1
span.set_tag(
key, str(obj)[:_MAX_TAG_VALUE_LENGTH]
) # at the maximum depth - set the tag without further expansion
return
depth += 1
if self._should_json_parse(obj):
try:
parsed = json.loads(obj)
self._tag_object(span, key, parsed, depth)
except ValueError:
self.current_tag_count += 1
span.set_tag(key, str(obj)[:_MAX_TAG_VALUE_LENGTH])
return
if isinstance(obj, (int, float, Decimal)):
self.current_tag_count += 1
span.set_tag(key, str(obj))
return
if isinstance(obj, list):
for k, v in enumerate(obj):
self._tag_object(span, f"{key}.{k}", v, depth)
return
if hasattr(obj, "items"):
for k, v in obj.items():
escaped_key = str(k).replace(".", "\\.")
self._tag_object(span, f"{key}.{escaped_key}", v, depth)
return
if hasattr(obj, "to_dict"):
for k, v in obj.to_dict().items():
escaped_key = str(k).replace(".", "\\.")
self._tag_object(span, f"{key}.{escaped_key}", v, depth)
return
try:
value_as_str = str(obj)
except Exception:
value_as_str = "UNKNOWN"
self.current_tag_count += 1
span.set_tag(key, value_as_str)
14 changes: 14 additions & 0 deletions ddtrace/_trace/utils_botocore/span_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ddtrace import Span
from ddtrace import config
from ddtrace._trace.utils_botocore.aws_payload_tagging import AWSPayloadTagging
from ddtrace.constants import _ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
Expand All @@ -15,6 +16,9 @@
from ddtrace.internal.utils.formats import deep_getattr


_PAYLOAD_TAGGER = AWSPayloadTagging()


def set_botocore_patched_api_call_span_tags(span: Span, instance, args, params, endpoint_name, operation):
span.set_tag_str(COMPONENT, config.botocore.integration_name)
# set span.kind to the type of request being performed
Expand All @@ -31,6 +35,11 @@ def set_botocore_patched_api_call_span_tags(span: Span, instance, args, params,
if params and not config.botocore["tag_no_params"]:
aws._add_api_param_span_tags(span, endpoint_name, params)

if config.botocore["payload_tagging_request"] and endpoint_name in config.botocore.get(
"payload_tagging_services"
):
_PAYLOAD_TAGGER.expand_payload_as_tags(span, params, "aws.request.body")

else:
span.resource = endpoint_name

Expand All @@ -54,6 +63,11 @@ def set_botocore_response_metadata_tags(
return
response_meta = result["ResponseMetadata"]

if config.botocore["payload_tagging_response"] and span.get_tag("aws_service") in config.botocore.get(
"payload_tagging_services"
):
_PAYLOAD_TAGGER.expand_payload_as_tags(span, response_meta, "aws.response.body")

if "HTTPStatusCode" in response_meta:
status_code = response_meta["HTTPStatusCode"]
span.set_tag(http.STATUS_CODE, status_code)
Expand Down
11 changes: 11 additions & 0 deletions ddtrace/contrib/internal/botocore/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ def _load_dynamodb_primary_key_names_for_tables() -> Dict[str, Set[str]]:
"empty_poll_enabled": asbool(os.getenv("DD_BOTOCORE_EMPTY_POLL_ENABLED", default=True)),
"dynamodb_primary_key_names_for_tables": _load_dynamodb_primary_key_names_for_tables(),
"add_span_pointers": asbool(os.getenv("DD_BOTOCORE_ADD_SPAN_POINTERS", default=True)),
"payload_tagging_request": os.getenv("DD_TRACE_CLOUD_REQUEST_PAYLOAD_TAGGING", default=None),
"payload_tagging_response": os.getenv("DD_TRACE_CLOUD_RESPONSE_PAYLOAD_TAGGING", default=None),
"payload_tagging_max_depth": int(
os.getenv("DD_TRACE_CLOUD_PAYLOAD_TAGGING_MAX_DEPTH", 10)
), # RFC defined 10 levels (1.2.3.4...10) as max tagging depth
"payload_tagging_max_tags": int(
os.getenv("DD_TRACE_CLOUD_PAYLOAD_TAGGING_MAX_TAGS", 758)
), # RFC defined default limit - spans are limited past 1000
"payload_tagging_services": set(
os.getenv("DD_TRACE_CLOUD_PAYLOAD_TAGGING_SERVICES", default={"s3", "sns", "sqs", "kinesis", "eventbridge"})
),
},
)

Expand Down
27 changes: 27 additions & 0 deletions ddtrace/vendor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,33 @@
Notes:
- We only vendor the packaging.version sub-module as this is all we currently
need.
ply
---------
Source: https://github.com/dabeaz/ply
Version: 3.11
License: BSD-3-Clause
Notes:
- jsonpath-ng dependency
Did a "pip install jsonpath-ng"
Then went and looked at the contents of the ply packages
yacc.py and lex.py files here.
Didn't copy: cpp.py, ctokens.py, ygen.py (didn't see them used)
jsonpath-ng
---------
Source: https://github.com/h2non/jsonpath-ng
Version: 1.6.1
License: Apache License 2.0
Notes:
- Copied ply into vendors as well.
Changed "-" to "_" as was causing errors when importing.
"""

# Initialize `ddtrace.vendor.datadog.base.log` logger with our custom rate limited logger
Expand Down
6 changes: 6 additions & 0 deletions ddtrace/vendor/jsonpath_ng/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .jsonpath import * # noqa
from .parser import parse # noqa


# Current package version
__version__ = '1.6.1'
10 changes: 10 additions & 0 deletions ddtrace/vendor/jsonpath_ng/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class JSONPathError(Exception):
pass


class JsonPathLexerError(JSONPathError):
pass


class JsonPathParserError(JSONPathError):
pass
Loading

0 comments on commit cfffb02

Please sign in to comment.