Skip to content

Commit

Permalink
[SVLS-4142] Create a Lambda span on timeouts (#21481)
Browse files Browse the repository at this point in the history
* create a Lambda span on timeouts

* don't create a cold start span when the runtime restarts during timeouts

* fix linting

* fix test

* lint: rename name variables

* lint again

* small fixes

* refactor timeout span logic

* add mutexes

* fix span completed check

* revert refactor

* remove cold start span changes

* use mutex over rwmutex

* test routes

* add comment + update tests

* test endExecutionSpan

* add serverless.go test

* add test /hello for route

* only set span incomplete when /startInvocation has been hit

* time out -> timeout

Co-authored-by: Duncan Harvey <[email protected]>

---------

Co-authored-by: Duncan Harvey <[email protected]>
  • Loading branch information
2 people authored and CelianR committed Apr 26, 2024
1 parent 0f77d7f commit 82c2177
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/serverless/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func runAgent() {
ExtraTags: serverlessDaemon.ExtraTags,
Demux: serverlessDaemon.MetricAgent.Demux,
ProcessTrace: ta.Process,
DetectLambdaLibrary: func() bool { return serverlessDaemon.LambdaLibraryDetected },
DetectLambdaLibrary: serverlessDaemon.IsLambdaLibraryDetected,
InferredSpansEnabled: inferredspan.IsInferredSpansEnabled(),
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/serverless/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ type Daemon struct {
// LambdaLibraryDetected represents whether the Datadog Lambda Library was detected in the environment
LambdaLibraryDetected bool

// LambdaLibraryStateLock guards reads and writes of LambdaLibraryDetected
LambdaLibraryStateLock sync.Mutex

// executionSpanIncomplete is true while the Lambda execution span has been started but not yet completed by the Extension
executionSpanIncomplete bool

// ExecutionSpanStateLock guards reads and writes of executionSpanIncomplete
ExecutionSpanStateLock sync.Mutex

// runtimeStateMutex is used to ensure that modifying the state of the runtime is thread-safe
runtimeStateMutex sync.Mutex

Expand Down Expand Up @@ -435,3 +444,24 @@ func (d *Daemon) setTraceTags(tagMap map[string]string) bool {
}
return false
}

// IsLambdaLibraryDetected reports whether the Datadog Lambda Library has been
// detected. The flag is read while holding LambdaLibraryStateLock.
func (d *Daemon) IsLambdaLibraryDetected() bool {
	d.LambdaLibraryStateLock.Lock()
	detected := d.LambdaLibraryDetected
	d.LambdaLibraryStateLock.Unlock()
	return detected
}

// IsExecutionSpanIncomplete reports whether the Lambda execution span is
// currently marked as incomplete (true means it has not been finished). The
// flag is read while holding ExecutionSpanStateLock.
func (d *Daemon) IsExecutionSpanIncomplete() bool {
	d.ExecutionSpanStateLock.Lock()
	incomplete := d.executionSpanIncomplete
	d.ExecutionSpanStateLock.Unlock()
	return incomplete
}

// SetExecutionSpanIncomplete records whether the Lambda execution span is
// still incomplete. Pass true when the span is pending and false once it has
// been completed. The flag is written while holding ExecutionSpanStateLock.
func (d *Daemon) SetExecutionSpanIncomplete(spanIncomplete bool) {
	d.ExecutionSpanStateLock.Lock()
	d.executionSpanIncomplete = spanIncomplete
	d.ExecutionSpanStateLock.Unlock()
}
4 changes: 4 additions & 0 deletions pkg/serverless/daemon/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Hello struct {
// ServeHTTP handles a hit on the Hello route by marking the Datadog Lambda
// Library as detected on the daemon. The flag is written while holding
// LambdaLibraryStateLock. The request and response writer are not used.
//
//nolint:revive // TODO(SERV) Fix revive linter
func (h *Hello) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	log.Debug("Hit on the serverless.Hello route.")

	stateLock := &h.daemon.LambdaLibraryStateLock
	stateLock.Lock()
	h.daemon.LambdaLibraryDetected = true
	stateLock.Unlock()
}

Expand Down Expand Up @@ -53,6 +55,7 @@ type StartInvocation struct {

func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug("Hit on the serverless.StartInvocation route.")
s.daemon.SetExecutionSpanIncomplete(true)
startTime := time.Now()
reqBody, err := io.ReadAll(r.Body)
if err != nil {
Expand Down Expand Up @@ -86,6 +89,7 @@ type EndInvocation struct {

func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Debug("Hit on the serverless.EndInvocation route.")
e.daemon.SetExecutionSpanIncomplete(false)
endTime := time.Now()
ecs := e.daemon.ExecutionContext.GetCurrentState()
coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID)
Expand Down
54 changes: 54 additions & 0 deletions pkg/serverless/daemon/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,30 @@ func TestTraceContext(t *testing.T) {
}
}

// TestHello verifies that a POST to /lambda/hello flips the daemon's
// Lambda-library-detected flag from false to true.
func TestHello(t *testing.T) {
	assert := assert.New(t)

	port := testutil.FreeTCPPort(t)
	d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port))
	// Register the cleanup immediately so the daemon is stopped on every exit path.
	defer d.Stop()
	// NOTE(review): presumably this gives the daemon's HTTP server time to come up — confirm.
	time.Sleep(100 * time.Millisecond)
	d.InvocationProcessor = &invocationlifecycle.LifecycleProcessor{
		ExtraTags:           d.ExtraTags,
		Demux:               nil,
		ProcessTrace:        nil,
		DetectLambdaLibrary: d.IsLambdaLibraryDetected,
	}

	// A bounded timeout keeps the test from hanging if the daemon never answers.
	client := &http.Client{Timeout: 5 * time.Second}
	body := bytes.NewBuffer([]byte(`{}`))
	request, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/hello", port), body)
	if !assert.NoError(err) {
		return
	}
	assert.False(d.IsLambdaLibraryDetected())

	response, err := client.Do(request)
	// On error the response is nil; bail out instead of dereferencing it.
	if !assert.NoError(err) {
		return
	}
	response.Body.Close()
	assert.True(d.IsLambdaLibraryDetected())
}

func TestStartEndInvocationSpanParenting(t *testing.T) {
port := testutil.FreeTCPPort(t)
d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port))
Expand Down Expand Up @@ -332,6 +356,36 @@ func TestStartEndInvocationSpanParenting(t *testing.T) {
}
}

