Skip to content

Commit

Permalink
fix: appsec-node serverless integration test failure (#20798)
Browse files Browse the repository at this point in the history
Somehow this is not registering a trace anymore. For some reason I fail to understand, when instrumentation telemetry is enabled (which is te default behavior when `DD_SERVERLESS_APPSEC_ENABLED` is `true`-ish), the `appsec-node` function no longer produces a trace in a way that is captured by the `recorder-extension`. I have however been able to validate that the trace is actually still created (in a run with `DD_LOG_LEVEL=debug`).

In order to fix this, explicitly disabled instrumentation telemetry so the test continues to work correctly. The actual interaction needs to be investigated later/separately, as it is not well understood today why either the Node tracer library or the recorder extension behaves differently in this particular scenario.

---

Additionally, added extra logging into the recorder extension (as I was attempting to understand the issue), and improved the log normalization to also neutralize the AWS account ID.

---

Finally, using feature environments to reduce the amount of deployed functions (in the serverless stacks) depending on the test suite being run.
  • Loading branch information
RomainMuller authored Nov 23, 2023
1 parent d1453b1 commit f432302
Show file tree
Hide file tree
Showing 30 changed files with 1,289 additions and 1,211 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/serverless-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
timeout_minutes: 45
max_attempts: 2
command: |
RAWLOGS_DIR="${{ steps.rawlogs.outputs.dir }}/${{ matrix.architecture }}}"
RAWLOGS_DIR="${{ steps.rawlogs.outputs.dir }}/${{ matrix.architecture }}"
cd go/src/github.com/DataDog/datadog-agent
ARCHITECTURE=${{ matrix.architecture }} RAWLOGS_DIR=$RAWLOGS_DIR \
./test/integration/serverless/run.sh ${{ matrix.suite }}
Expand Down
3 changes: 2 additions & 1 deletion test/integration/serverless/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ recorder-extension/extensions
recorder-extension/ext.zip
src/csharp-tests/bin
src/csharp-tests/obj
src/java-tests/*/target
src/java-tests/*/target
node_modules/
24 changes: 14 additions & 10 deletions test/integration/serverless/log_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Union


def normalize_metrics(stage):
def normalize_metrics(stage, aws_account_id):
def clear_dogsketches(log):
log["dogsketches"] = []

Expand All @@ -28,14 +28,15 @@ def metric_sort_key(log):
replace(r'(serverless.lambda-extension.integration-test.count)[0-9\.]+', r'\1'),
replace(r'(architecture:)(x86_64|arm64)', r'\1XXX'),
replace(stage, 'XXXXXX'),
replace(aws_account_id, '############'),
exclude(r'[ ]$'),
foreach(clear_dogsketches),
foreach(sort_tags),
sort_by(metric_sort_key),
]


def normalize_logs(stage):
def normalize_logs(stage, aws_account_id):
rmvs = (
"DATADOG TRACER CONFIGURATION",
# TODO: these messages may be an indication of a real problem and
Expand Down Expand Up @@ -67,14 +68,15 @@ def log_sort_key(log):
replace(r'([a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12})', r'XXX'),
replace(r'"REPORT RequestId:.*?"', '"REPORT"'),
replace(stage, 'XXXXXX'),
replace(aws_account_id, '############'),
replace(r'(architecture:)(x86_64|arm64)', r'\1XXX'),
rm_item(rm_extra_items_key),
foreach(sort_tags),
sort_by(log_sort_key),
]


def normalize_traces(stage):
def normalize_traces(stage, aws_account_id):
def trace_sort_key(log):
name = log['chunks'][0]['spans'][0]['name']
cold_start = log['chunks'][0]['spans'][0]['meta'].get('cold_start')
Expand Down Expand Up @@ -113,6 +115,7 @@ def sort__dd_tags_container(log):
replace(r'("faas.instance":")[a-zA-Z0-9-/]+\[\$LATEST\][a-zA-Z0-9]+"', r'\1null"'),
replace(r'("_dd.tracer_hostname":)"\d{1,3}(?:.\d{1,3}){3}"+', r'\1"<redacted>"'),
replace(stage, 'XXXXXX'),
replace(aws_account_id, '############'),
exclude(r'[ ]$'),
foreach(sort__dd_tags_container),
sort_by(trace_sort_key),
Expand Down Expand Up @@ -253,19 +256,19 @@ def _rm_item(log):
###################


def normalize(log, typ, stage):
for normalizer in get_normalizers(typ, stage):
def normalize(log, typ, stage, aws_account_id):
for normalizer in get_normalizers(typ, stage, aws_account_id):
log = normalizer(log)
return format_json(log)


def get_normalizers(typ, stage):
def get_normalizers(typ, stage, aws_account_id):
if typ == 'metrics':
return normalize_metrics(stage)
return normalize_metrics(stage, aws_account_id)
elif typ == 'logs':
return normalize_logs(stage)
return normalize_logs(stage, aws_account_id)
elif typ == 'traces':
return normalize_traces(stage)
return normalize_traces(stage, aws_account_id)
elif typ == 'appsec':
return normalize_appsec(stage)
else:
Expand All @@ -281,6 +284,7 @@ def format_json(log):

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--accountid', required=True)
parser.add_argument('--type', required=True)
parser.add_argument('--logs', required=True)
parser.add_argument('--stage', required=True)
Expand All @@ -295,7 +299,7 @@ def parse_args():
with open(args.logs[5:], 'r') as f:
args.logs = f.read()

