Batch traces before making API calls (#291)
nhinsch authored Jul 7, 2020
1 parent 404f879 commit 5104ce3
Showing 5 changed files with 161 additions and 85 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lambdachecks.yml
@@ -32,9 +32,9 @@ jobs:
       - name: Run trace forwarder integration tests
         run: |
           ./aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
-      - name: Run enhanced metric unittest
+      - name: Run unit tests
         env:
           AWS_DEFAULT_REGION: us-east-1
         run: |
-          pip install boto3
+          pip install boto3 mock
           python -m unittest discover ./aws/logs_monitoring/
136 changes: 74 additions & 62 deletions aws/logs_monitoring/lambda_function.py
@@ -9,6 +9,7 @@
 import gzip
 import json
 import os
+from collections import defaultdict
 
 import boto3
 import itertools
@@ -23,6 +24,11 @@
 from datadog_lambda.metric import lambda_stats
 from datadog import api
 from trace_forwarder.connection import TraceConnection
+from enhanced_lambda_metrics import (
+    get_enriched_lambda_log_tags,
+    parse_and_submit_enhanced_metrics,
+)
+
 
 log = logging.getLogger()
 log.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
@@ -42,23 +48,6 @@
 # of requests is removed in botocore 1.13.x.
 from botocore.vendored import requests
 
-try:
-    from enhanced_lambda_metrics import (
-        get_enriched_lambda_log_tags,
-        parse_and_submit_enhanced_metrics,
-    )
-
-    IS_ENHANCED_METRICS_FILE_PRESENT = True
-except ImportError:
-    IS_ENHANCED_METRICS_FILE_PRESENT = False
-    log.warn(
-        "Could not import from enhanced_lambda_metrics so enhanced metrics "
-        "will not be submitted. Ensure you've included the enhanced_lambda_metrics "
-        "file in your Lambda project."
-    )
-finally:
-    log.debug(f"IS_ENHANCED_METRICS_FILE_PRESENT: {IS_ENHANCED_METRICS_FILE_PRESENT}")
-
 
 def get_env_var(envvar, default, boolean=False):
     """
@@ -474,39 +463,39 @@ def __exit__(self, ex_type, ex_value, traceback):
 
 
 class DatadogBatcher(object):
-    def __init__(self, max_log_size_bytes, max_size_bytes, max_size_count):
-        self._max_log_size_bytes = max_log_size_bytes
-        self._max_size_bytes = max_size_bytes
-        self._max_size_count = max_size_count
+    def __init__(self, max_item_size_bytes, max_batch_size_bytes, max_items_count):
+        self._max_item_size_bytes = max_item_size_bytes
+        self._max_batch_size_bytes = max_batch_size_bytes
+        self._max_items_count = max_items_count
 
-    def _sizeof_bytes(self, log):
-        return len(log.encode("UTF-8"))
+    def _sizeof_bytes(self, item):
+        return len(str(item).encode("UTF-8"))
 
-    def batch(self, logs):
+    def batch(self, items):
         """
         Returns an array of batches.
-        Each batch contains at most max_size_count logs and
-        is not strictly greater than max_size_bytes.
-        All logs strictly greater than max_log_size_bytes are dropped.
+        Each batch contains at most max_items_count items and
+        is not strictly greater than max_batch_size_bytes.
+        All items strictly greater than max_item_size_bytes are dropped.
         """
         batches = []
         batch = []
         size_bytes = 0
         size_count = 0
-        for log in logs:
-            log_size_bytes = self._sizeof_bytes(log)
+        for item in items:
+            item_size_bytes = self._sizeof_bytes(item)
             if size_count > 0 and (
-                size_count >= self._max_size_count
-                or size_bytes + log_size_bytes > self._max_size_bytes
+                size_count >= self._max_items_count
+                or size_bytes + item_size_bytes > self._max_batch_size_bytes
             ):
                 batches.append(batch)
                 batch = []
                 size_bytes = 0
                 size_count = 0
-            # all logs exceeding max_log_size_bytes are dropped here
-            if log_size_bytes <= self._max_log_size_bytes:
-                batch.append(log)
-                size_bytes += log_size_bytes
+            # all items exceeding max_item_size_bytes are dropped here
+            if item_size_bytes <= self._max_item_size_bytes:
+                batch.append(item)
+                size_bytes += item_size_bytes
                 size_count += 1
         if size_count > 0:
             batches.append(batch)
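Aside: the renamed parameters make the batcher's contract easier to state. A minimal sketch of the behavior with toy limits (hypothetical numbers; importing from lambda_function needs the same module/env stubbing the new test file below uses): items pack greedily until either limit is hit, and any single item over the per-item cap is silently dropped.

```python
from lambda_function import DatadogBatcher

# Toy limits: items up to 10 bytes, batches up to 20 bytes or 3 items.
batcher = DatadogBatcher(10, 20, 3)

items = ["aaaa", "bbbb", "cccc", "dddd", "x" * 50]
# The four 4-byte items pack greedily; the 50-byte item exceeds the
# per-item cap and is dropped rather than splitting or breaking a batch.
assert batcher.batch(items) == [["aaaa", "bbbb", "cccc"], ["dddd"]]
```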
@@ -556,18 +545,18 @@ def datadog_forwarder(event, context):
     if log.isEnabledFor(logging.DEBUG):
         log.debug(f"Received Event:{json.dumps(event)}")
 
-    metrics, logs, traces = split(enrich(parse(event, context)))
+    metrics, logs, trace_payloads = split(enrich(parse(event, context)))
 
     if DD_FORWARD_LOG:
         forward_logs(filter_logs(map(json.dumps, logs)))
 
     if DD_FORWARD_METRIC:
         forward_metrics(metrics)
 
-    if DD_FORWARD_TRACES and len(traces) > 0:
-        forward_traces(traces)
+    if DD_FORWARD_TRACES and len(trace_payloads) > 0:
+        forward_traces(trace_payloads)
 
-    if IS_ENHANCED_METRICS_FILE_PRESENT and DD_FORWARD_METRIC:
+    if DD_FORWARD_METRIC:
         parse_and_submit_enhanced_metrics(logs)


@@ -667,18 +656,17 @@ def add_metadata_to_lambda_log(event):
     tags = ["functionname:{}".format(function_name)]
 
     # Add any enhanced tags from metadata
-    if IS_ENHANCED_METRICS_FILE_PRESENT:
-        custom_lambda_tags = get_enriched_lambda_log_tags(event)
+    custom_lambda_tags = get_enriched_lambda_log_tags(event)
 
-        # Check if one of the Lambda's custom tags is env
-        # If an env tag exists, remove the env:none placeholder
-        custom_env_tag = next(
-            (tag for tag in custom_lambda_tags if tag.startswith("env:")), None
-        )
-        if custom_env_tag is not None:
-            event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")
+    # Check if one of the Lambda's custom tags is env
+    # If an env tag exists, remove the env:none placeholder
+    custom_env_tag = next(
+        (tag for tag in custom_lambda_tags if tag.startswith("env:")), None
+    )
+    if custom_env_tag is not None:
+        event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")
 
-        tags += custom_lambda_tags
+    tags += custom_lambda_tags
 
     # Dedup tags, so we don't end up with functionname twice
     tags = list(set(tags))
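The de-indented block behaves as before, just unconditionally now that the import always succeeds. Its effect, sketched with a toy event (the tag values are made up, and the literal "ddtags" key stands in for the forwarder's DD_CUSTOM_TAGS constant):

```python
event = {"ddtags": "forwardername:my-forwarder,env:none"}
custom_lambda_tags = ["env:prod", "team:serverless"]

# If the Lambda carries its own env tag, drop the env:none placeholder
# so the two don't conflict.
custom_env_tag = next(
    (tag for tag in custom_lambda_tags if tag.startswith("env:")), None
)
if custom_env_tag is not None:
    event["ddtags"] = event["ddtags"].replace("env:none", "")

assert "env:none" not in event["ddtags"]
```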
@@ -716,8 +704,8 @@ def generate_metadata(context):
     return metadata
 
 
-def extract_trace(event):
-    """Extract traces from an event if possible"""
+def extract_trace_payload(event):
+    """Extract trace payload from an event if possible"""
     try:
         message = event["message"]
         obj = json.loads(event["message"])
@@ -745,19 +733,19 @@ def extract_metric(event):
 
 
 def split(events):
-    """Split events into metrics, logs, and traces
+    """Split events into metrics, logs, and trace payloads
     """
-    metrics, logs, traces = [], [], []
+    metrics, logs, trace_payloads = [], [], []
     for event in events:
         metric = extract_metric(event)
-        trace = extract_trace(event)
+        trace_payload = extract_trace_payload(event)
         if metric and DD_FORWARD_METRIC:
             metrics.append(metric)
-        elif trace:
-            traces.append(trace)
+        elif trace_payload:
+            trace_payloads.append(trace_payload)
         else:
             logs.append(event)
-    return metrics, logs, traces
+    return metrics, logs, trace_payloads
 
 
 # should only be called when INCLUDE_AT_MATCH and/or EXCLUDE_AT_MATCH exist
@@ -805,15 +793,39 @@ def forward_metrics(metrics):
             log.debug(f"Forwarded metric: {json.dumps(metric)}")
 
 
-def forward_traces(traces):
-    for trace in traces:
+def forward_traces(trace_payloads):
+    batched_payloads = batch_trace_payloads(trace_payloads)
+
+    for payload in batched_payloads:
         try:
-            trace_connection.send_trace(trace["message"], trace["tags"])
+            trace_connection.send_traces(payload["message"], payload["tags"])
         except Exception:
-            log.exception(f"Exception while forwarding trace {json.dumps(trace)}")
+            log.exception(f"Exception while forwarding traces {json.dumps(payload)}")
         else:
             if log.isEnabledFor(logging.DEBUG):
-                log.debug(f"Forwarded trace: {json.dumps(trace)}")
+                log.debug(f"Forwarded traces: {json.dumps(payload)}")
+
+
+def batch_trace_payloads(trace_payloads):
+    """
+    To reduce the number of API calls, batch traces that have the same tags
+    """
+    traces_grouped_by_tags = defaultdict(list)
+    for trace_payload in trace_payloads:
+        tags = trace_payload["tags"]
+        traces = json.loads(trace_payload["message"])["traces"]
+        traces_grouped_by_tags[tags] += traces
+
+    batched_trace_payloads = []
+    batcher = DatadogBatcher(256 * 1000, 2 * 1000 * 1000, 200)
+    for tags, traces in traces_grouped_by_tags.items():
+        batches = batcher.batch(traces)
+        for batch in batches:
+            batched_trace_payloads.append(
+                {"tags": tags, "message": json.dumps({"traces": batch})}
+            )
+
+    return batched_trace_payloads
 
 
 # Utility functions
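The grouping step is the heart of the change: payloads are keyed by their tag string, their decoded traces are concatenated, and the result is re-chunked through DatadogBatcher (roughly 256 kB per trace, 2 MB and 200 traces per batch; note that _sizeof_bytes now measures str(item), an approximation of the serialized size since items are parsed lists rather than strings). A sketch of the effect, with made-up tags and the same import stubbing as the unit test below:

```python
import json

from lambda_function import batch_trace_payloads

payloads = [
    {"tags": "env:prod", "message": '{"traces":[[{"trace_id":"1"}]]}'},
    {"tags": "env:prod", "message": '{"traces":[[{"trace_id":"2"}]]}'},
    {"tags": "env:dev", "message": '{"traces":[[{"trace_id":"3"}]]}'},
]

batched = batch_trace_payloads(payloads)

# Three incoming payloads become two API calls: the env:prod traces
# now travel together in a single message.
assert [p["tags"] for p in batched] == ["env:prod", "env:dev"]
assert len(json.loads(batched[0]["message"])["traces"]) == 2
```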
49 changes: 49 additions & 0 deletions aws/logs_monitoring/tests/test_lambda_function.py
@@ -0,0 +1,49 @@
+from mock import MagicMock, patch
+import os
+import sys
+import unittest
+
+sys.modules["datadog_lambda.wrapper"] = MagicMock()
+sys.modules["datadog_lambda.metric"] = MagicMock()
+sys.modules["datadog"] = MagicMock()
+sys.modules["requests"] = MagicMock()
+
+env_patch = patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"})
+env_patch.start()
+from lambda_function import batch_trace_payloads
+
+env_patch.stop()
+
+
+class TestBatchTracePayloads(unittest.TestCase):
+    def test_batch_trace_payloads(self):
+        trace_payloads = [
+            {"tags": "tag1:value", "message": '{"traces":[[{"trace_id":"1"}]]}\n',},
+            {
+                "tags": "tag1:value",
+                "message": '{"traces":[[{"trace_id":"2"}, {"trace_id":"3"}]]}\n',
+            },
+            {
+                "tags": "tag2:value",
+                "message": '{"traces":[[{"trace_id":"4"}], [{"trace_id":"5"}]]}\n',
+            },
+        ]
+
+        batched_payloads = batch_trace_payloads(trace_payloads)
+
+        expected_batched_payloads = [
+            {
+                "tags": "tag1:value",
+                "message": '{"traces": [[{"trace_id": "1"}], [{"trace_id": "2"}, {"trace_id": "3"}]]}',
+            },
+            {
+                "tags": "tag2:value",
+                "message": '{"traces": [[{"trace_id": "4"}], [{"trace_id": "5"}]]}',
+            },
+        ]
+
+        self.assertEqual(batched_payloads, expected_batched_payloads)
+
+
+if __name__ == "__main__":
+    unittest.main()
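One detail worth noting in the expected values: the inputs are compact JSON (with trailing newlines), while the expected messages contain spaces after `:` and `,`. That is simply json.dumps re-serializing the regrouped traces with its default separators:

```python
import json

compact = '{"traces":[[{"trace_id":"1"}]]}\n'
print(json.dumps(json.loads(compact)))
# {"traces": [[{"trace_id": "1"}]]}
```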
52 changes: 33 additions & 19 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main.go
@@ -14,7 +14,10 @@ import (
 
     "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring/trace_forwarder/internal/apm"
 )
-import "github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
+import (
+    "github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
+    "github.com/DataDog/datadog-agent/pkg/trace/pb"
+)
 
 var (
     obfuscator *obfuscate.Obfuscator
@@ -45,35 +48,46 @@ func Configure(rootURL, apiKey string) {
     edgeConnection = apm.CreateTraceEdgeConnection(localRootURL, localAPIKey)
 }
 
-// ForwardTrace will perform filtering and log forwarding to the trace intake
+// ForwardTraces will perform filtering and log forwarding to the trace intake
 // returns 0 on success, 1 on error
-//export ForwardTrace
-func ForwardTrace(content string, tags string) int {
+//export ForwardTraces
+func ForwardTraces(content string, tags string) int {
     tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags)
     if err != nil {
-        fmt.Printf("Couldn't forward trace: %v", err)
+        fmt.Printf("Couldn't forward traces: %v", err)
         return 1
     }
-    hadErr := false
 
-    for _, tracePayload := range tracePayloads {
-
-        err = edgeConnection.SendTraces(context.Background(), tracePayload, 3)
-        if err != nil {
-            fmt.Printf("Failed to send traces with error %v\n", err)
-            hadErr = true
-        }
-        stats := apm.ComputeAPMStats(tracePayload)
-        err = edgeConnection.SendStats(context.Background(), stats, 3)
-        if err != nil {
-            fmt.Printf("Failed to send trace stats with error %v\n", err)
-            hadErr = true
-        }
-    }
-    if hadErr {
+    combinedPayload := combinePayloads(tracePayloads)
+
+    err = edgeConnection.SendTraces(context.Background(), combinedPayload, 3)
+    if err != nil {
+        fmt.Printf("Failed to send traces with error %v\n", err)
+        return 1
+    }
+
+    stats := apm.ComputeAPMStats(combinedPayload)
+    err = edgeConnection.SendStats(context.Background(), stats, 3)
+    if err != nil {
+        fmt.Printf("Failed to send trace stats with error %v\n", err)
         return 1
     }
+
     return 0
 }
+
+// Combine payloads into one
+// Assumes that all payloads have the same HostName and Env
+func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload {
+    combinedPayload := &pb.TracePayload{
+        HostName: tracePayloads[0].HostName,
+        Env:      tracePayloads[0].Env,
+        Traces:   make([]*pb.APITrace, 0),
+    }
+    for _, tracePayload := range tracePayloads {
+        combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...)
+    }
+    return combinedPayload
+}
 
 func main() {}
5 changes: 3 additions & 2 deletions aws/logs_monitoring/trace_forwarder/connection.py
@@ -23,9 +23,10 @@ def __init__(self, root_url, api_key):
         self.lib = cdll.LoadLibrary("{}/bin/trace-intake.so".format(dir))
         self.lib.Configure(make_go_string(root_url), make_go_string(api_key))
 
-    def send_trace(self, trace_str, tags=""):
+    def send_traces(self, traces_str, tags=""):
         had_error = (
-            self.lib.ForwardTrace(make_go_string(trace_str), make_go_string(tags)) != 0
+            self.lib.ForwardTraces(make_go_string(traces_str), make_go_string(tags))
+            != 0
         )
         if had_error:
             raise Exception("Failed to send to trace intake")
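Taken together, the Python side now makes one cgo call per batched payload instead of one per original payload. A minimal usage sketch of the renamed method, assuming the compiled trace-intake.so is present (the intake URL and key are placeholders; the real forwarder derives them from its DD_* configuration):

```python
from trace_forwarder.connection import TraceConnection

# Placeholder root URL and API key for illustration only.
connection = TraceConnection("https://trace.agent.datadoghq.com", "<api-key>")

# One send_traces call can now carry traces merged from many payloads.
connection.send_traces(
    '{"traces": [[{"trace_id": "1"}], [{"trace_id": "2"}]]}',
    tags="env:prod",
)
```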
