Skip to content

Commit

Permalink
Batch traces by env instead of tags (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhinsch authored Jul 10, 2020
1 parent 1825a0d commit 3abae6c
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lambdachecks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Run integration tests
run: |
./aws/logs_monitoring/tools/integration_test.sh
- name: Run trace forwarder integration tests
- name: Run trace forwarder tests
run: |
./aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
- name: Run unit tests
Expand Down
39 changes: 7 additions & 32 deletions aws/logs_monitoring/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,38 +794,13 @@ def forward_metrics(metrics):


def forward_traces(trace_payloads):
    """Forward trace payloads to the trace intake.

    The complete list is handed to ``trace_connection.send_traces`` in a
    single call. Any exception raised by that call is logged and swallowed,
    so this function does not propagate send failures to its caller.

    Args:
        trace_payloads: the payloads to forward. They are also dumped with
            ``json.dumps`` into the log messages below, so they must be
            JSON-serializable.
    """
    try:
        trace_connection.send_traces(trace_payloads)
    except Exception:
        log.exception(f"Exception while forwarding traces {json.dumps(trace_payloads)}")
    else:
        # Serializing the payloads is only worth the cost when DEBUG is on.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")


# Utility functions
Expand Down
49 changes: 0 additions & 49 deletions aws/logs_monitoring/tests/test_lambda_function.py

This file was deleted.

2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
target:
go build -o bin/trace-intake.so -buildmode=c-shared cmd/trace/main.go
go build -o bin/trace-intake.so -gcflags="-e" -buildmode=c-shared cmd/trace/main.go
104 changes: 83 additions & 21 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ package main
import (
"C"
"context"
"encoding/json"
"errors"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring/trace_forwarder/internal/apm"
)
import (

"github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
)
Expand All @@ -24,6 +25,13 @@ var (
edgeConnection apm.TraceEdgeConnection
)

// RawTracePayload is one entry of the JSON array handed to ForwardTraces:
// a message string together with its tags string.
type RawTracePayload struct {
	// Message is decoded from the "message" JSON key.
	Message string `json:"message"`
	// Tags is decoded from the "tags" JSON key.
	Tags string `json:"tags"`
}

// Configure will set up the bindings
//export Configure
func Configure(rootURL, apiKey string) {
Expand All @@ -48,46 +56,100 @@ func Configure(rootURL, apiKey string) {
edgeConnection = apm.CreateTraceEdgeConnection(localRootURL, localAPIKey)
}

// ForwardTraces will perform filtering and log forwarding to the trace intake
// returns 0 on success, 1 on error
//export ForwardTraces
func ForwardTraces(content string, tags string) int {
tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags)
func ForwardTraces(serializedTraces string) int {
rawTracePayloads, err := unmarshalSerializedTraces(serializedTraces)
if err != nil {
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

combinedPayload := combinePayloads(tracePayloads)

err = edgeConnection.SendTraces(context.Background(), combinedPayload, 3)
processedTracePayloads, err := processRawTracePayloads(rawTracePayloads)
if err != nil {
fmt.Printf("Failed to send traces with error %v\n", err)
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

stats := apm.ComputeAPMStats(combinedPayload)
err = edgeConnection.SendStats(context.Background(), stats, 3)
aggregatedTracePayloads := aggregateTracePayloadsByEnv(processedTracePayloads)

err = sendTracesToIntake(aggregatedTracePayloads)
if err != nil {
fmt.Printf("Failed to send trace stats with error %v\n", err)
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

return 0
}

// Combine payloads into one
// Assumes that all payloads have the same HostName and Env
func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload {
combinedPayload := &pb.TracePayload{
HostName: tracePayloads[0].HostName,
Env: tracePayloads[0].Env,
Traces: make([]*pb.APITrace, 0),
// unmarshalSerializedTraces decodes a JSON array of {"message", "tags"}
// objects into RawTracePayload values.
//
// On failure it returns a nil slice and an error that wraps the underlying
// encoding/json error, so callers can still inspect the cause with
// errors.Is / errors.As.
func unmarshalSerializedTraces(serializedTraces string) ([]RawTracePayload, error) {
	var rawTracePayloads []RawTracePayload
	if err := json.Unmarshal([]byte(serializedTraces), &rawTracePayloads); err != nil {
		// json.Unmarshal may have filled in some elements before failing;
		// do not hand a half-decoded slice back to the caller.
		return nil, fmt.Errorf("Couldn't unmarshal serialized traces, %w", err)
	}
	return rawTracePayloads, nil
}

// processRawTracePayloads passes each raw payload's message and tags through
// apm.ProcessTrace (with the package-level obfuscator) and concatenates the
// resulting trace payloads in input order.
//
// Processing stops at the first error; the payloads accumulated up to that
// point are returned together with the error.
func processRawTracePayloads(rawTracePayloads []RawTracePayload) ([]*pb.TracePayload, error) {
	var processed []*pb.TracePayload
	for i := range rawTracePayloads {
		raw := rawTracePayloads[i]
		payloads, err := apm.ProcessTrace(raw.Message, obfuscator, raw.Tags)
		if err != nil {
			return processed, err
		}
		processed = append(processed, payloads...)
	}
	return processed, nil
}

// aggregateTracePayloadsByEnv merges trace payloads that share the same
// HostName and Env into a single payload whose Traces is the concatenation of
// the group's traces, in input order.
//
// The returned slice holds one newly allocated payload per distinct
// (HostName, Env) pair, ordered by first appearance in tracePayloads. The
// input payloads themselves are not modified. An empty or nil input yields an
// empty, non-nil slice.
func aggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload {
	// A struct key keeps the two fields separate, so values that happen to
	// contain a delimiter character can never make two groups collide.
	type groupKey struct {
		hostName string
		env      string
	}

	lookup := make(map[groupKey]*pb.TracePayload)
	newPayloads := make([]*pb.TracePayload, 0, len(tracePayloads))

	for _, tracePayload := range tracePayloads {
		key := groupKey{hostName: tracePayload.HostName, env: tracePayload.Env}
		existingPayload, ok := lookup[key]
		if !ok {
			existingPayload = &pb.TracePayload{
				HostName: tracePayload.HostName,
				Env:      tracePayload.Env,
				Traces:   make([]*pb.APITrace, 0, len(tracePayload.Traces)),
			}
			lookup[key] = existingPayload
			// Record groups as they are created so the output order is
			// deterministic instead of following map iteration order.
			newPayloads = append(newPayloads, existingPayload)
		}
		existingPayload.Traces = append(existingPayload.Traces, tracePayload.Traces...)
	}
	return newPayloads
}

// sendTracesToIntake sends every payload, followed by the APM stats computed
// from it, through the package-level edgeConnection.
//
// A failure on one payload does not stop the loop: each failure is printed to
// stdout and the remaining payloads are still attempted. Stats are computed
// and sent even when sending the traces of the same payload failed. If any
// send failed, a single summary error is returned; otherwise nil.
func sendTracesToIntake(tracePayloads []*pb.TracePayload) error {
	hadErr := false
	for _, tracePayload := range tracePayloads {
		// NOTE(review): the trailing 3 is presumably a retry/attempt count;
		// confirm against apm.TraceEdgeConnection.
		err := edgeConnection.SendTraces(context.Background(), tracePayload, 3)
		if err != nil {
			fmt.Printf("Failed to send traces with error %v\n", err)
			hadErr = true
		}
		stats := apm.ComputeAPMStats(tracePayload)
		err = edgeConnection.SendStats(context.Background(), stats, 3)
		if err != nil {
			fmt.Printf("Failed to send trace stats with error %v\n", err)
			hadErr = true
		}
	}
	if hadErr {
		return errors.New("Failed to send traces or stats to intake")
	}
	return nil
}

// main is empty. This package is built with -buildmode=c-shared (see the
// Makefile), which requires a main package with a main function; the usable
// entry points are the functions marked with //export.
func main() {}
50 changes: 50 additions & 0 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed
* under the Apache License Version 2.0.
*
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2020 Datadog, Inc.
*/
package main

import (
"testing"

"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/stretchr/testify/assert"
)

// TestUnmarshalSerializedTraces checks that a JSON array of payload objects is
// decoded without error into RawTracePayload values, preserving each entry's
// message and tags as well as the order of the entries.
func TestUnmarshalSerializedTraces(t *testing.T) {
	// The two entries differ so that a reordering or duplication bug is caught.
	input := `[{"message":"traces","tags":"tag1:value"},{"message":"more traces","tags":"tag2:value"}]`

	output, err := unmarshalSerializedTraces(input)

	assert.NoError(t, err)
	// Stop on a length mismatch so the indexing below cannot panic.
	if !assert.Len(t, output, 2) {
		return
	}

	// testify expects (expected, actual); keeping that order makes failure
	// messages label the two values correctly.
	assert.Equal(t, "traces", output[0].Message)
	assert.Equal(t, "tag1:value", output[0].Tags)
	assert.Equal(t, "more traces", output[1].Message)
	assert.Equal(t, "tag2:value", output[1].Tags)
}

// TestAggregateTracePayloadsByEnv checks that payloads sharing HostName and
// Env are merged into one group while a payload with a different Env gets its
// own group. The assertions do not depend on the order of the output.
func TestAggregateTracePayloadsByEnv(t *testing.T) {
	newPayload := func(env string) *pb.TracePayload {
		return &pb.TracePayload{
			HostName: "Host",
			Env:      env,
			Traces:   make([]*pb.APITrace, 0),
		}
	}

	input := []*pb.TracePayload{
		newPayload("Env1"),
		newPayload("Env1"),
		newPayload("Env2"),
	}

	output := aggregateTracePayloadsByEnv(input)

	assert.Len(t, output, 2)

	// Verify which groups were produced, not just how many.
	envs := make([]string, 0, len(output))
	for _, payload := range output {
		assert.Equal(t, "Host", payload.HostName)
		envs = append(envs, payload.Env)
	}
	assert.ElementsMatch(t, []string{"Env1", "Env2"}, envs)
}
6 changes: 3 additions & 3 deletions aws/logs_monitoring/trace_forwarder/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def __init__(self, root_url, api_key):
self.lib = cdll.LoadLibrary("{}/bin/trace-intake.so".format(dir))
self.lib.Configure(make_go_string(root_url), make_go_string(api_key))

def send_traces(self, trace_payloads):
    """Serialize the trace payloads to JSON and pass them to the Go library.

    Args:
        trace_payloads: JSON-serializable payloads; they are dumped to one
            JSON string that is handed to the library's ``ForwardTraces``.

    Raises:
        Exception: if ``ForwardTraces`` returns a non-zero status.
    """
    serialized_trace_payloads = json.dumps(trace_payloads)
    had_error = (
        self.lib.ForwardTraces(make_go_string(serialized_trace_payloads)) != 0
    )
    if had_error:
        raise Exception("Failed to send to trace intake")
2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/internal/apm/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

type (
// TraceList is an incomming trace payload
// TraceList is an incoming trace payload
traceList struct {
Traces [][]span `json:"traces"`
}
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ set -e
cd $(dirname "$0")/..

docker build -t datadog-go-layer . --build-arg runtime=python:3.7
docker run datadog-go-layer go test ./...
docker run datadog-go-layer go test -v ./...

0 comments on commit 3abae6c

Please sign in to comment.