diff --git a/cmd/serverless/main.go b/cmd/serverless/main.go index 8eec195a10e6f..9cd0819653b39 100644 --- a/cmd/serverless/main.go +++ b/cmd/serverless/main.go @@ -134,7 +134,7 @@ func runAgent() { ExtraTags: serverlessDaemon.ExtraTags, Demux: serverlessDaemon.MetricAgent.Demux, ProcessTrace: ta.Process, - DetectLambdaLibrary: func() bool { return serverlessDaemon.LambdaLibraryDetected }, + DetectLambdaLibrary: serverlessDaemon.IsLambdaLibraryDetected, InferredSpansEnabled: inferredspan.IsInferredSpansEnabled(), } diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index 21386b9653449..58bc1ac85190e 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -66,6 +66,15 @@ type Daemon struct { // LambdaLibraryDetected represents whether the Datadog Lambda Library was detected in the environment LambdaLibraryDetected bool + // LambdaLibraryStateLock keeps track of whether the Datadog Lambda Library was detected in the environment + LambdaLibraryStateLock sync.Mutex + + // executionSpanIncomplete indicates whether the Lambda span has been completed by the Extension + executionSpanIncomplete bool + + // ExecutionSpanStateLock keeps track of whether the serverless Invocation routes have been hit to complete the execution span + ExecutionSpanStateLock sync.Mutex + // runtimeStateMutex is used to ensure that modifying the state of the runtime is thread-safe runtimeStateMutex sync.Mutex @@ -435,3 +444,24 @@ func (d *Daemon) setTraceTags(tagMap map[string]string) bool { } return false } + +// IsLambdaLibraryDetected returns if the Lambda Library is in use +func (d *Daemon) IsLambdaLibraryDetected() bool { + d.LambdaLibraryStateLock.Lock() + defer d.LambdaLibraryStateLock.Unlock() + return d.LambdaLibraryDetected +} + +// IsExecutionSpanIncomplete checks if the Lambda execution span was finished +func (d *Daemon) IsExecutionSpanIncomplete() bool { + d.ExecutionSpanStateLock.Lock() + defer d.ExecutionSpanStateLock.Unlock() + return d.executionSpanIncomplete +} + +// SetExecutionSpanIncomplete keeps track of whether the Extension completed the Lambda execution span +func (d *Daemon) SetExecutionSpanIncomplete(spanIncomplete bool) { + d.ExecutionSpanStateLock.Lock() + defer d.ExecutionSpanStateLock.Unlock() + d.executionSpanIncomplete = spanIncomplete +} diff --git a/pkg/serverless/daemon/routes.go b/pkg/serverless/daemon/routes.go index 1b2379d8e1822..93e113782dbb8 100644 --- a/pkg/serverless/daemon/routes.go +++ b/pkg/serverless/daemon/routes.go @@ -26,6 +26,8 @@ type Hello struct { //nolint:revive // TODO(SERV) Fix revive linter func (h *Hello) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.Hello route.") + h.daemon.LambdaLibraryStateLock.Lock() + defer h.daemon.LambdaLibraryStateLock.Unlock() h.daemon.LambdaLibraryDetected = true } @@ -53,6 +55,7 @@ type StartInvocation struct { func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.StartInvocation route.") + s.daemon.SetExecutionSpanIncomplete(true) startTime := time.Now() reqBody, err := io.ReadAll(r.Body) if err != nil { @@ -86,6 +89,7 @@ type EndInvocation struct { func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.EndInvocation route.") + e.daemon.SetExecutionSpanIncomplete(false) endTime := time.Now() ecs := e.daemon.ExecutionContext.GetCurrentState() coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index eab3e09e6be02..0cdae0c594057 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -161,6 +161,30 @@ func TestTraceContext(t *testing.T) { } } +func TestHello(t *testing.T) { + assert := assert.New(t) + + port := testutil.FreeTCPPort(t) + d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + time.Sleep(100 * time.Millisecond) + defer d.Stop() + d.InvocationProcessor = &invocationlifecycle.LifecycleProcessor{ + ExtraTags: d.ExtraTags, + Demux: nil, + ProcessTrace: nil, + DetectLambdaLibrary: d.IsLambdaLibraryDetected, + } + client := &http.Client{} + body := bytes.NewBuffer([]byte(`{}`)) + request, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/hello", port), body) + assert.Nil(err) + assert.False(d.IsLambdaLibraryDetected()) + response, err := client.Do(request) + assert.Nil(err) + response.Body.Close() + assert.True(d.IsLambdaLibraryDetected()) +} + func TestStartEndInvocationSpanParenting(t *testing.T) { port := testutil.FreeTCPPort(t) d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) @@ -332,6 +356,36 @@ func TestStartEndInvocationSpanParenting(t *testing.T) { } } +func TestStartEndInvocationIsExecutionSpanIncomplete(t *testing.T) { + assert := assert.New(t) + port := testutil.FreeTCPPort(t) + d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + time.Sleep(100 * time.Millisecond) + defer d.Stop() + + m := &mockLifecycleProcessor{} + d.InvocationProcessor = m + + client := &http.Client{} + body := bytes.NewBuffer([]byte(`{"key": "value"}`)) + startReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port), body) + assert.Nil(err) + startResp, err := client.Do(startReq) + assert.Nil(err) + startResp.Body.Close() + assert.True(m.OnInvokeStartCalled) + assert.True(d.IsExecutionSpanIncomplete()) + + body = bytes.NewBuffer([]byte(`{}`)) + endReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/end-invocation", port), body) + assert.Nil(err) + endResp, err := client.Do(endReq) + assert.Nil(err) + endResp.Body.Close() + assert.True(m.OnInvokeEndCalled) + assert.False(d.IsExecutionSpanIncomplete()) +} + // Helper function for reading test file func getEventFromFile(filename string) string { event, err := os.ReadFile("../trace/testdata/event_samples/" + filename) diff --git a/pkg/serverless/invocationlifecycle/invocation_details.go b/pkg/serverless/invocationlifecycle/invocation_details.go index bd0e285f8d377..0ad7d0a98b8ea 100644 --- a/pkg/serverless/invocationlifecycle/invocation_details.go +++ b/pkg/serverless/invocationlifecycle/invocation_details.go @@ -27,6 +27,7 @@ type InvocationStartDetails struct { type InvocationEndDetails struct { EndTime time.Time IsError bool + IsTimeout bool RequestID string ResponseRawPayload []byte ColdStart bool diff --git a/pkg/serverless/invocationlifecycle/lifecycle.go b/pkg/serverless/invocationlifecycle/lifecycle.go index d8c470b187db5..90e931767cef1 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle.go +++ b/pkg/serverless/invocationlifecycle/lifecycle.go @@ -281,32 +281,14 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { spans = append(spans, span) if lp.InferredSpansEnabled { - log.Debug("[lifecycle] Attempting to complete the inferred span") - log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span) - if lp.GetInferredSpan().Span.Start != 0 { - span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1] - if span1 != nil { - log.Debug("[lifecycle] Completing a secondary inferred span") - lp.setParentIDForMultipleInferredSpans() - span1.AddTagToInferredSpan("http.status_code", statusCode) - span1.AddTagToInferredSpan("peer.service", lp.GetServiceName()) - span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), endDetails.IsError) - spans = append(spans, span) - log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1]) - } - span0.AddTagToInferredSpan("http.status_code", statusCode) - span0.AddTagToInferredSpan("peer.service", lp.GetServiceName()) - span := lp.completeInferredSpan(span0, endDetails.EndTime, endDetails.IsError) - spans = append(spans, span) - log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan()) - } else { - log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data") - } + inferredSpans := lp.endInferredSpan(statusCode, endDetails.EndTime, endDetails.IsError) + spans = append(spans, inferredSpans...) } lp.processTrace(spans) } - if endDetails.IsError { + // We don't submit an error metric on timeouts since it should have already been submitted when the Extension receives a SHUTDOWN event + if endDetails.IsError && !endDetails.IsTimeout { serverlessMetrics.SendErrorsEnhancedMetric( lp.ExtraTags.Tags, endDetails.EndTime, lp.Demux, ) @@ -385,3 +367,30 @@ func (lp *LifecycleProcessor) setParentIDForMultipleInferredSpans() { lp.requestHandler.inferredSpans[1].Span.ParentID = lp.requestHandler.inferredSpans[0].Span.ParentID lp.requestHandler.inferredSpans[0].Span.ParentID = lp.requestHandler.inferredSpans[1].Span.SpanID } + +// endInferredSpan attempts to complete any inferred spans and send them to intake +func (lp *LifecycleProcessor) endInferredSpan(statusCode string, endTime time.Time, isError bool) []*pb.Span { + spans := make([]*pb.Span, 0, 2) + log.Debug("[lifecycle] Attempting to complete the inferred span") + log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span) + if lp.GetInferredSpan().Span.Start != 0 { + span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1] + if span1 != nil { + log.Debug("[lifecycle] Completing a secondary inferred span") + lp.setParentIDForMultipleInferredSpans() + span1.AddTagToInferredSpan("http.status_code", statusCode) + span1.AddTagToInferredSpan("peer.service", lp.GetServiceName()) + span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), isError) + spans = append(spans, span) + log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1]) + } + span0.AddTagToInferredSpan("http.status_code", statusCode) + span0.AddTagToInferredSpan("peer.service", lp.GetServiceName()) + span := lp.completeInferredSpan(span0, endTime, isError) + spans = append(spans, span) + log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan()) + } else { + log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data") + } + return spans +} diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index e33d574035dd7..b7ee5aaa3057d 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -379,6 +379,123 @@ func TestCompleteInferredSpanWithOutStartTime(t *testing.T) { completedInferredSpan := tracePayload.TracerPayload.Chunks[0].Spans[0] assert.Equal(t, startInvocationTime.UnixNano(), completedInferredSpan.Start) } + +func TestTimeoutExecutionSpan(t *testing.T) { + t.Setenv(functionNameEnvVar, "my-function") + t.Setenv("DD_SERVICE", "mock-lambda-service") + + extraTags := &logs.Tags{ + Tags: []string{"functionname:test-function"}, + } + demux := createDemultiplexer(t) + defer demux.Stop(false) + mockDetectLambdaLibrary := func() bool { return false } + + var tracePayload *api.Payload + mockProcessTrace := func(payload *api.Payload) { + tracePayload = payload + } + + testProcessor := LifecycleProcessor{ + ExtraTags: extraTags, + ProcessTrace: mockProcessTrace, + DetectLambdaLibrary: mockDetectLambdaLibrary, + Demux: demux, + InferredSpansEnabled: true, + } + startTime := time.Now() + duration := 1 * time.Second + endTime := startTime.Add(duration) + startDetails := InvocationStartDetails{ + StartTime: time.Now(), + InvokeEventRawPayload: []byte(`{}`), + InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function", + } + testProcessor.OnInvokeStart(&startDetails) + + timeoutCtx := &InvocationEndDetails{ + RequestID: "test-request-id", + Runtime: "java11", + ColdStart: false, + ProactiveInit: false, + EndTime: endTime, + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, + } + testProcessor.OnInvokeEnd(timeoutCtx) + + spans := tracePayload.TracerPayload.Chunks[0].Spans + assert.Equal(t, 1, len(spans)) + // No trace context passed + assert.NotZero(t, testProcessor.GetExecutionInfo().TraceID) + assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID) + assert.Equal(t, int32(-128), tracePayload.TracerPayload.Chunks[0].Priority) + // New trace ID and span ID has been created + assert.NotEqual(t, uint64(0), spans[0].TraceID) + assert.NotEqual(t, uint64(0), spans[0].SpanID) + assert.Equal(t, spans[0].TraceID, testProcessor.GetExecutionInfo().TraceID) + assert.Equal(t, spans[0].Error, int32(1)) + assert.Equal(t, spans[0].GetMeta()["request_id"], "test-request-id") + assert.Equal(t, spans[0].GetMeta()["language"], "java") +} + +func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { + t.Setenv(functionNameEnvVar, "my-function") + t.Setenv("DD_SERVICE", "mock-lambda-service") + + extraTags := &logs.Tags{ + Tags: []string{"functionname:test-function"}, + } + demux := createDemultiplexer(t) + defer demux.Stop(false) + mockDetectLambdaLibrary := func() bool { return false } + + var tracePayload *api.Payload + mockProcessTrace := func(payload *api.Payload) { + tracePayload = payload + } + + testProcessor := LifecycleProcessor{ + ExtraTags: extraTags, + ProcessTrace: mockProcessTrace, + DetectLambdaLibrary: mockDetectLambdaLibrary, + Demux: demux, + InferredSpansEnabled: true, + } + eventPayload := `a5a{"resource":"/users/create","path":"/users/create","httpMethod":"GET","headers":{"Accept":"*/*","Accept-Encoding":"gzip","x-datadog-parent-id":"1480558859903409531","x-datadog-sampling-priority":"1","x-datadog-trace-id":"5736943178450432258"}}0` + startTime := time.Now() + duration := 1 * time.Second + endTime := startTime.Add(duration) + startDetails := InvocationStartDetails{ + StartTime: startTime, + InvokeEventRawPayload: []byte(eventPayload), + InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function", + } + testProcessor.OnInvokeStart(&startDetails) + timeoutCtx := &InvocationEndDetails{ + RequestID: "test-request-id", + Runtime: "java11", + ColdStart: false, + ProactiveInit: false, + EndTime: endTime, + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, + } + testProcessor.OnInvokeEnd(timeoutCtx) + + spans := tracePayload.TracerPayload.Chunks[0].Spans + assert.Equal(t, 1, len(spans)) + // Trace context received + assert.Equal(t, spans[0].GetTraceID(), testProcessor.GetExecutionInfo().TraceID) + assert.Equal(t, spans[0].GetParentID(), testProcessor.GetExecutionInfo().parentID) + assert.Equal(t, tracePayload.TracerPayload.Chunks[0].Priority, int32(testProcessor.GetExecutionInfo().SamplingPriority)) + assert.Equal(t, spans[0].Error, int32(1)) + assert.Equal(t, spans[0].GetMeta()["request_id"], "test-request-id") + assert.Equal(t, spans[0].GetMeta()["language"], "java") +} + func TestTriggerTypesLifecycleEventForAPIGatewayRest(t *testing.T) { startDetails := &InvocationStartDetails{ InvokeEventRawPayload: getEventFromFile("api-gateway.json"), diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index 6ed2344b1014f..cfd545ed144f4 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/config" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" + "github.com/DataDog/datadog-agent/pkg/serverless/random" "github.com/DataDog/datadog-agent/pkg/serverless/trace/inferredspan" "github.com/DataDog/datadog-agent/pkg/trace/api" "github.com/DataDog/datadog-agent/pkg/trace/info" @@ -76,18 +77,29 @@ func (lp *LifecycleProcessor) startExecutionSpan(event interface{}, rawPayload [ // It should be called at the end of the invocation. func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) *pb.Span { executionContext := lp.GetExecutionInfo() - duration := endDetails.EndTime.UnixNano() - executionContext.startTime.UnixNano() + start := executionContext.startTime.UnixNano() + + traceID := executionContext.TraceID + spanID := executionContext.SpanID + // If we fail to receive the trace and span IDs from the tracer during a timeout we create it ourselves + if endDetails.IsTimeout && traceID == 0 { + traceID = random.Random.Uint64() + lp.requestHandler.executionInfo.TraceID = traceID + } + if endDetails.IsTimeout && spanID == 0 { + spanID = random.Random.Uint64() + } executionSpan := &pb.Span{ Service: "aws.lambda", // will be replaced by the span processor Name: "aws.lambda", Resource: os.Getenv(functionNameEnvVar), Type: "serverless", - TraceID: executionContext.TraceID, - SpanID: executionContext.SpanID, + TraceID: traceID, + SpanID: spanID, ParentID: executionContext.parentID, - Start: executionContext.startTime.UnixNano(), - Duration: duration, + Start: start, + Duration: endDetails.EndTime.UnixNano() - start, Meta: lp.requestHandler.triggerTags, Metrics: lp.requestHandler.triggerMetrics, } @@ -110,17 +122,19 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) } else { capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) } - responsePayloadJSON := make(map[string]interface{}) - if err := json.Unmarshal(endDetails.ResponseRawPayload, &responsePayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse response payload: %v", err) - executionSpan.Meta["function.response"] = string(endDetails.ResponseRawPayload) - } else { - capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + if endDetails.ResponseRawPayload != nil { + responsePayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(endDetails.ResponseRawPayload, &responsePayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse response payload: %v", err) + executionSpan.Meta["function.response"] = string(endDetails.ResponseRawPayload) + } else { + capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + } } } - if endDetails.IsError { executionSpan.Error = 1 + if len(endDetails.ErrorMsg) > 0 { executionSpan.Meta["error.msg"] = endDetails.ErrorMsg } @@ -130,6 +144,11 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) if len(endDetails.ErrorStack) > 0 { executionSpan.Meta["error.stack"] = endDetails.ErrorStack } + + if endDetails.IsTimeout { + executionSpan.Meta["error.type"] = "Impending Timeout" + executionSpan.Meta["error.msg"] = "Datadog detected an Impending Timeout" + } } return executionSpan diff --git a/pkg/serverless/invocationlifecycle/trace_test.go b/pkg/serverless/invocationlifecycle/trace_test.go index 0b925f9a25be6..6b45d32755165 100644 --- a/pkg/serverless/invocationlifecycle/trace_test.go +++ b/pkg/serverless/invocationlifecycle/trace_test.go @@ -649,6 +649,54 @@ func TestEndExecutionSpanWithError(t *testing.T) { assert.Equal(t, executionSpan.Error, int32(1)) } +func TestEndExecutionSpanWithTimeout(t *testing.T) { + t.Setenv(functionNameEnvVar, "TestFunction") + currentExecutionInfo := &ExecutionStartInfo{} + lp := &LifecycleProcessor{ + requestHandler: &RequestHandler{ + executionInfo: currentExecutionInfo, + triggerTags: make(map[string]string), + }, + } + + startTime := time.Now() + startDetails := &InvocationStartDetails{ + StartTime: startTime, + InvokeEventHeaders: http.Header{}, + } + lp.startExecutionSpan(nil, []byte("[]"), startDetails) + + assert.Zero(t, currentExecutionInfo.TraceID) + assert.Zero(t, currentExecutionInfo.SpanID) + + duration := 1 * time.Second + endTime := startTime.Add(duration) + + endDetails := &InvocationEndDetails{ + EndTime: endTime, + IsError: true, + IsTimeout: true, + RequestID: "test-request-id", + ResponseRawPayload: nil, + ColdStart: true, + ProactiveInit: false, + Runtime: "dotnet6", + } + executionSpan := lp.endExecutionSpan(endDetails) + assert.Equal(t, "aws.lambda", executionSpan.Name) + assert.Equal(t, "aws.lambda", executionSpan.Service) + assert.Equal(t, "TestFunction", executionSpan.Resource) + assert.Equal(t, "serverless", executionSpan.Type) + assert.Equal(t, "dotnet", executionSpan.Meta["language"]) + assert.Equal(t, lp.requestHandler.executionInfo.TraceID, executionSpan.TraceID) + assert.NotZero(t, executionSpan.TraceID) + assert.NotZero(t, executionSpan.SpanID) + assert.Equal(t, startTime.UnixNano(), executionSpan.Start) + assert.Equal(t, duration.Nanoseconds(), executionSpan.Duration) + assert.Equal(t, "Impending Timeout", executionSpan.Meta["error.type"]) + assert.Equal(t, "Datadog detected an Impending Timeout", executionSpan.Meta["error.msg"]) +} + func TestParseLambdaPayload(t *testing.T) { assert.Equal(t, []byte(""), ParseLambdaPayload([]byte(""))) assert.Equal(t, []byte("{}"), ParseLambdaPayload([]byte("{}"))) diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 091494b15afce..24c04e22a08ad 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/serverless/daemon" "github.com/DataDog/datadog-agent/pkg/serverless/flush" + "github.com/DataDog/datadog-agent/pkg/serverless/invocationlifecycle" "github.com/DataDog/datadog-agent/pkg/serverless/metrics" "github.com/DataDog/datadog-agent/pkg/serverless/registration" "github.com/DataDog/datadog-agent/pkg/serverless/tags" @@ -139,6 +140,10 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis metricTags = tags.AddInitTypeTag(metricTags) metrics.SendTimeoutEnhancedMetric(metricTags, daemon.MetricAgent.Demux) metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux) + + if daemon.IsExecutionSpanIncomplete() { + finishTimeoutExecutionSpan(daemon, coldStartTags.IsColdStart, coldStartTags.IsProactiveInit) + } } err := daemon.ExecutionContext.SaveCurrentExecutionContext() if err != nil { @@ -214,3 +219,20 @@ func removeQualifierFromArn(functionArn string) string { } return functionArn } + +func finishTimeoutExecutionSpan(daemon *daemon.Daemon, isColdStart bool, isProactiveInit bool) { + ecs := daemon.ExecutionContext.GetCurrentState() + timeoutDetails := &invocationlifecycle.InvocationEndDetails{ + RequestID: ecs.LastRequestID, + Runtime: ecs.Runtime, + ColdStart: isColdStart, + ProactiveInit: isProactiveInit, + EndTime: time.Now(), + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, + } + log.Debug("Could not complete the execution span due to a timeout. Attempting to finish the span without details from the tracer.") + daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) + daemon.SetExecutionSpanIncomplete(false) +} diff --git a/pkg/serverless/serverless_test.go b/pkg/serverless/serverless_test.go index ccd144ea939bd..14bd868ab6548 100644 --- a/pkg/serverless/serverless_test.go +++ b/pkg/serverless/serverless_test.go @@ -15,6 +15,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/pkg/serverless/daemon" + "github.com/DataDog/datadog-agent/pkg/serverless/invocationlifecycle" + "github.com/DataDog/datadog-agent/pkg/serverless/trace" + "github.com/DataDog/datadog-agent/pkg/trace/testutil" ) func TestMain(m *testing.M) { @@ -69,3 +72,40 @@ func TestRemoveQualifierFromArnWithoutAlias(t *testing.T) { functionArn := removeQualifierFromArn(invokedFunctionArn) assert.Equal(t, functionArn, invokedFunctionArn) } + +type mockLifecycleProcessor struct { + isError bool + isTimeout bool + isColdStart bool + isProactiveInit bool +} + +func (m *mockLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.ExecutionStartInfo { + return &invocationlifecycle.ExecutionStartInfo{} +} +func (m *mockLifecycleProcessor) OnInvokeStart(*invocationlifecycle.InvocationStartDetails) {} +func (m *mockLifecycleProcessor) OnInvokeEnd(endDetails *invocationlifecycle.InvocationEndDetails) { + m.isError = endDetails.IsError + m.isTimeout = endDetails.IsTimeout + m.isColdStart = endDetails.ColdStart + m.isProactiveInit = endDetails.ProactiveInit +} + +func TestFinishTimeoutExecutionSpan(t *testing.T) { + port := testutil.FreeTCPPort(t) + d := daemon.StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + d.TraceAgent = &trace.ServerlessTraceAgent{} + mock := &mockLifecycleProcessor{} + d.InvocationProcessor = mock + defer d.Stop() + + assert.False(t, d.IsExecutionSpanIncomplete()) + d.SetExecutionSpanIncomplete(true) + assert.True(t, d.IsExecutionSpanIncomplete()) + finishTimeoutExecutionSpan(d, true, true) + assert.False(t, d.IsExecutionSpanIncomplete()) + assert.True(t, mock.isError) + assert.True(t, mock.isTimeout) + assert.True(t, mock.isColdStart) + assert.True(t, mock.isProactiveInit) +}