David.kim/updated lambda forwarder #327

Merged: 21 commits from david.kim/updated-lambda-forwarder into master on Aug 24, 2020
Commits (21)
9c2341c
Add first pass of telemetry to log forwarder
Chronobreak Aug 17, 2020
4850aec
Updated lambda forwarder with telemetry
Chronobreak Aug 18, 2020
88bb75b
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 18, 2020
4b2f901
Update lambda_function.py
Chronobreak Aug 18, 2020
550c7d1
Update lambda_function.py
Chronobreak Aug 18, 2020
7d73d1c
Linted with black
Chronobreak Aug 18, 2020
888d0a3
Merge branch 'david.kim/updated-lambda-forwarder' of github.com:DataD…
Chronobreak Aug 18, 2020
18ad05e
Refactored telemetry to fire from forward_* functions and removed cou…
Chronobreak Aug 18, 2020
9629afa
Ran black linter
Chronobreak Aug 18, 2020
59b9f04
Fix local/global variable issue
Chronobreak Aug 18, 2020
8e74aa8
Fix local/global variable issue and run black linter
Chronobreak Aug 18, 2020
f4aa21b
Refactored to use set_forwarder_telemetry_tags and add telemetry tags…
Chronobreak Aug 20, 2020
0b59915
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 20, 2020
1981f03
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 21, 2020
d843a9f
Finalize log forwarder telemetry and update integration test snapshots
Chronobreak Aug 21, 2020
58f4757
Run black linter on lambda_function and enhanced_lambda_metrics
Chronobreak Aug 21, 2020
a221d4e
Address final comments and update integration tests
Chronobreak Aug 24, 2020
3ef99ed
Address final comments and update snapshots
Chronobreak Aug 24, 2020
4d0e92f
Add final tweaks
Chronobreak Aug 24, 2020
5fa05a3
Update error handling for event_type and update snapshots
Chronobreak Aug 24, 2020
b63ab4e
Add error handling to event_type and remove duplicate tags in ehanced…
Chronobreak Aug 24, 2020
15 changes: 13 additions & 2 deletions aws/logs_monitoring/enhanced_lambda_metrics.py
@@ -297,6 +297,17 @@ def parse_get_resources_response_for_tags_by_arn(get_resources_page):
return tags_by_arn


def get_forwarder_telemetry_prefix_and_tags():
"""Retrieves prefix and tags used when submitting telemetry metrics
Used to overcome circular import"""
from lambda_function import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
DD_FORWARDER_TELEMETRY_TAGS,
)

return DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, DD_FORWARDER_TELEMETRY_TAGS


def build_tags_by_arn_cache():
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions

@@ -307,14 +318,14 @@ def build_tags_by_arn_cache():
"""
tags_by_arn_cache = {}
get_resources_paginator = resource_tagging_client.get_paginator("get_resources")
prefix, tags = get_forwarder_telemetry_prefix_and_tags()

try:
for page in get_resources_paginator.paginate(
ResourceTypeFilters=[GET_RESOURCES_LAMBDA_FILTER], ResourcesPerPage=100
):
lambda_stats.distribution(
"{}.get_resources_api_calls".format(ENHANCED_METRICS_NAMESPACE_PREFIX),
1,
"{}.get_resources_api_calls".format(prefix), 1, tags=tags,
)
page_tags_by_arn = parse_get_resources_response_for_tags_by_arn(page)
tags_by_arn_cache.update(page_tags_by_arn)
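
The function-level import in get_forwarder_telemetry_prefix_and_tags() is the usual deferred-import workaround for a circular dependency (lambda_function imports this module, so a top-level import in the other direction would not resolve cleanly); the lookup is postponed until both modules have finished loading. Below is a minimal, illustrative sketch of how the returned prefix and tags feed the distribution call in build_tags_by_arn_cache(); FakeStats is a hypothetical stand-in for lambda_stats, not part of the forwarder.

class FakeStats:
    """Hypothetical stand-in for lambda_stats; records calls instead of submitting them."""

    def __init__(self):
        self.calls = []

    def distribution(self, metric, value, tags=None):
        self.calls.append((metric, value, list(tags or [])))


def record_get_resources_page(stats, prefix, tags):
    # Mirrors the call in build_tags_by_arn_cache(): one point per GetResources
    # page, tagged with the shared forwarder telemetry tags.
    stats.distribution(
        "{}.get_resources_api_calls".format(prefix), 1, tags=tags,
    )


stats = FakeStats()
record_get_resources_page(
    stats, "aws.dd_forwarder", ["forwardername:test", "event_type:awslogs"]
)
assert stats.calls[0][0] == "aws.dd_forwarder.get_resources_api_calls"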
57 changes: 55 additions & 2 deletions aws/logs_monitoring/lambda_function.py
@@ -148,6 +148,11 @@ def compileRegex(rule, pattern):
]


# Used to build and pass aws.dd_forwarder.* telemetry tags
DD_FORWARDER_TELEMETRY_TAGS = []
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder"


class RetriableException(Exception):
pass

@@ -389,7 +394,7 @@ def datadog_forwarder(event, context):
metrics, logs, trace_payloads = split(enrich(parse(event, context)))

if DD_FORWARD_LOG:
- forward_logs(filter_logs(map(json.dumps, logs)))
+ forward_logs(logs)

forward_metrics(metrics)

@@ -404,6 +409,7 @@

def forward_logs(logs):
"""Forward logs to Datadog"""
logs_to_forward = filter_logs(list(map(json.dumps, logs)))
scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
if DD_USE_TCP:
batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1)
@@ -415,7 +421,7 @@
)

with DatadogClient(cli) as client:
- for batch in batcher.batch(logs):
+ for batch in batcher.batch(logs_to_forward):
try:
client.send(batch)
except Exception:
@@ -424,10 +430,17 @@
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded log batch: {json.dumps(batch)}")

lambda_stats.distribution(
"{}.logs_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(logs_to_forward),
tags=DD_FORWARDER_TELEMETRY_TAGS,
)


def parse(event, context):
"""Parse Lambda input to normalized events"""
metadata = generate_metadata(context)
event_type = "unknown"
try:
# Route to the corresponding parser
event_type = parse_event_type(event)
@@ -448,9 +461,25 @@
)
events = [err_message]

set_forwarder_telemetry_tags(context, event_type)

return normalize_events(events, metadata)


def set_forwarder_telemetry_tags(context, event_type):
"""Helper function to set tags on telemetry metrics
Do not submit telemetry metrics before this helper function is invoked
"""
global DD_FORWARDER_TELEMETRY_TAGS

DD_FORWARDER_TELEMETRY_TAGS = [
f"forwardername:{context.function_name.lower()}",
f"forwarder_memorysize:{context.memory_limit_in_mb}",
f"forwarder_version:{DD_FORWARDER_VERSION}",
f"event_type:{event_type}",
]


def enrich(events):
"""Adds event-specific tags and attributes to each event

@@ -572,6 +601,7 @@ def generate_metadata(context):
"forwarder_memorysize": context.memory_limit_in_mb,
"forwarder_version": DD_FORWARDER_VERSION,
}

metadata[DD_CUSTOM_TAGS] = ",".join(
filter(
None,
@@ -675,6 +705,12 @@ def forward_metrics(metrics):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded metric: {json.dumps(metric)}")

lambda_stats.distribution(
"{}.metrics_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(metrics),
tags=DD_FORWARDER_TELEMETRY_TAGS,
)


def forward_traces(trace_payloads):
try:
@@ -687,20 +723,37 @@
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")

lambda_stats.distribution(
"{}.traces_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(trace_payloads),
tags=DD_FORWARDER_TELEMETRY_TAGS,
)


# Utility functions


def normalize_events(events, metadata):
normalized = []
events_counter = 0

for event in events:
events_counter += 1
if isinstance(event, dict):
normalized.append(merge_dicts(event, metadata))
elif isinstance(event, str):
normalized.append(merge_dicts({"message": event}, metadata))
else:
# drop this log
continue

"""Submit count of total events"""
lambda_stats.distribution(
"{}.incoming_events".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
events_counter,
tags=DD_FORWARDER_TELEMETRY_TAGS,
)

return normalized


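Taken together, the lambda_function.py changes follow one pattern: parse() determines event_type and calls set_forwarder_telemetry_tags() to populate the module-level DD_FORWARDER_TELEMETRY_TAGS, and every later submission (incoming_events in normalize_events, plus logs_forwarded, metrics_forwarded, and traces_forwarded) reuses that tag list under the aws.dd_forwarder prefix. The sketch below shows the resulting tag format and metric names with placeholder values; the context object and version string are stand-ins, not the forwarder's real inputs.

# Placeholder values for illustration; the real forwarder reads these from the
# Lambda runtime context and its settings.
DD_FORWARDER_VERSION = "x.y.z"
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder"


class FakeContext:
    # Minimal stand-in for the Lambda context object used by the forwarder.
    function_name = "Test-Forwarder"
    memory_limit_in_mb = 1536


def build_forwarder_telemetry_tags(context, event_type):
    # Mirrors set_forwarder_telemetry_tags(): one shared tag list reused by
    # every aws.dd_forwarder.* distribution submitted afterwards.
    return [
        f"forwardername:{context.function_name.lower()}",
        f"forwarder_memorysize:{context.memory_limit_in_mb}",
        f"forwarder_version:{DD_FORWARDER_VERSION}",
        f"event_type:{event_type}",
    ]


tags = build_forwarder_telemetry_tags(FakeContext(), "awslogs")
# ['forwardername:test-forwarder', 'forwarder_memorysize:1536',
#  'forwarder_version:x.y.z', 'event_type:awslogs']

for signal, count in [
    ("incoming_events", 12),
    ("logs_forwarded", 10),
    ("metrics_forwarded", 1),
    ("traces_forwarded", 1),
]:
    # Each submission is one distribution point named <prefix>.<signal>
    # carrying the shared tag list; the counts here are arbitrary examples.
    metric = "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, signal)
    print(metric, count, tags)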
Empty file.
@@ -44,6 +44,57 @@
"host": "testLogGroup"
}
]
},
{
"path": "/api/v1/distribution_points?api_key=abcdefghijklmnopqrstuvwxyz012345",
"verb": "POST",
"content-type": "application/json",
"data": {
"series": [
{
"metric": "aws.dd_forwarder.incoming_events",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.logs_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.metrics_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
}
]
}
}
]
}
@@ -72,6 +72,57 @@
"host": "arn:aws:lambda:us-east-1:0:function:storms-cloudwatch-event"
}
]
},
{
"path": "/api/v1/distribution_points?api_key=abcdefghijklmnopqrstuvwxyz012345",
"verb": "POST",
"content-type": "application/json",
"data": {
"series": [
{
"metric": "aws.dd_forwarder.incoming_events",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.logs_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.metrics_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
}
]
}
}
]
}
@@ -25,6 +25,57 @@
"host": "testLogGroup"
}
]
},
{
"path": "/api/v1/distribution_points?api_key=abcdefghijklmnopqrstuvwxyz012345",
"verb": "POST",
"content-type": "application/json",
"data": {
"series": [
{
"metric": "aws.dd_forwarder.incoming_events",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.logs_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
},
{
"metric": "aws.dd_forwarder.metrics_forwarded",
"points": "<redacted from snapshot>",
"type": "distribution",
"host": null,
"device": null,
"tags": [
"forwardername:test",
"forwarder_memorysize:1536",
"forwarder_version:<redacted from snapshot>",
"event_type:awslogs"
],
"interval": 10
}
]
}
}
]
}
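
The three snapshot diffs above gain an identical block: one additional POST to /api/v1/distribution_points whose series carries the aws.dd_forwarder.incoming_events, logs_forwarded, and metrics_forwarded distributions, all tagged with the same forwarder telemetry tags. The sketch below generates that repeated series shape programmatically; it is illustrative only and not part of the PR's snapshot tooling.

# Illustrative only: rebuilds the repeated "series" entries seen in the
# snapshots from one tag list and one list of metric names.
TELEMETRY_TAGS = [
    "forwardername:test",
    "forwarder_memorysize:1536",
    "forwarder_version:<redacted from snapshot>",
    "event_type:awslogs",
]


def expected_telemetry_series(metric_names, tags=TELEMETRY_TAGS):
    return [
        {
            "metric": "aws.dd_forwarder.{}".format(name),
            "points": "<redacted from snapshot>",
            "type": "distribution",
            "host": None,
            "device": None,
            "tags": tags,
            "interval": 10,
        }
        for name in metric_names
    ]


series = expected_telemetry_series(
    ["incoming_events", "logs_forwarded", "metrics_forwarded"]
)
assert series[0]["metric"] == "aws.dd_forwarder.incoming_events"
assert all(entry["tags"] == TELEMETRY_TAGS for entry in series)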