Skip to content

Commit

Permalink
add working draft inferred proxy spans
Browse files Browse the repository at this point in the history
  • Loading branch information
jordan-wong committed Dec 19, 2024
1 parent 58e1e63 commit 6fb5400
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 1 deletion.
3 changes: 3 additions & 0 deletions contrib/internal/httptrace/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
envTraceClientIPEnabled = "DD_TRACE_CLIENT_IP_ENABLED"
// envServerErrorStatuses is the name of the env var used to specify error status codes on http server spans
envServerErrorStatuses = "DD_TRACE_HTTP_SERVER_ERROR_STATUSES"

//used for enabling inferred span tracing
inferredProxyServicesEnabled = "DD_TRACE_INFERRED_PROXY_SERVICES_ENABLED"
)

// defaultQueryStringRegexp is the regexp used for query string obfuscation if `envQueryStringRegexp` is empty.
Expand Down
17 changes: 16 additions & 1 deletion contrib/internal/httptrace/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func StartRequestSpan(r *http.Request, opts ...ddtrace.StartSpanOption) (tracer.
if cfg.Tags == nil {
cfg.Tags = make(map[string]interface{})
}

cfg.Tags[ext.SpanType] = ext.SpanTypeWeb
cfg.Tags[ext.HTTPMethod] = r.Method
cfg.Tags[ext.HTTPURL] = urlFromRequest(r)
Expand All @@ -50,9 +51,23 @@ func StartRequestSpan(r *http.Request, opts ...ddtrace.StartSpanOption) (tracer.
if r.Host != "" {
cfg.Tags["http.host"] = r.Host
}
if spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header)); err == nil {

spanctx, err := tracer.Extract(tracer.HTTPHeadersCarrier(r.Header))
is_inferred_proxy_set := false
println("IN HERE inferredProxyEnabled is:")
println(internal.BoolEnv(inferredProxyServicesEnabled, false))
if internal.BoolEnv(inferredProxyServicesEnabled, false) {
if inferred_proxy_span_ctx := tryCreateInferredProxySpan(r.Header, spanctx); inferred_proxy_span_ctx != nil {
println("IN HERE")
cfg.Parent = inferred_proxy_span_ctx
is_inferred_proxy_set = true
}
}

if err != nil && !is_inferred_proxy_set {
cfg.Parent = spanctx
}

for k, v := range ipTags {
cfg.Tags[k] = v
}
Expand Down
202 changes: 202 additions & 0 deletions contrib/internal/httptrace/httptrace_api_gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package httptrace

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/normalizer"

"github.com/stretchr/testify/assert"
)

var appListener *httptest.Server
var inferredHeaders = map[string]string{
"x-dd-proxy": "aws-apigateway",
"x-dd-proxy-request-time-ms": "1729780025473",
"x-dd-proxy-path": "/test",
"x-dd-proxy-httpmethod": "GET",
"x-dd-proxy-domain-name": "example.com",
"x-dd-proxy-stage": "dev",
}

// mock the aws server
func loadTest(t *testing.T) {
// Set environment variables
t.Setenv("DD_SERVICE", "aws-server")
t.Setenv("DD_TRACE_INFERRED_PROXY_SERVICES_ENABLED", "true")

// set up http server
mux := http.NewServeMux()

// set routes
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/error" {
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"message": "ERROR"})
} else {
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(map[string]string{"message": "OK"})
}
})
appListener = httptest.NewServer(mux)

}
func cleanupTest() {
// close server
if appListener != nil {
appListener.Close()
}
}