// TestStartEndInvocationIsExecutionSpanIncomplete verifies that hitting
// /lambda/start-invocation marks the execution span as incomplete and that a
// subsequent hit on /lambda/end-invocation clears the flag again.
func TestStartEndInvocationIsExecutionSpanIncomplete(t *testing.T) {
	assert := assert.New(t)
	port := testutil.FreeTCPPort(t)
	d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port))
	// Register the cleanup immediately so the daemon is stopped on every exit path.
	defer d.Stop()
	// NOTE(review): presumably this gives the daemon's HTTP server time to come up — confirm.
	time.Sleep(100 * time.Millisecond)

	m := &mockLifecycleProcessor{}
	d.InvocationProcessor = m

	// A bounded timeout keeps the test from hanging if the daemon never answers.
	client := &http.Client{Timeout: 5 * time.Second}

	// Start of the invocation: the span must now be flagged as incomplete.
	body := bytes.NewBuffer([]byte(`{"key": "value"}`))
	startReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port), body)
	if !assert.NoError(err) {
		return
	}
	startResp, err := client.Do(startReq)
	// On error the response is nil; bail out instead of dereferencing it.
	if !assert.NoError(err) {
		return
	}
	startResp.Body.Close()
	assert.True(m.OnInvokeStartCalled)
	assert.True(d.IsExecutionSpanIncomplete())

	// End of the invocation: the flag must be cleared.
	body = bytes.NewBuffer([]byte(`{}`))
	endReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/end-invocation", port), body)
	if !assert.NoError(err) {
		return
	}
	endResp, err := client.Do(endReq)
	if !assert.NoError(err) {
		return
	}
	endResp.Body.Close()
	assert.True(m.OnInvokeEndCalled)
	assert.False(d.IsExecutionSpanIncomplete())
}

// Helper function for reading test file
func getEventFromFile(filename string) string {
event, err := os.ReadFile("../trace/testdata/event_samples/" + filename)
Expand Down
1 change: 1 addition & 0 deletions pkg/serverless/invocationlifecycle/invocation_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type InvocationStartDetails struct {
type InvocationEndDetails struct {
EndTime time.Time
IsError bool
IsTimeout bool
RequestID string
ResponseRawPayload []byte
ColdStart bool
Expand Down
53 changes: 31 additions & 22 deletions pkg/serverless/invocationlifecycle/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,32 +281,14 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) {
spans = append(spans, span)

if lp.InferredSpansEnabled {
log.Debug("[lifecycle] Attempting to complete the inferred span")
log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span)
if lp.GetInferredSpan().Span.Start != 0 {
span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1]
if span1 != nil {
log.Debug("[lifecycle] Completing a secondary inferred span")
lp.setParentIDForMultipleInferredSpans()
span1.AddTagToInferredSpan("http.status_code", statusCode)
span1.AddTagToInferredSpan("peer.service", lp.GetServiceName())
span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), endDetails.IsError)
spans = append(spans, span)
log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1])
}
span0.AddTagToInferredSpan("http.status_code", statusCode)
span0.AddTagToInferredSpan("peer.service", lp.GetServiceName())
span := lp.completeInferredSpan(span0, endDetails.EndTime, endDetails.IsError)
spans = append(spans, span)
log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan())
} else {
log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data")
}
inferredSpans := lp.endInferredSpan(statusCode, endDetails.EndTime, endDetails.IsError)
spans = append(spans, inferredSpans...)
}
lp.processTrace(spans)
}

