Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch traces by env instead of tags #296

Merged
merged 5 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lambdachecks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Run integration tests
run: |
./aws/logs_monitoring/tools/integration_test.sh
- name: Run trace forwarder integration tests
- name: Run trace forwarder tests
run: |
./aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
- name: Run unit tests
Expand Down
39 changes: 7 additions & 32 deletions aws/logs_monitoring/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,38 +794,13 @@ def forward_metrics(metrics):


def forward_traces(trace_payloads):
batched_payloads = batch_trace_payloads(trace_payloads)

for payload in batched_payloads:
try:
trace_connection.send_traces(payload["message"], payload["tags"])
except Exception:
log.exception(f"Exception while forwarding traces {json.dumps(payload)}")
else:
if log.isEnabledFor(logging.DEBUG):
log.debug(f"Forwarded traces: {json.dumps(payload)}")


def batch_trace_payloads(trace_payloads):
"""
To reduce the number of API calls, batch traces that have the same tags
"""
traces_grouped_by_tags = defaultdict(list)
for trace_payload in trace_payloads:
tags = trace_payload["tags"]
traces = json.loads(trace_payload["message"])["traces"]
traces_grouped_by_tags[tags] += traces

batched_trace_payloads = []
batcher = DatadogBatcher(256 * 1000, 2 * 1000 * 1000, 200)
for tags, traces in traces_grouped_by_tags.items():
batches = batcher.batch(traces)
for batch in batches:
batched_trace_payloads.append(
{"tags": tags, "message": json.dumps({"traces": batch})}
)

return batched_trace_payloads
try:
trace_connection.send_traces(trace_payloads)
except Exception:
log.exception(f"Exception while forwarding traces {json.dumps(trace_payloads)}")
else:
if log.isEnabledFor(logging.DEBUG):
log.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")


# Utility functions
Expand Down
49 changes: 0 additions & 49 deletions aws/logs_monitoring/tests/test_lambda_function.py

This file was deleted.

2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
target:
go build -o bin/trace-intake.so -buildmode=c-shared cmd/trace/main.go
go build -o bin/trace-intake.so -gcflags="-e" -buildmode=c-shared cmd/trace/main.go
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Show all compilation errors, not just the first ten.

104 changes: 83 additions & 21 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ package main
import (
"C"
"context"
"encoding/json"
"errors"
"fmt"

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring/trace_forwarder/internal/apm"
)
import (

"github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
)
Expand All @@ -24,6 +25,13 @@ var (
edgeConnection apm.TraceEdgeConnection
)

type (
RawTracePayload struct {
Message string `json:"message"`
Tags string `json:"tags"`
}
)

