Skip to content

Commit

Permalink
[Serverless] Merge serverless/main to main. (#14826)
Browse files Browse the repository at this point in the history
* [Serverless] change account (#14755)

* Aj/buffer cold start span data (#14664)

* wip dirty commit - trace being created but not flushed properly. No further traces appearing

WIP: more debugging. StopChan properly set up

feat: Starting coldstart creator as a daemon, and receiving data from two channels. Todo: spec

feat: Update specs to write to channels

feat: Merge conflicts resolved for tests

feat: Use smaller methods to handle locking

fix: pass coldstartSpanId to sls-init main

feat: Remove default

feat: Use Millisecond as Second is far longer than necessary

feat: No need to export ColdStartSpanId

fix: update units

feat: Directionality for lambdaSpanChan as well as for initDurationChan

fix: No need for the nil check, I need to stop javascripting my go

feat: ints

* feat: rebase missing changes from merge commits

* feat: update ints after moving accounts

* Empty commit to trigger ci

* [Serverless] Fix flaky integration tests and make them more easily maintainable. (#14783)

* Retry serverless integration test failures automatically. (#14801)

* [Serverless] Allow some keys to be optional in serverless integration tests. (#14827)

* Ability to remove items from the json.

* Remove items from snapshot.

Co-authored-by: Maxime David <[email protected]>
Co-authored-by: AJ Stuyvenberg <[email protected]>
  • Loading branch information
3 people authored Dec 22, 2022
1 parent dd27430 commit 96f6442
Show file tree
Hide file tree
Showing 65 changed files with 2,026 additions and 1,768 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/serverless-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: actions/setup-node@v3
with:
node-version: 14

- name: Install Serverless Framework
run: sudo yarn global add serverless@^3.7.9 --prefix /usr/local

Expand All @@ -45,8 +45,13 @@ jobs:
uses: docker/setup-buildx-action@v2

- name: Run tests if AWS credentials are available
uses: nick-fields/retry@v2
env:
AWS_ACCESS_KEY_ID: ${{ secrets.SERVERLESS_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.SERVERLESS_AWS_SECRET_ACCESS_KEY }}
run: ARCHITECTURE=${{ matrix.architecture }} ./test/integration/serverless/run.sh
working-directory: go/src/github.com/DataDog/datadog-agent
AWS_SECRET_ACCESS_KEY: ${{ secrets.SERVERLESS_AWS_SECRET_ACCESS_KEY }}
with:
timeout_minutes: 30
max_attempts: 2
command: |
cd go/src/github.com/DataDog/datadog-agent
ARCHITECTURE=${{ matrix.architecture }} ./test/integration/serverless/run.sh
3 changes: 2 additions & 1 deletion cmd/serverless-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/DataDog/datadog-agent/cmd/serverless-init/tag"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/random"
"github.com/DataDog/datadog-agent/pkg/serverless/trace"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func main() {
}

func setupTraceAgent(traceAgent *trace.ServerlessTraceAgent, tags map[string]string) {
traceAgent.Start(config.Datadog.GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath}, nil)
traceAgent.Start(config.Datadog.GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath}, nil, random.Random.Uint64())
traceAgent.SetTags(tag.GetBaseTagsMapWithMetadata(tags))
for range time.Tick(3 * time.Second) {
traceAgent.Flush()
Expand Down
24 changes: 21 additions & 3 deletions cmd/serverless/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
serverlessLogs "github.com/DataDog/datadog-agent/pkg/serverless/logs"
"github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/proxy"
"github.com/DataDog/datadog-agent/pkg/serverless/random"
"github.com/DataDog/datadog-agent/pkg/serverless/registration"
"github.com/DataDog/datadog-agent/pkg/serverless/trace"
"github.com/DataDog/datadog-agent/pkg/serverless/trace/inferredspan"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/util/flavor"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand Down Expand Up @@ -211,11 +213,14 @@ func runAgent(stopCh chan struct{}) (serverlessDaemon *daemon.Daemon, err error)
}
config.LoadProxyFromEnv(config.Datadog)
logChannel := make(chan *logConfig.ChannelMessage)

// Channels for ColdStartCreator
lambdaSpanChan := make(chan *pb.Span)
initDurationChan := make(chan float64)
coldStartSpanId := random.Random.Uint64()
metricAgent := &metrics.ServerlessMetricAgent{}
metricAgent.Start(daemon.FlushTimeout, &metrics.MetricConfig{}, &metrics.MetricDogStatsD{})
serverlessDaemon.SetStatsdServer(metricAgent)
serverlessDaemon.SetupLogCollectionHandler(logsAPICollectionRoute, logChannel, config.Datadog.GetBool("serverless.logs_enabled"), config.Datadog.GetBool("enhanced_metrics"))
serverlessDaemon.SetupLogCollectionHandler(logsAPICollectionRoute, logChannel, config.Datadog.GetBool("serverless.logs_enabled"), config.Datadog.GetBool("enhanced_metrics"), initDurationChan)

// Concurrently start heavyweight features
var wg sync.WaitGroup
Expand All @@ -225,7 +230,7 @@ func runAgent(stopCh chan struct{}) (serverlessDaemon *daemon.Daemon, err error)
go func() {
defer wg.Done()
traceAgent := &trace.ServerlessTraceAgent{}
traceAgent.Start(config.Datadog.GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath}, serverlessDaemon.ExecutionContext)
traceAgent.Start(config.Datadog.GetBool("apm_config.enabled"), &trace.LoadConfig{Path: datadogConfigPath}, lambdaSpanChan, coldStartSpanId)
serverlessDaemon.SetTraceAgent(traceAgent)
}()

Expand Down Expand Up @@ -269,6 +274,19 @@ func runAgent(stopCh chan struct{}) (serverlessDaemon *daemon.Daemon, err error)

wg.Wait()

coldStartSpanCreator := &trace.ColdStartSpanCreator{
LambdaSpanChan: lambdaSpanChan,
InitDurationChan: initDurationChan,
TraceAgent: serverlessDaemon.TraceAgent,
StopChan: make(chan struct{}),
ColdStartSpanId: coldStartSpanId,
}

log.Debug("Starting ColdStartSpanCreator")
coldStartSpanCreator.Run()
log.Debug("Setting ColdStartSpanCreator on Daemon")
serverlessDaemon.SetColdStartSpanCreator(coldStartSpanCreator)

// set up invocation processor in the serverless Daemon to be used for the proxy and/or lifecycle API
serverlessDaemon.InvocationProcessor = &invocationlifecycle.LifecycleProcessor{
ExtraTags: serverlessDaemon.ExtraTags,
Expand Down
14 changes: 12 additions & 2 deletions pkg/serverless/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Daemon struct {

TraceAgent *trace.ServerlessTraceAgent

ColdStartCreator *trace.ColdStartSpanCreator

// lastInvocations stores last invocation times to be able to compute the
// interval of invocation of the function.
lastInvocations []time.Time
Expand Down Expand Up @@ -164,10 +166,10 @@ func (d *Daemon) GetFlushStrategy() string {
}

// SetupLogCollectionHandler configures the log collection route handler
func (d *Daemon) SetupLogCollectionHandler(route string, logsChan chan *logConfig.ChannelMessage, logsEnabled bool, enhancedMetricsEnabled bool) {
func (d *Daemon) SetupLogCollectionHandler(route string, logsChan chan *logConfig.ChannelMessage, logsEnabled bool, enhancedMetricsEnabled bool, initDurationChan chan<- float64) {

d.logCollector = serverlessLog.NewLambdaLogCollector(logsChan,
d.MetricAgent.Demux, d.ExtraTags, logsEnabled, enhancedMetricsEnabled, d.ExecutionContext, d.HandleRuntimeDone)
d.MetricAgent.Demux, d.ExtraTags, logsEnabled, enhancedMetricsEnabled, d.ExecutionContext, d.HandleRuntimeDone, initDurationChan)
server := serverlessLog.NewLambdaLogsAPIServer(d.logCollector.In)

d.mux.Handle(route, &server)
Expand All @@ -184,6 +186,10 @@ func (d *Daemon) SetTraceAgent(traceAgent *trace.ServerlessTraceAgent) {
d.TraceAgent = traceAgent
}

// SetColdStartSpanCreator stores the given cold start span creator on the
// daemon's ColdStartCreator field, replacing any previously set value.
func (d *Daemon) SetColdStartSpanCreator(creator *trace.ColdStartSpanCreator) {
d.ColdStartCreator = creator
}

// SetFlushStrategy sets the flush strategy to use.
func (d *Daemon) SetFlushStrategy(strategy flush.Strategy) {
log.Debugf("Set flush strategy: %s (was: %s)", strategy.String(), d.GetFlushStrategy())
Expand Down Expand Up @@ -305,6 +311,10 @@ func (d *Daemon) Stop() {
if d.MetricAgent != nil {
d.MetricAgent.Stop()
}

if d.ColdStartCreator != nil {
d.ColdStartCreator.Stop()
}
logs.Stop()
log.Debug("Serverless agent shutdown complete")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/serverless/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/serverless/random"
"github.com/DataDog/datadog-agent/pkg/serverless/trace"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
"github.com/DataDog/datadog-agent/pkg/trace/testutil"
)

Expand Down Expand Up @@ -163,7 +165,7 @@ func TestSetTraceTagOk(t *testing.T) {
}
var agent = &trace.ServerlessTraceAgent{}
t.Setenv("DD_API_KEY", "x")
agent.Start(true, &trace.LoadConfig{Path: "/does-not-exist.yml"}, nil)
agent.Start(true, &trace.LoadConfig{Path: "/does-not-exist.yml"}, make(chan *pb.Span), random.Random.Uint64())
defer agent.Stop()
d := Daemon{
TraceAgent: agent,
Expand Down
1 change: 0 additions & 1 deletion pkg/serverless/daemon/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) {
IsError: r.Header.Get(invocationlifecycle.InvocationErrorHeader) == "true",
RequestID: ecs.LastRequestID,
ResponseRawPayload: responseBody,
ColdStartDuration: ecs.ColdstartDuration,
}
executionContext := e.daemon.InvocationProcessor.GetExecutionInfo()
if executionContext.TraceID == 0 {
Expand Down
9 changes: 0 additions & 9 deletions pkg/serverless/executioncontext/executioncontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type ExecutionContext struct {
coldstartRequestID string
lastLogRequestID string
coldstart bool
coldstartDuration float64
startTime time.Time
endTime time.Time
}
Expand All @@ -35,7 +34,6 @@ type State struct {
ColdstartRequestID string
LastLogRequestID string
Coldstart bool
ColdstartDuration float64
StartTime time.Time
EndTime time.Time
}
Expand All @@ -50,18 +48,11 @@ func (ec *ExecutionContext) GetCurrentState() State {
ColdstartRequestID: ec.coldstartRequestID,
LastLogRequestID: ec.lastLogRequestID,
Coldstart: ec.coldstart,
ColdstartDuration: ec.coldstartDuration,
StartTime: ec.startTime,
EndTime: ec.endTime,
}
}

func (ec *ExecutionContext) SetColdStartDuration(duration float64) {
ec.m.Lock()
defer ec.m.Unlock()
ec.coldstartDuration = duration
}

// SetFromInvocation sets the execution context based on an invocation
func (ec *ExecutionContext) SetFromInvocation(arn string, requestID string) {
ec.m.Lock()
Expand Down
1 change: 0 additions & 1 deletion pkg/serverless/invocationlifecycle/invocation_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ type InvocationEndDetails struct {
IsError bool
RequestID string
ResponseRawPayload []byte
ColdStartDuration float64
}
6 changes: 3 additions & 3 deletions pkg/serverless/invocationlifecycle/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func TestTriggerTypesLifecycleEventForKinesis(t *testing.T) {
RequestID: "test-request-id",
})
assert.Equal(t, map[string]string{
"function_trigger.event_source_arn": "arn:aws:kinesis:sa-east-1:601427279990:stream/kinesisStream",
"function_trigger.event_source_arn": "arn:aws:kinesis:sa-east-1:425362996713:stream/kinesisStream",
"request_id": "test-request-id",
"function_trigger.event_source": "kinesis",
}, testProcessor.GetTags())
Expand Down Expand Up @@ -779,7 +779,7 @@ func TestTriggerTypesLifecycleEventForSNS(t *testing.T) {
RequestID: "test-request-id",
})
assert.Equal(t, map[string]string{
"function_trigger.event_source_arn": "arn:aws:sns:sa-east-1:601427279990:serverlessTracingTopicPy",
"function_trigger.event_source_arn": "arn:aws:sns:sa-east-1:425362996713:serverlessTracingTopicPy",
"request_id": "test-request-id",
"function_trigger.event_source": "sns",
}, testProcessor.GetTags())
Expand All @@ -801,7 +801,7 @@ func TestTriggerTypesLifecycleEventForSQS(t *testing.T) {
RequestID: "test-request-id",
})
assert.Equal(t, map[string]string{
"function_trigger.event_source_arn": "arn:aws:sqs:sa-east-1:601427279990:InferredSpansQueueNode",
"function_trigger.event_source_arn": "arn:aws:sqs:sa-east-1:425362996713:InferredSpansQueueNode",
"request_id": "test-request-id",
"function_trigger.event_source": "sqs",
}, testProcessor.GetTags())
Expand Down
6 changes: 4 additions & 2 deletions pkg/serverless/logs/logs_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ type LambdaLogsCollector struct {
invocationEndTime time.Time
process_once *sync.Once
executionContext *executioncontext.ExecutionContext
initDurationChan chan<- float64

arn string

// handleRuntimeDone is the function to be called when a platform.runtimeDone log message is received
handleRuntimeDone func()
}

func NewLambdaLogCollector(out chan<- *logConfig.ChannelMessage, demux aggregator.Demultiplexer, extraTags *Tags, logsEnabled bool, enhancedMetricsEnabled bool, executionContext *executioncontext.ExecutionContext, handleRuntimeDone func()) *LambdaLogsCollector {
func NewLambdaLogCollector(out chan<- *logConfig.ChannelMessage, demux aggregator.Demultiplexer, extraTags *Tags, logsEnabled bool, enhancedMetricsEnabled bool, executionContext *executioncontext.ExecutionContext, handleRuntimeDone func(), initDurationChan chan<- float64) *LambdaLogsCollector {

return &LambdaLogsCollector{
In: make(chan []LambdaLogAPIMessage, maxBufferedLogs), // Buffered, so we can hold start-up logs before first invocation without blocking
Expand All @@ -65,6 +66,7 @@ func NewLambdaLogCollector(out chan<- *logConfig.ChannelMessage, demux aggregato
executionContext: executionContext,
handleRuntimeDone: handleRuntimeDone,
process_once: &sync.Once{},
initDurationChan: initDurationChan,
}
}

Expand Down Expand Up @@ -170,7 +172,7 @@ func (lc *LambdaLogsCollector) processMessage(
return
}
if message.logType == logTypePlatformInitReport {
lc.executionContext.SetColdStartDuration(message.objectRecord.reportLogItem.initDurationTelemetry)
lc.initDurationChan <- message.objectRecord.reportLogItem.initDurationTelemetry
}

if message.logType == logTypePlatformStart {
Expand Down
16 changes: 8 additions & 8 deletions pkg/serverless/logs/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestProcessMessageValid(t *testing.T) {
tags := Tags{
Tags: metricTags,
}
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {})
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {}, make(chan<- float64))

lc.processMessage(&message)

Expand Down Expand Up @@ -255,7 +255,7 @@ func TestProcessMessageStartValid(t *testing.T) {
tags := Tags{
Tags: metricTags,
}
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone)
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone, make(chan<- float64))
lc.lastRequestID = lastRequestID
lc.processMessage(message)
assert.Equal(t, runtimeDoneCallbackWasCalled, false)
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestProcessMessagePlatformRuntimeDoneValid(t *testing.T) {
mockRuntimeDone := func() {
runtimeDoneCallbackWasCalled = true
}
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone)
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone, make(chan<- float64))
lc.lastRequestID = lastRequestID
lc.processMessage(&message)
ecs := mockExecutionContext.GetCurrentState()
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestProcessMessagePlatformRuntimeDonePreviousInvocation(t *testing.T) {
mockRuntimeDone := func() {
runtimeDoneCallbackWasCalled = true
}
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone)
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, mockRuntimeDone, make(chan<- float64))

lc.processMessage(message)
// Runtime done callback should NOT be called if the log message was for a previous invocation
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestProcessMessageShouldNotProcessArnNotSet(t *testing.T) {
mockExecutionContext := &executioncontext.ExecutionContext{}

computeEnhancedMetrics := true
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {})
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {}, make(chan<- float64))

go lc.processMessage(message)

Expand Down Expand Up @@ -383,7 +383,7 @@ func TestProcessMessageShouldNotProcessLogsDropped(t *testing.T) {

mockExecutionContext := &executioncontext.ExecutionContext{}
mockExecutionContext.SetFromInvocation(arn, lastRequestID)
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {})
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {}, make(chan<- float64))

go lc.processMessage(message)

Expand Down Expand Up @@ -412,7 +412,7 @@ func TestProcessMessageShouldProcessLogTypeFunction(t *testing.T) {
mockExecutionContext := &executioncontext.ExecutionContext{}
mockExecutionContext.SetFromInvocation(arn, lastRequestID)

lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {})
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {}, make(chan<- float64))

go lc.processMessage(message)

Expand Down Expand Up @@ -860,7 +860,7 @@ func TestRuntimeMetricsMatchLogs(t *testing.T) {
},
},
}
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {})
lc := NewLambdaLogCollector(make(chan<- *config.ChannelMessage), demux, &tags, true, computeEnhancedMetrics, mockExecutionContext, func() {}, make(chan<- float64))
lc.invocationStartTime = startTime