if endDetails.IsError {
// We don't submit an error metric on timeouts since it should have already been submitted when the Extension receives a SHUTDOWN event
if endDetails.IsError && !endDetails.IsTimeout {
serverlessMetrics.SendErrorsEnhancedMetric(
lp.ExtraTags.Tags, endDetails.EndTime, lp.Demux,
)
Expand Down Expand Up @@ -385,3 +367,30 @@ func (lp *LifecycleProcessor) setParentIDForMultipleInferredSpans() {
lp.requestHandler.inferredSpans[1].Span.ParentID = lp.requestHandler.inferredSpans[0].Span.ParentID
lp.requestHandler.inferredSpans[0].Span.ParentID = lp.requestHandler.inferredSpans[1].Span.SpanID
}

// endInferredSpan attempts to complete the pending inferred span(s) and
// returns the completed spans so the caller can submit them.
//
// statusCode is attached to every completed inferred span as the
// "http.status_code" tag, endTime is the end time passed for the first
// inferred span, and isError is forwarded to completeInferredSpan.
//
// If the span returned by GetInferredSpan has no start time, nothing is
// completed and an empty (non-nil) slice is returned.
func (lp *LifecycleProcessor) endInferredSpan(statusCode string, endTime time.Time, isError bool) []*pb.Span {
	spans := make([]*pb.Span, 0, 2)
	log.Debug("[lifecycle] Attempting to complete the inferred span")
	log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span)

	if lp.GetInferredSpan().Span.Start == 0 {
		log.Debug("[lifecycle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data")
		return spans
	}

	span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1]
	if span1 != nil {
		log.Debug("[lifecycle] Completing a secondary inferred span")
		lp.setParentIDForMultipleInferredSpans()
		span1.AddTagToInferredSpan("http.status_code", statusCode)
		span1.AddTagToInferredSpan("peer.service", lp.GetServiceName())
		// The secondary span is ended at the time returned by getInferredSpanStart,
		// not at endTime.
		secondary := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), isError)
		spans = append(spans, secondary)
		// Debugf (not Debug) so the %v verb is actually expanded.
		log.Debugf("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1])
	}

	span0.AddTagToInferredSpan("http.status_code", statusCode)
	span0.AddTagToInferredSpan("peer.service", lp.GetServiceName())
	primary := lp.completeInferredSpan(span0, endTime, isError)
	spans = append(spans, primary)
	log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan())
	return spans
}
117 changes: 117 additions & 0 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,123 @@ func TestCompleteInferredSpanWithOutStartTime(t *testing.T) {
completedInferredSpan := tracePayload.TracerPayload.Chunks[0].Spans[0]
assert.Equal(t, startInvocationTime.UnixNano(), completedInferredSpan.Start)
}

// TestTimeoutExecutionSpan checks that ending an invocation flagged as a
// timeout, with no incoming trace context, produces exactly one span that
// carries a freshly generated trace ID, the error flag, the request ID and the
// language tag.
func TestTimeoutExecutionSpan(t *testing.T) {
	t.Setenv(functionNameEnvVar, "my-function")
	t.Setenv("DD_SERVICE", "mock-lambda-service")

	extraTags := &logs.Tags{
		Tags: []string{"functionname:test-function"},
	}
	demux := createDemultiplexer(t)
	defer demux.Stop(false)
	mockDetectLambdaLibrary := func() bool { return false }

	// Capture whatever payload the processor hands to ProcessTrace.
	var tracePayload *api.Payload
	mockProcessTrace := func(payload *api.Payload) {
		tracePayload = payload
	}

	testProcessor := LifecycleProcessor{
		ExtraTags:            extraTags,
		ProcessTrace:         mockProcessTrace,
		DetectLambdaLibrary:  mockDetectLambdaLibrary,
		Demux:                demux,
		InferredSpansEnabled: true,
	}
	startTime := time.Now()
	duration := 1 * time.Second
	endTime := startTime.Add(duration)
	startDetails := InvocationStartDetails{
		// Use the same instant endTime is derived from so the invocation lasts exactly `duration`.
		StartTime:             startTime,
		InvokeEventRawPayload: []byte(`{}`),
		InvokedFunctionARN:    "arn:aws:lambda:us-east-1:123456789012:function:my-function",
	}
	testProcessor.OnInvokeStart(&startDetails)

	timeoutCtx := &InvocationEndDetails{
		RequestID:          "test-request-id",
		Runtime:            "java11",
		ColdStart:          false,
		ProactiveInit:      false,
		EndTime:            endTime,
		IsError:            true,
		IsTimeout:          true,
		ResponseRawPayload: nil,
	}
	testProcessor.OnInvokeEnd(timeoutCtx)

	// Fail cleanly instead of panicking if no trace was processed.
	if !assert.NotNil(t, tracePayload) {
		return
	}
	spans := tracePayload.TracerPayload.Chunks[0].Spans
	if !assert.Len(t, spans, 1) {
		return
	}
	// No trace context passed
	assert.NotZero(t, testProcessor.GetExecutionInfo().TraceID)
	assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID)
	assert.Equal(t, int32(-128), tracePayload.TracerPayload.Chunks[0].Priority)
	// New trace ID and span ID has been created
	assert.NotEqual(t, uint64(0), spans[0].TraceID)
	assert.NotEqual(t, uint64(0), spans[0].SpanID)
	assert.Equal(t, testProcessor.GetExecutionInfo().TraceID, spans[0].TraceID)
	assert.Equal(t, int32(1), spans[0].Error)
	assert.Equal(t, "test-request-id", spans[0].GetMeta()["request_id"])
	assert.Equal(t, "java", spans[0].GetMeta()["language"])
}

