David.kim/updated lambda forwarder #327

Merged
merged 21 commits into from
Aug 24, 2020
Changes from 5 commits
Commits
9c2341c
Add first pass of telemetry to log forwarder
Chronobreak Aug 17, 2020
4850aec
Updated lambda forwarder with telemetry
Chronobreak Aug 18, 2020
88bb75b
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 18, 2020
4b2f901
Update lambda_function.py
Chronobreak Aug 18, 2020
550c7d1
Update lambda_function.py
Chronobreak Aug 18, 2020
7d73d1c
Linted with black
Chronobreak Aug 18, 2020
888d0a3
Merge branch 'david.kim/updated-lambda-forwarder' of github.com:DataD…
Chronobreak Aug 18, 2020
18ad05e
Refactored telemetry to fire from forward_* functions and removed cou…
Chronobreak Aug 18, 2020
9629afa
Ran black linter
Chronobreak Aug 18, 2020
59b9f04
Fix local/global variable issue
Chronobreak Aug 18, 2020
8e74aa8
Fix local/global variable issue and run black linter
Chronobreak Aug 18, 2020
f4aa21b
Refactored to use set_forwarder_telemetry_tags and add telemetry tags…
Chronobreak Aug 20, 2020
0b59915
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 20, 2020
1981f03
Merge branch 'master' into david.kim/updated-lambda-forwarder
Chronobreak Aug 21, 2020
d843a9f
Finalize log forwarder telemetry and update integration test snapshots
Chronobreak Aug 21, 2020
58f4757
Run black linter on lambda_function and enhanced_lambda_metrics
Chronobreak Aug 21, 2020
a221d4e
Address final comments and update integration tests
Chronobreak Aug 24, 2020
3ef99ed
Address final comments and update snapshots
Chronobreak Aug 24, 2020
4d0e92f
Add final tweaks
Chronobreak Aug 24, 2020
5fa05a3
Update error handling for event_type and update snapshots
Chronobreak Aug 24, 2020
b63ab4e
Add error handling to event_type and remove duplicate tags in ehanced…
Chronobreak Aug 24, 2020
55 changes: 55 additions & 0 deletions aws/logs_monitoring/lambda_function.py
@@ -148,6 +148,15 @@ def compileRegex(rule, pattern):
]


# Used to store aws.dd_forwarder.* metrics and tags
DD_FORWARDER_TELEMETRY = {
    "logs_forwarded": 0,
    "traces_forwarded": 0,
    "metrics_forwarded": 0,
    "tags": []
}


class RetriableException(Exception):
    pass

@@ -390,14 +399,24 @@ def datadog_forwarder(event, context):

    if DD_FORWARD_LOG:
        forward_logs(filter_logs(map(json.dumps, logs)))
tianchu marked this conversation as resolved.
        DD_FORWARDER_TELEMETRY["logs_forwarded"] += len(list(filter_logs(map(json.dumps, logs))))
Contributor

I don't think it's a good idea to duplicate filter_logs(map(... here. Can you move this logic into forward_logs, where you have the filtered logs (and the actual count)?

Contributor Author

Hi Tian - moved to forward_logs and removed duplication of the filter_logs call
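
A minimal sketch of the change discussed in this thread, not the code that was merged. It assumes forward_logs receives the already-filtered logs as an iterable; `send_telemetry` and this `filter_logs` are hypothetical stand-ins introduced only for illustration.

```python
import json

# Hypothetical stand-in for the stats client; records instead of sending.
submitted = []


def send_telemetry(metric_name, value):
    submitted.append((metric_name, value))


def filter_logs(logs):
    # Hypothetical filter: drop any serialized log containing "DEBUG".
    return (log for log in logs if "DEBUG" not in log)


def forward_logs(logs):
    # Materialize once: a generator can only be consumed a single time,
    # so the same list is used for both shipping and counting.
    logs_to_forward = list(logs)
    # ... the real function would batch and ship logs_to_forward here ...
    send_telemetry("aws.dd_forwarder.logs_forwarded", len(logs_to_forward))
    return logs_to_forward
```

With this shape the call site stays `forward_logs(filter_logs(map(json.dumps, logs)))`, and the count reflects exactly what was forwarded without a second filter_logs pass.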


    forward_metrics(metrics)
    DD_FORWARDER_TELEMETRY["metrics_forwarded"] += len(metrics)
Contributor

I think it makes more sense to have this logic happen inside the forward_metrics function, since it's a small detail which shouldn't pollute the main (i.e., datadog_forwarder) function. That way, when you read the main function, you can quickly see what it does at a high level without being distracted by lots of "details".

Contributor Author

Refactored/cleaned up so the main function is clean


    if len(trace_payloads) > 0:
        DD_FORWARDER_TELEMETRY["traces_forwarded"] += len(trace_payloads)
        forward_traces(trace_payloads)

    parse_and_submit_enhanced_metrics(logs)

    send_dd_forwarder_telemetry()

    # Reset DD_FORWARDER_TELEMETRY counters and tags after sending
    DD_FORWARDER_TELEMETRY["logs_forwarded"] = 0
Contributor

I don't think you need to keep a counter at all. Why not just submit the len(metrics) right away?

Contributor Author

Removed the counter construction; telemetry metrics are now submitted right away (but we still need to build the tags list)
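
A rough sketch of submitting the count right away instead of accumulating it in a module-level dict. Passing `stats` and `tags` as parameters is an assumption made here for illustration, and `RecordingStats` is a hypothetical stand-in for lambda_stats, not the real client.

```python
class RecordingStats:
    """Hypothetical stand-in for lambda_stats; records calls instead of sending."""

    def __init__(self):
        self.calls = []

    def distribution(self, metric_name, value, tags=None):
        self.calls.append((metric_name, value, tags))


def forward_metrics(metrics, stats, tags):
    # ... the real function would send each metric here ...
    # Report the batch size immediately; nothing is stored between
    # invocations, so there is no counter to reset afterwards.
    stats.distribution(
        "aws.dd_forwarder.metrics_forwarded", len(metrics), tags=tags
    )
```

Because a Lambda container can be reused across invocations, avoiding shared counters also removes the risk of one invocation's count leaking into the next if the reset step is skipped by an exception.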

    DD_FORWARDER_TELEMETRY["traces_forwarded"] = 0
    DD_FORWARDER_TELEMETRY["metrics_forwarded"] = 0
    DD_FORWARDER_TELEMETRY["tags"] = []

lambda_handler = datadog_lambda_wrapper(datadog_forwarder)

@@ -515,6 +534,10 @@ def add_metadata_to_lambda_log(event):
    tags = list(set(tags))
    tags.sort()  # Keep order deterministic

    # Set custom tags on DD_FORWARDER_TELEMETRY
    DD_FORWARDER_TELEMETRY["tags"].extend(custom_lambda_tags)
    DD_FORWARDER_TELEMETRY["tags"].extend(tags)

    event[DD_CUSTOM_TAGS] = ",".join([event[DD_CUSTOM_TAGS]] + tags)


@@ -532,6 +555,11 @@ def generate_metadata(context):
"forwarder_memorysize": context.memory_limit_in_mb,
"forwarder_version": DD_FORWARDER_VERSION,
}

    DD_FORWARDER_TELEMETRY["tags"].append("forwardername:" + dd_custom_tags_data["forwardername"])
    DD_FORWARDER_TELEMETRY["tags"].append("forwarder_memorysize:" + dd_custom_tags_data["forwarder_memorysize"])
    DD_FORWARDER_TELEMETRY["tags"].append("forwarder_version:" + DD_FORWARDER_VERSION)

    metadata[DD_CUSTOM_TAGS] = ",".join(
        filter(
            None,
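
The three append calls in this hunk build "key:value" strings by concatenation, which only works when every value is already a string. A possible alternative, sketched under the assumption that the tags come from a plain dict like dd_custom_tags_data; `build_telemetry_tags` is a hypothetical helper, not part of the forwarder.

```python
def build_telemetry_tags(metadata, extra_tags=()):
    """Return sorted, de-duplicated 'key:value' tags (hypothetical helper)."""
    # format() coerces non-string values such as an integer memory size.
    tags = {"{}:{}".format(key, value) for key, value in metadata.items()}
    # Merge any tags that are already in 'key:value' form.
    tags.update(extra_tags)
    return sorted(tags)
```

Sorting the result keeps the tag order deterministic, which matters for the integration test snapshots mentioned in the commit list.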
@@ -989,3 +1017,30 @@ def parse_service_arn(source, key, bucket, context):
region, accountID, clustername
)
return


# Send forwarder metrics with tags
def send_dd_forwarder_telemetry():
    timestamp = time.time()
    tags = list(set(DD_FORWARDER_TELEMETRY["tags"]))

    lambda_stats.distribution(
        "aws.dd_forwarder.logs_forwarded",
        DD_FORWARDER_TELEMETRY["logs_forwarded"],
        timestamp=timestamp,
        tags=tags
    )

    lambda_stats.distribution(
        "aws.dd_forwarder.traces_forwarded",
        DD_FORWARDER_TELEMETRY["traces_forwarded"],
        timestamp=timestamp,
        tags=tags
    )

    lambda_stats.distribution(
        "aws.dd_forwarder.metrics_forwarded",
        DD_FORWARDER_TELEMETRY["metrics_forwarded"],
        timestamp=timestamp,
        tags=tags
    )
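
The three near-identical distribution calls above could be collapsed into a loop. This is only a sketch: the `stats`, `telemetry`, and `now` parameters are assumptions added so the function can run on its own, and `RecordingStats` is a hypothetical stand-in that mirrors the keyword arguments used in the diff.

```python
import time


class RecordingStats:
    """Hypothetical stand-in for lambda_stats; records calls instead of sending."""

    def __init__(self):
        self.calls = []

    def distribution(self, metric_name, value, timestamp=None, tags=None):
        self.calls.append((metric_name, value, timestamp, tags))


def send_dd_forwarder_telemetry(stats, telemetry, now=None):
    timestamp = time.time() if now is None else now
    # sorted(set(...)) de-duplicates and keeps the tag order deterministic.
    tags = sorted(set(telemetry["tags"]))
    for key in ("logs_forwarded", "traces_forwarded", "metrics_forwarded"):
        stats.distribution(
            "aws.dd_forwarder." + key,
            telemetry[key],
            timestamp=timestamp,
            tags=tags,
        )
```

Passing the stats client and the telemetry dict in as arguments is a design choice of this sketch that makes the function testable without touching module-level state.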