lc.processMessage(doneMessage)
Expand Down
6 changes: 3 additions & 3 deletions pkg/serverless/serverless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,14 @@ func TestComputeTimeout(t *testing.T) {
}

func TestRemoveQualifierFromArnWithAlias(t *testing.T) {
invokedFunctionArn := "arn:aws:lambda:eu-south-1:601427279990:function:inferred-spans-function-urls-dev-harv-function-urls:$latest"
invokedFunctionArn := "arn:aws:lambda:eu-south-1:425362996713:function:inferred-spans-function-urls-dev-harv-function-urls:$latest"
functionArn := removeQualifierFromArn(invokedFunctionArn)
expectedArn := "arn:aws:lambda:eu-south-1:601427279990:function:inferred-spans-function-urls-dev-harv-function-urls"
expectedArn := "arn:aws:lambda:eu-south-1:425362996713:function:inferred-spans-function-urls-dev-harv-function-urls"
assert.Equal(t, functionArn, expectedArn)
}

func TestRemoveQualifierFromArnWithoutAlias(t *testing.T) {
invokedFunctionArn := "arn:aws:lambda:eu-south-1:601427279990:function:inferred-spans-function-urls-dev-harv-function-urls"
invokedFunctionArn := "arn:aws:lambda:eu-south-1:425362996713:function:inferred-spans-function-urls-dev-harv-function-urls"
functionArn := removeQualifierFromArn(invokedFunctionArn)
assert.Equal(t, functionArn, invokedFunctionArn)
}
Loading

0 comments on commit 96f6442

Please sign in to comment.