// TestTimeoutExecutionSpanWithTraceContext checks that ending an invocation
// flagged as a timeout, when the invoke event carries Datadog trace headers,
// produces exactly one span whose trace ID, parent ID and sampling priority
// match the processor's execution info, and which carries the error flag, the
// request ID and the language tag.
func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) {
	t.Setenv(functionNameEnvVar, "my-function")
	t.Setenv("DD_SERVICE", "mock-lambda-service")

	extraTags := &logs.Tags{
		Tags: []string{"functionname:test-function"},
	}
	demux := createDemultiplexer(t)
	defer demux.Stop(false)
	mockDetectLambdaLibrary := func() bool { return false }

	// Capture whatever payload the processor hands to ProcessTrace.
	var tracePayload *api.Payload
	mockProcessTrace := func(payload *api.Payload) {
		tracePayload = payload
	}

	testProcessor := LifecycleProcessor{
		ExtraTags:            extraTags,
		ProcessTrace:         mockProcessTrace,
		DetectLambdaLibrary:  mockDetectLambdaLibrary,
		Demux:                demux,
		InferredSpansEnabled: true,
	}
	// NOTE(review): the leading "a5a" and trailing "0" around the JSON look deliberate
	// (framing residue around the payload) — kept byte-for-byte; confirm against OnInvokeStart.
	eventPayload := `a5a{"resource":"/users/create","path":"/users/create","httpMethod":"GET","headers":{"Accept":"*/*","Accept-Encoding":"gzip","x-datadog-parent-id":"1480558859903409531","x-datadog-sampling-priority":"1","x-datadog-trace-id":"5736943178450432258"}}0`
	startTime := time.Now()
	duration := 1 * time.Second
	endTime := startTime.Add(duration)
	startDetails := InvocationStartDetails{
		StartTime:             startTime,
		InvokeEventRawPayload: []byte(eventPayload),
		InvokedFunctionARN:    "arn:aws:lambda:us-east-1:123456789012:function:my-function",
	}
	testProcessor.OnInvokeStart(&startDetails)
	timeoutCtx := &InvocationEndDetails{
		RequestID:          "test-request-id",
		Runtime:            "java11",
		ColdStart:          false,
		ProactiveInit:      false,
		EndTime:            endTime,
		IsError:            true,
		IsTimeout:          true,
		ResponseRawPayload: nil,
	}
	testProcessor.OnInvokeEnd(timeoutCtx)

	// Fail cleanly instead of panicking if no trace was processed.
	if !assert.NotNil(t, tracePayload) {
		return
	}
	spans := tracePayload.TracerPayload.Chunks[0].Spans
	if !assert.Len(t, spans, 1) {
		return
	}
	// Trace context received
	assert.Equal(t, spans[0].GetTraceID(), testProcessor.GetExecutionInfo().TraceID)
	assert.Equal(t, spans[0].GetParentID(), testProcessor.GetExecutionInfo().parentID)
	assert.Equal(t, tracePayload.TracerPayload.Chunks[0].Priority, int32(testProcessor.GetExecutionInfo().SamplingPriority))
	assert.Equal(t, int32(1), spans[0].Error)
	assert.Equal(t, "test-request-id", spans[0].GetMeta()["request_id"])
	assert.Equal(t, "java", spans[0].GetMeta()["language"])
}

func TestTriggerTypesLifecycleEventForAPIGatewayRest(t *testing.T) {
startDetails := &InvocationStartDetails{
InvokeEventRawPayload: getEventFromFile("api-gateway.json"),
Expand Down
Loading

0 comments on commit 82c2177

Please sign in to comment.