From ac698d769f421378ad2b4bd45c21675d7787cda6 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Tue, 10 Aug 2021 11:43:50 +0200 Subject: [PATCH 1/5] send traceparent header to ES --- module/apmelasticsearch/client.go | 6 ++++++ module/apmelasticsearch/client_test.go | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index e5f49876c..d0e394d9f 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -74,6 +74,12 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.r.RoundTrip(req) } + headerValue := apmhttp.FormatTraceparentHeader(tx.TraceContext()) + req.Header.Set(apmhttp.W3CTraceparentHeader, headerValue) + if tx.ShouldPropagateLegacyHeader() { + req.Header.Set(apmhttp.ElasticTraceparentHeader, headerValue) + } + statement, req := captureSearchStatement(req) username, _, _ := req.BasicAuth() ctx = apm.ContextWithSpan(ctx, span) diff --git a/module/apmelasticsearch/client_test.go b/module/apmelasticsearch/client_test.go index 593e09d36..232fc794c 100644 --- a/module/apmelasticsearch/client_test.go +++ b/module/apmelasticsearch/client_test.go @@ -37,6 +37,7 @@ import ( "go.elastic.co/apm/apmtest" "go.elastic.co/apm/model" "go.elastic.co/apm/module/apmelasticsearch" + "go.elastic.co/apm/module/apmhttp" ) func TestWrapRoundTripper(t *testing.T) { @@ -303,6 +304,28 @@ func TestDestination(t *testing.T) { test("http://[2001:db8::1]:80/_search", "2001:db8::1", 80) } +func TestTraceparentHeader(t *testing.T) { + headers := make(map[string]string) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + for k, vs := range req.Header { + headers[k] = strings.Join(vs, " ") + } + })) + defer server.Close() + client := &http.Client{Transport: apmelasticsearch.WrapRoundTripper(http.DefaultTransport)} + + req, err := http.NewRequest("GET", server.URL, nil) + require.NoError(t, err) + + _, _, _ = apmtest.WithTransaction(func(ctx context.Context) { + _, err := client.Do(req.WithContext(ctx)) + assert.NoError(t, err) + }) + + assert.Contains(t, headers, apmhttp.ElasticTraceparentHeader) + assert.Contains(t, headers, apmhttp.W3CTraceparentHeader) +} + type readCloser struct { io.Reader closed bool From 1a83369c78c281aac6e8e0dcaa1b2eef66752330 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 11 Aug 2021 09:56:04 +0200 Subject: [PATCH 2/5] propagate tracestate header --- module/apmelasticsearch/client.go | 6 +++++- module/apmelasticsearch/client_test.go | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index d0e394d9f..c91612b75 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -74,11 +74,15 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.r.RoundTrip(req) } - headerValue := apmhttp.FormatTraceparentHeader(tx.TraceContext()) + traceContext := span.TraceContext() + headerValue := apmhttp.FormatTraceparentHeader(traceContext) req.Header.Set(apmhttp.W3CTraceparentHeader, headerValue) if tx.ShouldPropagateLegacyHeader() { req.Header.Set(apmhttp.ElasticTraceparentHeader, headerValue) } + if tracestate := traceContext.State.String(); tracestate != "" { + req.Header.Set(apmhttp.TracestateHeader, tracestate) + } statement, req := captureSearchStatement(req) username, _, _ := req.BasicAuth() diff --git a/module/apmelasticsearch/client_test.go b/module/apmelasticsearch/client_test.go index 232fc794c..169dc02d6 100644 --- a/module/apmelasticsearch/client_test.go +++ b/module/apmelasticsearch/client_test.go @@ -304,7 +304,7 @@ func TestDestination(t *testing.T) { test("http://[2001:db8::1]:80/_search", "2001:db8::1", 80) } -func TestTraceparentHeader(t *testing.T) { +func TestTraceHeaders(t *testing.T) { headers := make(map[string]string) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { for k, vs := range req.Header { @@ -324,6 +324,7 @@ func TestTraceparentHeader(t *testing.T) { assert.Contains(t, headers, apmhttp.ElasticTraceparentHeader) assert.Contains(t, headers, apmhttp.W3CTraceparentHeader) + assert.Contains(t, headers, apmhttp.TracestateHeader) } type readCloser struct { From 39743a32c11ce74125f3bac61442870f8f257972 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 11 Aug 2021 11:35:12 +0200 Subject: [PATCH 3/5] propagate headers for non-sampled tx/dropped span Propagate the parent transaction's traceparent and tracestate headers when there's a non-sampled transaction in the context, or if the span is dropped --- module/apmelasticsearch/client.go | 19 +++--- module/apmelasticsearch/client_test.go | 89 ++++++++++++++++++++++++++ module/apmhttp/client.go | 7 +- 3 files changed, 101 insertions(+), 14 deletions(-) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index c91612b75..68fe725c1 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -63,27 +63,23 @@ type roundTripper struct { func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() tx := apm.TransactionFromContext(ctx) - if tx == nil || !tx.Sampled() { + traceContext := tx.TraceContext() + propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() + if !tx.Sampled() { + apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) return r.r.RoundTrip(req) } name := requestName(req) span := tx.StartSpan(name, "db.elasticsearch", apm.SpanFromContext(ctx)) + if span.Dropped() { span.End() + apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) return r.r.RoundTrip(req) } - traceContext := span.TraceContext() - headerValue := apmhttp.FormatTraceparentHeader(traceContext) - req.Header.Set(apmhttp.W3CTraceparentHeader, headerValue) - if tx.ShouldPropagateLegacyHeader() { - req.Header.Set(apmhttp.ElasticTraceparentHeader, headerValue) - } - if tracestate := traceContext.State.String(); tracestate != "" { - req.Header.Set(apmhttp.TracestateHeader, tracestate) - } - + traceContext = span.TraceContext() statement, req := captureSearchStatement(req) username, _, _ := req.BasicAuth() ctx = apm.ContextWithSpan(ctx, span) @@ -99,6 +95,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { User: username, }) + apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) resp, err := r.r.RoundTrip(req) if err != nil { span.End() diff --git a/module/apmelasticsearch/client_test.go b/module/apmelasticsearch/client_test.go index 169dc02d6..d9d229211 100644 --- a/module/apmelasticsearch/client_test.go +++ b/module/apmelasticsearch/client_test.go @@ -34,10 +34,12 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/net/context/ctxhttp" + "go.elastic.co/apm" "go.elastic.co/apm/apmtest" "go.elastic.co/apm/model" "go.elastic.co/apm/module/apmelasticsearch" "go.elastic.co/apm/module/apmhttp" + "go.elastic.co/apm/transport/transporttest" ) func TestWrapRoundTripper(t *testing.T) { @@ -327,6 +329,93 @@ func TestTraceHeaders(t *testing.T) { assert.Contains(t, headers, apmhttp.TracestateHeader) } +func TestClientSpanDropped(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(req.Header.Get("Elastic-Apm-Traceparent"))) + })) + defer server.Close() + + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + + tracer.SetMaxSpans(1) + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + + var responseBodies []string + for i := 0; i < 2; i++ { + body, err := doGET(ctx, server.URL) + require.NoError(t, err) + responseBodies = append(responseBodies, body) + } + + tx.End() + tracer.Flush(nil) + payloads := transport.Payloads() + require.Len(t, payloads.Spans, 1) + transaction := payloads.Transactions[0] + span := payloads.Spans[0] // for first request + + clientTraceContext, err := apmhttp.ParseTraceparentHeader(string(responseBodies[0])) + require.NoError(t, err) + assert.Equal(t, span.TraceID, model.TraceID(clientTraceContext.Trace)) + assert.Equal(t, span.ID, model.SpanID(clientTraceContext.Span)) + + clientTraceContext, err = apmhttp.ParseTraceparentHeader(string(responseBodies[1])) + require.NoError(t, err) + assert.Equal(t, transaction.TraceID, model.TraceID(clientTraceContext.Trace)) + assert.Equal(t, transaction.ID, model.SpanID(clientTraceContext.Span)) +} + +func TestClientTransactionUnsampled(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(req.Header.Get("Elastic-Apm-Traceparent"))) + })) + defer server.Close() + + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + tracer.SetSampler(apm.NewRatioSampler(0)) // sample nothing + + tx := tracer.StartTransaction("name", "type") + ctx := apm.ContextWithTransaction(context.Background(), tx) + body, err := doGET(ctx, server.URL) + require.NoError(t, err) + + tx.End() + tracer.Flush(nil) + + payloads := transport.Payloads() + require.Len(t, payloads.Transactions, 1) + require.Len(t, payloads.Spans, 0) + transaction := payloads.Transactions[0] + + clientTraceContext, err := apmhttp.ParseTraceparentHeader(string(body)) + require.NoError(t, err) + assert.Equal(t, transaction.TraceID, model.TraceID(clientTraceContext.Trace)) + assert.Equal(t, transaction.ID, model.SpanID(clientTraceContext.Span)) +} + +func doGET(ctx context.Context, url string) (string, error) { + client := &http.Client{Transport: apmelasticsearch.WrapRoundTripper(http.DefaultTransport)} + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return "", err + } + resp, err := client.Do(req.WithContext(ctx)) + if err != nil { + return "", err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + defer resp.Body.Close() + + return string(body), nil +} + type readCloser struct { io.Reader closed bool diff --git a/module/apmhttp/client.go b/module/apmhttp/client.go index e29846502..f48ca4877 100644 --- a/module/apmhttp/client.go +++ b/module/apmhttp/client.go @@ -97,7 +97,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() traceContext := tx.TraceContext() if !traceContext.Options.Recorded() { - r.setHeaders(req, traceContext, propagateLegacyHeader) + SetHeaders(req, traceContext, propagateLegacyHeader) return r.r.RoundTrip(req) } @@ -117,7 +117,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { span = nil } - r.setHeaders(req, traceContext, propagateLegacyHeader) + SetHeaders(req, traceContext, propagateLegacyHeader) resp, err := r.r.RoundTrip(req) if span != nil { if err != nil { @@ -133,7 +133,8 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return resp, err } -func (r *roundTripper) setHeaders(req *http.Request, traceContext apm.TraceContext, propagateLegacyHeader bool) { +// SetHeaders sets traceparent and tracestate headers on an http request. +func SetHeaders(req *http.Request, traceContext apm.TraceContext, propagateLegacyHeader bool) { headerValue := FormatTraceparentHeader(traceContext) if propagateLegacyHeader { req.Header.Set(ElasticTraceparentHeader, headerValue) From 692aa538436fef92e379d900303fd12b410fbc7c Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 11 Aug 2021 13:13:00 +0200 Subject: [PATCH 4/5] fix nil pointer error --- module/apmelasticsearch/client.go | 6 +++--- module/apmelasticsearch/client_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index 68fe725c1..345b42836 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -64,12 +64,12 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() tx := apm.TransactionFromContext(ctx) traceContext := tx.TraceContext() - propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() - if !tx.Sampled() { - apmhttp.SetHeaders(req, traceContext, propagateLegacyHeader) + if tx == nil || !tx.Sampled() { + apmhttp.SetHeaders(req, traceContext, false) return r.r.RoundTrip(req) } + propagateLegacyHeader := tx.ShouldPropagateLegacyHeader() name := requestName(req) span := tx.StartSpan(name, "db.elasticsearch", apm.SpanFromContext(ctx)) diff --git a/module/apmelasticsearch/client_test.go b/module/apmelasticsearch/client_test.go index d9d229211..aa15dcf4d 100644 --- a/module/apmelasticsearch/client_test.go +++ b/module/apmelasticsearch/client_test.go @@ -331,7 +331,7 @@ func TestTraceHeaders(t *testing.T) { func TestClientSpanDropped(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Write([]byte(req.Header.Get("Elastic-Apm-Traceparent"))) + w.Write([]byte(req.Header.Get("Traceparent"))) })) defer server.Close() @@ -369,7 +369,7 @@ func TestClientSpanDropped(t *testing.T) { func TestClientTransactionUnsampled(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.Write([]byte(req.Header.Get("Elastic-Apm-Traceparent"))) + w.Write([]byte(req.Header.Get("Traceparent"))) })) defer server.Close() From 7ae9b032dd55a6489320b695a86fc25e9551b089 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 11 Aug 2021 16:15:00 +0200 Subject: [PATCH 5/5] do not set traceheaders when transaction is nil --- module/apmelasticsearch/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/module/apmelasticsearch/client.go b/module/apmelasticsearch/client.go index 345b42836..3c9a24f59 100644 --- a/module/apmelasticsearch/client.go +++ b/module/apmelasticsearch/client.go @@ -63,8 +63,11 @@ type roundTripper struct { func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { ctx := req.Context() tx := apm.TransactionFromContext(ctx) + if tx == nil { + return r.r.RoundTrip(req) + } traceContext := tx.TraceContext() - if tx == nil || !tx.Sampled() { + if !tx.Sampled() { apmhttp.SetHeaders(req, traceContext, false) return r.r.RoundTrip(req) }