func TestInferredProxySpans(t *testing.T) {

t.Run("should create parent and child spans for a 200", func(t *testing.T) {
t.Setenv(inferredProxyServicesEnabled, "true")
mt := mocktracer.Start()
defer mt.Stop()
loadTest(t)
defer cleanupTest()

client := &http.Client{}
req, err := http.NewRequest("GET", fmt.Sprintf("%s/", appListener.URL), nil)

assert := assert.New(t)
assert.NoError(err)

// in go, 2 possible ways to create a testing env, Assert.New(t) or

for k, v := range inferredHeaders {
req.Header.Set(k, v)
}

sp, _ := StartRequestSpan(req)
resp, err := client.Do(req)
FinishRequestSpan(sp, resp.StatusCode, nil)

spans := mt.FinishedSpans()

assert.NoError(err)
assert.Equal(http.StatusOK, resp.StatusCode)

assert.Equal(2, len(spans))
gateway_span := spans[0]
web_req_span := spans[1]
assert.Equal("aws.apigateway", gateway_span.OperationName())
assert.Equal("http.request", web_req_span.OperationName())
assert.True(web_req_span.ParentID() == gateway_span.SpanID())
for _, arg := range inferredHeaders {
header, tag := normalizer.HeaderTag(arg)
gateway_span_tags, ok := gateway_span.Tags()[tag]
if !ok {
gateway_span_tags = ""
}
assert.Equal(strings.Join(req.Header.Values(header), ","), gateway_span_tags)
}

assert.Equal(2, len(spans))

})

t.Run("should create parent and child spans for error", func(t *testing.T) {
t.Setenv("DD_INFERRED_PROXY_SERVICES_ENABLED", "true")
mt := mocktracer.Start()
defer mt.Stop()
loadTest(t)
defer cleanupTest()

client := &http.Client{}
req, err := http.NewRequest("GET", fmt.Sprintf("%s/error", appListener.URL), nil)
assert := assert.New(t)
assert.NoError(err)
for k, v := range inferredHeaders {
req.Header.Set(k, v)
}

sp, _ := StartRequestSpan(req)
resp, err := client.Do(req)
FinishRequestSpan(sp, resp.StatusCode, nil)

assert.NoError(err)
assert.Equal(http.StatusInternalServerError, resp.StatusCode)

spans := mt.FinishedSpans()
assert.Equal(2, len(spans))
gateway_span := spans[0]
web_req_span := spans[1]
assert.Equal("aws.apigateway", gateway_span.OperationName())
assert.Equal("http.request", web_req_span.OperationName())
assert.True(web_req_span.ParentID() == gateway_span.SpanID())
for _, arg := range inferredHeaders {
header, tag := normalizer.HeaderTag(arg)
gateway_span_tags, ok := gateway_span.Tags()[tag]
if !ok {
gateway_span_tags = ""
}
assert.Equal(strings.Join(req.Header.Values(header), ","), gateway_span_tags)
}
assert.Equal(2, len(spans))

})

t.Run("should not create API Gateway spanif headers are missing", func(t *testing.T) {
t.Setenv("DD_INFERRED_PROXY_SERVICES_ENABLED", "true")
mt := mocktracer.Start()
defer mt.Stop()
loadTest(t)
defer cleanupTest()

client := &http.Client{}
req, err := http.NewRequest("GET", fmt.Sprintf("%s/no-aws-headers", appListener.URL), nil)
assert := assert.New(t)
assert.NoError(err)

sp, _ := StartRequestSpan(req)
resp, err := client.Do(req)
FinishRequestSpan(sp, resp.StatusCode, nil)
assert.NoError(err)
assert.Equal(http.StatusOK, resp.StatusCode)

spans := mt.FinishedSpans()
assert.Equal(1, len(spans))
assert.Equal("http.request", spans[0].OperationName())

})
t.Run("should not create API Gateway span if x-dd-proxy is missing", func(t *testing.T) {
t.Setenv("DD_INFERRED_PROXY_SERVICES_ENABLED", "true")
mt := mocktracer.Start()
defer mt.Stop()
loadTest(t)
defer cleanupTest()

client := &http.Client{}
req, err := http.NewRequest("GET", fmt.Sprintf("%s/no-aws-headers", appListener.URL), nil)
assert := assert.New(t)
assert.NoError(err)

for k, v := range inferredHeaders {
if k != "x-dd-proxy" {
req.Header.Set(k, v)
}
}

sp, _ := StartRequestSpan(req)
resp, err := client.Do(req)
FinishRequestSpan(sp, resp.StatusCode, nil)

assert.NoError(err)
assert.Equal(http.StatusOK, resp.StatusCode)

spans := mt.FinishedSpans()
assert.Equal(1, len(spans))
assert.Equal("http.request", spans[0].OperationName())

})

//loadTest(nil)

}
142 changes: 142 additions & 0 deletions contrib/internal/httptrace/inferred_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package httptrace