// Configure will set up the bindings
//export Configure
func Configure(rootURL, apiKey string) {
Expand All @@ -48,46 +56,100 @@ func Configure(rootURL, apiKey string) {
edgeConnection = apm.CreateTraceEdgeConnection(localRootURL, localAPIKey)
}

// ForwardTraces will perform filtering and log forwarding to the trace intake
// returns 0 on success, 1 on error
//export ForwardTraces
func ForwardTraces(content string, tags string) int {
tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags)
func ForwardTraces(serializedTraces string) int {
rawTracePayloads, err := unmarshalSerializedTraces(serializedTraces)
if err != nil {
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

combinedPayload := combinePayloads(tracePayloads)

err = edgeConnection.SendTraces(context.Background(), combinedPayload, 3)
processedTracePayloads, err := processRawTracePayloads(rawTracePayloads)
if err != nil {
fmt.Printf("Failed to send traces with error %v\n", err)
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

stats := apm.ComputeAPMStats(combinedPayload)
err = edgeConnection.SendStats(context.Background(), stats, 3)
aggregatedTracePayloads := aggregateTracePayloadsByEnv(processedTracePayloads)

err = sendTracesToIntake(aggregatedTracePayloads)
if err != nil {
fmt.Printf("Failed to send trace stats with error %v\n", err)
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}

return 0
}

// Combine payloads into one
// Assumes that all payloads have the same HostName and Env
func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload {
combinedPayload := &pb.TracePayload{
HostName: tracePayloads[0].HostName,
Env: tracePayloads[0].Env,
Traces: make([]*pb.APITrace, 0),
func unmarshalSerializedTraces(serializedTraces string) ([]RawTracePayload, error) {
var rawTracePayloads []RawTracePayload
err := json.Unmarshal([]byte(serializedTraces), &rawTracePayloads)

if err != nil {
return rawTracePayloads, fmt.Errorf("Couldn't unmarshal serialized traces, %v", err)
}

return rawTracePayloads, nil
}

func processRawTracePayloads(rawTracePayloads []RawTracePayload) ([]*pb.TracePayload, error) {
var processedTracePayloads []*pb.TracePayload
for _, rawTracePayload := range rawTracePayloads {
traceList, err := apm.ProcessTrace(rawTracePayload.Message, obfuscator, rawTracePayload.Tags)
if err != nil {
return processedTracePayloads, err
}
processedTracePayloads = append(processedTracePayloads, traceList...)
}
return processedTracePayloads, nil
}

func aggregateTracePayloadsByEnv(tracePayloads []*pb.TracePayload) []*pb.TracePayload {
lookup := make(map[string]*pb.TracePayload)
for _, tracePayload := range tracePayloads {
combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...)
key := fmt.Sprintf("%s|%s", tracePayload.HostName, tracePayload.Env)
var existingPayload *pb.TracePayload
if val, ok := lookup[key]; ok {
existingPayload = val
} else {
existingPayload = &pb.TracePayload{
HostName: tracePayload.HostName,
Env: tracePayload.Env,
Traces: make([]*pb.APITrace, 0),
}
lookup[key] = existingPayload
}
existingPayload.Traces = append(existingPayload.Traces, tracePayload.Traces...)
}

newPayloads := make([]*pb.TracePayload, 0)

for _, tracePayload := range lookup {
newPayloads = append(newPayloads, tracePayload)
}
return newPayloads
}

func sendTracesToIntake(tracePayloads []*pb.TracePayload) error {
hadErr := false
for _, tracePayload := range tracePayloads {
err := edgeConnection.SendTraces(context.Background(), tracePayload, 3)
if err != nil {
fmt.Printf("Failed to send traces with error %v\n", err)
hadErr = true
}
stats := apm.ComputeAPMStats(tracePayload)
err = edgeConnection.SendStats(context.Background(), stats, 3)
if err != nil {
fmt.Printf("Failed to send trace stats with error %v\n", err)
hadErr = true
}
}
if hadErr {
return errors.New("Failed to send traces or stats to intake")
}
return combinedPayload
return nil
}

func main() {}
50 changes: 50 additions & 0 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Unless explicitly stated otherwise all files in this repository are licensed
* under the Apache License Version 2.0.
*
* This product includes software developed at Datadog (https://www.datadoghq.com/).
* Copyright 2020 Datadog, Inc.
*/
package main

import (
"testing"

"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/stretchr/testify/assert"
)

func TestUnmarshalSerializedTraces(t *testing.T) {
input := "[{\"message\":\"traces\",\"tags\":\"tag1:value\"},{\"message\":\"traces\",\"tags\":\"tag1:value\"}]"

output, _ := unmarshalSerializedTraces(input)

assert.Equal(t, output[0].Message, "traces")
assert.Equal(t, output[0].Tags, "tag1:value")
assert.Equal(t, output[1].Message, "traces")
assert.Equal(t, output[1].Tags, "tag1:value")
}

func TestAggregateTracePayloadsByEnv(t *testing.T) {
payload1 := pb.TracePayload{
HostName: "Host",
Env: "Env1",
Traces: make([]*pb.APITrace, 0),
}

payload2 := pb.TracePayload{
HostName: "Host",
Env: "Env1",
Traces: make([]*pb.APITrace, 0),
}

payload3 := pb.TracePayload{
HostName: "Host",
Env: "Env2",
Traces: make([]*pb.APITrace, 0),
}

input := []*pb.TracePayload{&payload1, &payload2, &payload3}
output := aggregateTracePayloadsByEnv(input)
assert.Equal(t, len(output), 2)
}
6 changes: 3 additions & 3 deletions aws/logs_monitoring/trace_forwarder/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ def __init__(self, root_url, api_key):
self.lib = cdll.LoadLibrary("{}/bin/trace-intake.so".format(dir))
self.lib.Configure(make_go_string(root_url), make_go_string(api_key))

def send_traces(self, traces_str, tags=""):
def send_traces(self, trace_payloads):
serialized_trace_paylods = json.dumps(trace_payloads)
had_error = (
self.lib.ForwardTraces(make_go_string(traces_str), make_go_string(tags))
!= 0
self.lib.ForwardTraces(make_go_string(serialized_trace_paylods)) != 0
)
if had_error:
raise Exception("Failed to send to trace intake")
2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/internal/apm/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

type (
// TraceList is an incomming trace payload
// TraceList is an incoming trace payload
traceList struct {
Traces [][]span `json:"traces"`
}
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ set -e
cd $(dirname "$0")/..

docker build -t datadog-go-layer . --build-arg runtime=python:3.7
docker run datadog-go-layer go test ./...
docker run datadog-go-layer go test -v ./...
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enable verbose mode, so we can see log output.