print(normalize(args.logs, args.type, args.stage))
print(normalize(args.logs, args.type, args.stage, args.accountid))
except Exception as e:
err: dict[str, Union[str, list[str]]] = {
"error": "normalization raised exception",
Expand Down
87 changes: 55 additions & 32 deletions test/integration/serverless/recorder-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ func processEvents(ctx context.Context) {
for {
select {
case <-ctx.Done():
log("processEvent context done")
return
default:
res, err := extensionClient.NextEvent(ctx)
if err != nil {
log("an error occurred: %v", err)
return
}
if res.EventType == Shutdown {
log("shutdown signal received")
time.Sleep(1900 * time.Millisecond)
return
}
Expand Down Expand Up @@ -172,7 +175,7 @@ func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) {
return nil, err
}
if httpRes.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
return nil, fmt.Errorf("%s %s failed with status %s", httpReq.Method, httpReq.URL.String(), httpRes.Status)
}
defer httpRes.Body.Close()
body, err := io.ReadAll(httpRes.Body)
Expand All @@ -197,12 +200,12 @@ func startHTTPServer(port string) {
nbHitMetrics++
body, err := io.ReadAll(r.Body)
if err != nil {
fmt.Printf("Error while reading HTTP request body: %s \n", err)
log("error while reading HTTP request body: %v", err)
return
}
pl := new(gogen.SketchPayload)
if err := pl.Unmarshal(body); err != nil {
fmt.Printf("Error while unmarshalling sketches %s \n", err)
log("error while unmarshalling sketches: %v", err)
return
}

Expand All @@ -212,7 +215,7 @@ func startHTTPServer(port string) {
// two calls + shutdown
jsonSketch, err := json.Marshal(outputSketches)
if err != nil {
fmt.Printf("Error while JSON encoding the sketch")
log("error while JSON encoding the sketch: %v", err)
}
fmt.Printf("%s%s%s\n", "BEGINMETRIC", string(jsonSketch), "ENDMETRIC")
}
Expand All @@ -232,9 +235,14 @@ func startHTTPServer(port string) {
return
}

privateLogPrefix := fmt.Sprintf("[%s]", extensionName)
for _, log := range messages {
msg := log.Message.Message
if strings.Contains(msg, "BEGINMETRIC") || strings.Contains(msg, "BEGINLOG") || strings.Contains(msg, "BEGINTRACE") {
if strings.Contains(msg, "BEGINMETRIC") ||
strings.Contains(msg, "BEGINLOG") ||
strings.Contains(msg, "BEGINTRACE") ||
// "Private" entries produced by the "log" function are not reported back to the test suites.
strings.Contains(msg, privateLogPrefix) {
continue
}
if strings.HasPrefix(msg, "REPORT RequestId:") {
Expand All @@ -246,45 +254,55 @@ func startHTTPServer(port string) {
if nbReport == 2 && !hasBeenOutput {
jsonLogs, err := json.Marshal(outputLogs)
if err != nil {
fmt.Printf("Error while JSON encoding the logs")
log("error while JSON encoding the logs: %v", err)
}
fmt.Printf("%s%s%s\n", "BEGINLOG", string(jsonLogs), "ENDLOG")
hasBeenOutput = true // make sure not re re-output the logs
}

})

http.HandleFunc("/api/v0.2/traces", func(w http.ResponseWriter, r *http.Request) {
nbHitTraces++
body, err := io.ReadAll(r.Body)
if err != nil {
return
}
decompressedBody, err := decompress(body)
if err != nil {
return
}
pl := new(pb.AgentPayload)
if err := pl.Unmarshal(decompressedBody); err != nil {
fmt.Printf("Error while unmarshalling traces %s \n", err)
return
}
for _, version := range []string{"v0.2", "v0.4"} {
http.HandleFunc(fmt.Sprintf("/api/%s/traces", version), func(w http.ResponseWriter, r *http.Request) {
nbHitTraces++
body, err := io.ReadAll(r.Body)
if err != nil {
return
}
decompressedBody, err := decompress(body)
if err != nil {
return
}
pl := new(pb.AgentPayload)
if err := pl.Unmarshal(decompressedBody); err != nil {
log("error while unmarshalling traces: %s", err)
return
}

outputTraces = append(outputTraces, pl.TracerPayloads...)
outputTraces = append(outputTraces, pl.TracerPayloads...)

if nbHitTraces == 2 {
jsonLogs, err := json.Marshal(outputTraces)
if err != nil {
fmt.Printf("Error while JSON encoding the traces")
if nbHitTraces == 2 {
jsonLogs, err := json.Marshal(outputTraces)
if err != nil {
log("error while JSON encoding the traces: %v", err)
}
fmt.Printf("%s%s%s\n", "BEGINTRACE", string(jsonLogs), "ENDTRACE")
}
fmt.Printf("%s%s%s\n", "BEGINTRACE", string(jsonLogs), "ENDTRACE")
}
})
})
}

http.HandleFunc("/api/v1/series", func(w http.ResponseWriter, r *http.Request) {
})
for _, pattern := range []string{
"/api/v0.2/stats",
"/api/v1/check_run",
"/api/v1/series",
} {
// These endpoints are ignored by the recorder and silently return an empty success response.
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { /* do nothing */ })
}

http.HandleFunc("/api/v1/check_run", func(w http.ResponseWriter, r *http.Request) {
// This is actually a wildcard handler....
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log("unexpected request: %s %s", r.Method, r.URL.String())
})

err := http.ListenAndServe(":"+port, nil)
Expand All @@ -307,3 +325,8 @@ func decompress(payload []byte) ([]byte, error) {

return buffer.Bytes(), nil
}

// log records a message that is "private" to this extension and will not be reported back in the BEGINLOG...ENDLOG blocks.
func log(format string, args ...any) {
fmt.Printf("[%s] %s\n", extensionName, fmt.Sprintf(format, args...))
}
Loading

0 comments on commit f432302

Please sign in to comment.