Skip to content

Commit

Permalink
[Serverless] Merge serverless/main to main (#14980)
Browse files Browse the repository at this point in the history
* [Serverless] change account (#14755)

* Aj/buffer cold start span data (#14664)

* wip dirty commit - trace being created but not flushed properly. No further traces appearing

WIP: more debugging. StopChan properly set up

feat: Starting coldstart creator as a daemon, and receiving data from two channels. Todo: spec

feat: Update specs to write to channels

feat: Merge conflicts resolved for tests

feat: Use smaller methods to handle locking

fix: pass coldstartSpanId to sls-init main

feat: Remove default

feat: Use Millisecond as Second is far longer than necessary

feat: No need to export ColdStartSpanId

fix: update units

feat: Directionality for lambdaSpanChan as well as for initDurationChan

fix: No need for the nil check, I need to stop javascripting my go

feat: ints

* feat: rebase missing changes from merge commits

* feat: update ints after moving accounts

* Empty commit to trigger ci

* [Serverless] Fix flaky integration tests and make them more easily maintainable. (#14783)

* Retry serverless integration test failures automatically. (#14801)

* [Serverless] Allow some keys to be optional in serverless integration tests. (#14827)

* Ability to remove items from the json.

* Remove items from snapshot.

* Do not expect spans when there is no spans object. (#14396)

* [Serverless] Improve stability of two tests. (#14895)

* Increase timeout while decreasing test time.

* Increase timeout in test.

* [Serverless] Consolidate log normalization to single file for integration tests. (#15004)

* Consolidate log normalization to single file.

* Save raw logs to a temp dir.

* Fix linting issues.

Co-authored-by: Maxime David <[email protected]>
Co-authored-by: AJ Stuyvenberg <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2023
1 parent a014b91 commit 328e0ea
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 47 deletions.
3 changes: 2 additions & 1 deletion pkg/serverless/logs/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (l *LambdaLogAPIMessage) handlePlatformRuntimeDone(objectRecord map[string]
func (l *LambdaLogAPIMessage) handlePlatformRuntimeDoneSpans(objectRecord map[string]interface{}) {
spans, ok := objectRecord["spans"].([]interface{})
if !ok {
log.Error("LogMessage.UnmarshalJSON: can't read the spans object")
// no spans if the function errored and did not return a response
log.Debug("LogMessage.UnmarshalJSON: no spans object received")
return
}
for _, span := range spans {
Expand Down
7 changes: 6 additions & 1 deletion pkg/serverless/registration/telemetry_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,18 @@ func TestSubscribeLogsSuccess(t *testing.T) {

func TestSubscribeLogsTimeout(t *testing.T) {
payload := buildLogRegistrationPayload("myUri", "platform function", 10, 100, 1000)
done := make(chan struct{})
//fake the register route
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// timeout
time.Sleep(registerLogsTimeout + 100*time.Millisecond)
select {
case <-time.After(registerLogsTimeout + 5*time.Second):
case <-done:
}
w.WriteHeader(200)
}))
defer ts.Close()
defer close(done)

err := subscribeTelemetry("myId", ts.URL, registerLogsTimeout, payload)
assert.NotNil(t, err)
Expand Down
13 changes: 7 additions & 6 deletions pkg/serverless/serverless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/serverless/daemon"
"github.com/DataDog/datadog-agent/pkg/serverless/flush"
"github.com/DataDog/datadog-agent/pkg/serverless/tags"
)

Expand All @@ -30,14 +31,14 @@ func TestHandleInvocationShouldSetExtraTags(t *testing.T) {
d := daemon.StartDaemon("http://localhost:8124")
defer d.Stop()

d.WaitForDaemon()
// force daemon not to wait for flush at end of handleInvocation
d.SetFlushStrategy(flush.NewPeriodically(time.Second))
d.UseAdaptiveFlush(false)

d.TellDaemonRuntimeStarted()
// deadline = current time + 5s
deadlineMs := (time.Now().UnixNano())/1000000 + 5000

//deadline = current time + 20 ms
deadlineMs := (time.Now().UnixNano())/1000000 + 20

//setting DD_TAGS and DD_EXTRA_TAGS
// setting DD_TAGS and DD_EXTRA_TAGS
t.Setenv("DD_TAGS", "a1:valueA1,a2:valueA2,A_MAJ:valueAMaj")
t.Setenv("DD_EXTRA_TAGS", "a3:valueA3 a4:valueA4")

Expand Down
67 changes: 49 additions & 18 deletions test/integration/serverless/log_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@


def normalize_metrics(stage):
def clear_dogsketches(log):
log["dogsketches"] = []

def sort_tags(log):
log["tags"].sort()

def metric_sort_key(log):
return (log["metric"], "cold_start:true" in log["tags"])

return [
replace(r'raise Exception', r'\n'),
require(r'BEGINMETRIC.*ENDMETRIC'),
exclude(r'BEGINMETRIC'),
exclude(r'ENDMETRIC'),
replace(r'(ts":)[0-9]{10}', r'\1XXX'),
replace(r'(min":)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(max":)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(cnt":)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(avg":)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(sum":)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(k":\[)[0-9\.e\-]{1,30}', r'\1XXX'),
replace(r'(datadog-nodev)[0-9]+\.[0-9]+\.[0-9]+', r'\1X.X.X'),
replace(r'(datadog_lambda:v)[0-9]+\.[0-9]+\.[0-9]+', r'\1X.X.X'),
replace(r'dd_lambda_layer:datadog-go[0-9.]{1,}', r'dd_lambda_layer:datadog-gox.x.x'),
Expand All @@ -24,11 +26,32 @@ def normalize_metrics(stage):
replace(r'(architecture:)(x86_64|arm64)', r'\1XXX'),
replace(stage, 'XXXXXX'),
exclude(r'[ ]$'),
sort_by(lambda log: (log["metric"], "cold_start:true" in log["tags"])),
foreach(clear_dogsketches),
foreach(sort_tags),
sort_by(metric_sort_key),
]


def normalize_logs(stage):
rmvs = (
"DATADOG TRACER CONFIGURATION",
# TODO: these messages may be an indication of a real problem and
# should be investigated
"TIMESTAMP UTC | DD_EXTENSION | ERROR | could not forward the request context canceled",
"TIMESTAMP http: proxy error: context canceled",
)

def rm_extra_items_key(log):
return any(rmv in log["message"]["message"] for rmv in rmvs)

def sort_tags(log):
tags = log["ddtags"].split(',')
tags.sort()
log["ddtags"] = ','.join(tags)

def log_sort_key(log):
return log["message"]["message"]

return [
require(r'BEGINLOG.*ENDLOG'),
exclude(r'BEGINLOG'),
Expand All @@ -39,18 +62,12 @@ def normalize_logs(stage):
replace(r'\d{4}\/\d{2}\/\d{2}\s\d{2}:\d{2}:\d{2}', 'TIMESTAMP'),
replace(r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}', 'TIMESTAMP'),
replace(r'([a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12})', r'XXX'),
replace(r'"REPORT RequestId:.*?"', '"REPORT"'),
replace(stage, 'XXXXXX'),
replace(r'(architecture:)(x86_64|arm64)', r'\1XXX'),
sort_by(lambda log: log["message"]["message"]),
# TODO: these messages may be an indication of a real problem and
# should be investigated
rm_item(
lambda log: log["message"]["message"]
in (
"TIMESTAMP UTC | DD_EXTENSION | ERROR | could not forward the request context canceled\n",
"TIMESTAMP http: proxy error: context canceled\n",
)
),
rm_item(rm_extra_items_key),
foreach(sort_tags),
sort_by(log_sort_key),
]


Expand Down Expand Up @@ -114,6 +131,20 @@ def _require(log):
return _require


def foreach(fn):
    """
    Apply fn to every element of a JSON-encoded list, in order.

    The returned normalizer takes a JSON string holding a list, passes
    each element to fn (which may mutate it in place), then re-serializes
    the list back to a JSON string.
    """

    def _foreach(log):
        entries = json.loads(log, strict=False)
        for entry in entries:
            fn(entry)
        return json.dumps(entries)

    return _foreach


def sort_by(key):
"""
Sort the json entries using the given key function, requires the log string
Expand Down
29 changes: 8 additions & 21 deletions test/integration/serverless/recorder-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"net/http"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -207,17 +206,10 @@ func startHTTPServer(port string) {
return
}

for _, sketch := range pl.Sketches {
sort.Strings(sketch.Tags)
sketch.Dogsketches = make([]gogen.SketchPayload_Sketch_Dogsketch, 0)
outputSketches = append(outputSketches, sketch)
}
outputSketches = append(outputSketches, pl.Sketches...)

if nbHitMetrics == 3 {
// two calls + shutdown
sort.SliceStable(outputSketches, func(i, j int) bool {
return outputSketches[i].Metric < outputSketches[j].Metric
})
jsonSketch, err := json.Marshal(outputSketches)
if err != nil {
fmt.Printf("Error while JSON encoding the sketch")
Expand All @@ -241,19 +233,14 @@ func startHTTPServer(port string) {
}

for _, log := range messages {
if !strings.Contains(log.Message.Message, "BEGINLOG") && !strings.Contains(log.Message.Message, "BEGINTRACE") {
if strings.HasPrefix(log.Message.Message, "REPORT RequestId:") {
log.Message.Message = "REPORT" // avoid dealing with stripping out init duration, duration, memory used etc.
nbReport++
}
sortedTags := strings.Split(log.Tags, ",")
sort.Strings(sortedTags)
log.Tags = strings.Join(sortedTags, ",")
if !strings.Contains(log.Message.Message, "DATADOG TRACER CONFIGURATION") {
// skip dd-trace-go tracer configuration output
outputLogs = append(outputLogs, log)
}
msg := log.Message.Message
if strings.Contains(msg, "BEGINMETRIC") || strings.Contains(msg, "BEGINLOG") || strings.Contains(msg, "BEGINTRACE") {
continue
}
if strings.HasPrefix(msg, "REPORT RequestId:") {
nbReport++
}
outputLogs = append(outputLogs, log)
}

if nbReport == 2 && !hasBeenOutput {
Expand Down
5 changes: 5 additions & 0 deletions test/integration/serverless/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ sleep "$LOGS_WAIT_MINUTES"m

failed_functions=()

RAWLOGS_DIR=$(mktemp -d)
echo "Raw logs will be written to ${RAWLOGS_DIR}"

for function_name in "${all_functions[@]}"; do
echo "Fetching logs for ${function_name}..."
retry_counter=1
Expand All @@ -200,6 +203,8 @@ for function_name in "${all_functions[@]}"; do
done
printf "\e[A\e[K" # erase previous log line

echo $raw_logs > $RAWLOGS_DIR/$function_name

# Replace invocation-specific data like timestamps and IDs with XXX to normalize across executions
if [[ " ${metric_functions[*]} " =~ " ${function_name} " ]]; then
norm_type=metrics
Expand Down

0 comments on commit 328e0ea

Please sign in to comment.