import (
"fmt"
"net/http"
"strconv"
"time"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const (
PROXY_HEADER_SYSTEM = "X-Dd-Proxy"
//PROXY_HEADER_START_TIME_MS = "x-dd-proxy-request-time-ms"
PROXY_HEADER_START_TIME_MS = "X-Dd-Proxy-Request-Time-Ms"
PROXY_HEADER_PATH = "X-Dd-Proxy-Path"
PROXY_HEADER_HTTPMETHOD = "X-Dd-Proxy-Httpmethod"
PROXY_HEADER_DOMAIN = "X-Dd-Proxy-Domain-Name"
PROXY_HEADER_STAGE = "X-Dd-Proxy-Stage"
)

type ProxyDetails struct {
SpanName string `json:"spanName"`
Component string `json:"component"`
}

var (
supportedProxies = map[string]ProxyDetails{
"aws-apigateway": {
SpanName: "aws.apigateway",
Component: "aws-apigateway",
},
}
)

type ProxyContext struct {
RequestTime string `json:"requestTime"`
Method string `json:"method"`
Path string `json:"path"`
Stage string `json:"stage"`
DomainName string `json:"domainName"`
ProxySystemName string `json:"proxySystemName"`
}

func extractInferredProxyContext(headers http.Header) *ProxyContext {
//proxyContent := make(map[string][]string)

_, exists := headers[PROXY_HEADER_START_TIME_MS]
if !exists {
println("no proxy header start time")
return nil
}

proxyHeaderSystem, exists := headers[PROXY_HEADER_SYSTEM]
if !exists {
println("no proxy header system")
return nil
}
if _, ok := supportedProxies[proxyHeaderSystem[0]]; !ok {
println("unsupported Proxy header system")
return nil
}

// Q: is it possible to have multiple values for any of these http headers??
return &ProxyContext{
RequestTime: headers[PROXY_HEADER_START_TIME_MS][0],
Method: headers[PROXY_HEADER_HTTPMETHOD][0],
Path: headers[PROXY_HEADER_PATH][0],
Stage: headers[PROXY_HEADER_STAGE][0],
DomainName: headers[PROXY_HEADER_DOMAIN][0],
ProxySystemName: headers[PROXY_HEADER_SYSTEM][0],
}

}

func tryCreateInferredProxySpan(headers http.Header, parent ddtrace.SpanContext) ddtrace.SpanContext {
println("IN TRYCREATE")
println("headers are:")
for key, values := range headers {
fmt.Printf("Key: %s\n", key)
println(key)
for _, value := range values {
fmt.Printf(" Value: %s\n", value)
println(value)
}
}
if headers == nil {
println("headers nil")
return nil

}
// if internal.BoolEnv(inferredProxyServicesEnabled, false) {
// println("bool env false")
// return nil
// }

requestProxyContext := extractInferredProxyContext(headers)
if requestProxyContext == nil {
println("requestProxyContext nil")
return nil
}

proxySpanInfo := supportedProxies[requestProxyContext.ProxySystemName]
fmt.Printf(`Successfully extracted inferred span info ${proxyContext} for proxy: ${proxyContext.proxySystemName}`)

// Parse Time string to Time Type
millis, err := strconv.ParseInt(requestProxyContext.RequestTime, 10, 64)
if err != nil {
fmt.Println("Error parsing time string:", err)
return nil
}

// Convert milliseconds to seconds and nanoseconds
seconds := millis / 1000
nanoseconds := (millis % 1000) * int64(time.Millisecond)

// Create time.Time from Unix timestamp
parsedTime := time.Unix(seconds, nanoseconds)

config := ddtrace.StartSpanConfig{
Parent: parent,
//StartTime: requestProxyContext.RequestTime,
StartTime: parsedTime,
Tags: map[string]interface{}{
"service": requestProxyContext.DomainName,
"HTTP_METHOD": requestProxyContext.Method,
"PATH": requestProxyContext.Path,
"STAGE": requestProxyContext.Stage,
"DOMAIN_NAME": requestProxyContext.DomainName,
"PROXY_SYSTEM_NAME": requestProxyContext.ProxySystemName,
},
}

span := tracer.StartSpan(proxySpanInfo.SpanName, tracer.StartTime(config.StartTime), tracer.ChildOf(config.Parent), tracer.Tag("service", config.Tags["service"]))
defer span.Finish()
for k, v := range config.Tags {
span.SetTag(k, v)
}

return span.Context()
}

0 comments on commit 6fb5400

Please sign in to comment.