Skip to content

Commit

Permalink
httptrace calls arrive async/threaded. (open-telemetry#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
lizthegrey authored and rghetia committed Sep 12, 2019
1 parent 9f85824 commit 5df3c07
Showing 1 changed file with 34 additions and 35 deletions.
69 changes: 34 additions & 35 deletions plugin/httptrace/clienttrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package httptrace
import (
"context"
"crypto/tls"
"fmt"
"net/http/httptrace"
"net/textproto"
"strings"
"sync"

"google.golang.org/grpc/codes"

Expand All @@ -28,11 +30,6 @@ import (
"go.opentelemetry.io/api/trace"
)

type clientLevel struct {
trace.Span
ident string
}

var (
HTTPStatus = key.New("http.status")
HTTPHeaderMIME = key.New("http.mime")
Expand All @@ -47,60 +44,62 @@ type clientTracer struct {
context.Context
httptrace.ClientTrace

levels []clientLevel
levels map[string]trace.Span
root trace.Span
mtx sync.Mutex
}

func newClientTracer(ctx context.Context) *clientTracer {
ct := &clientTracer{
Context: ctx,
levels: make(map[string]trace.Span),
}
ct.open("http.request")
return ct
}

func (ct *clientTracer) open(name string, attrs ...core.KeyValue) {
_, sp := trace.Start(ct.Context, name, trace.WithAttributes(attrs...))
ct.levels = append(ct.levels, clientLevel{
Span: sp,
ident: name,
})
ct.mtx.Lock()
defer ct.mtx.Unlock()
if ct.root == nil {
ct.root = sp
}
ct.levels[name] = sp
}

func (ct *clientTracer) close(name string) {
if len(ct.levels) == 0 {
panic("remove me")
ct.mtx.Lock()
defer ct.mtx.Unlock()
if s, ok := ct.levels[name]; ok {
s.Finish()
delete(ct.levels, name)
} else {
panic(fmt.Sprintf("failed to find span %s in levels.", name))
}
l := len(ct.levels)
ct.levels[l-1].Finish()
ct.levels = ct.levels[0 : l-1]
}

func (ct *clientTracer) current() trace.Span {
return ct.levels[len(ct.levels)-1].Span
}

func (ct *clientTracer) currentName() string {
if len(ct.levels) == 0 {
return ""
}
return ct.levels[len(ct.levels)-1].ident
func (ct *clientTracer) span(name string) trace.Span {
ct.mtx.Lock()
defer ct.mtx.Unlock()
return ct.levels[name]
}

func (ct *clientTracer) getConn(host string) {
ct.open("http.getconn", HostKey.String(host))
}

func (ct *clientTracer) gotConn(info httptrace.GotConnInfo) {
ct.current().SetAttribute(HTTPRemoteAddr.String(info.Conn.RemoteAddr().String()))
ct.current().SetAttribute(HTTPLocalAddr.String(info.Conn.LocalAddr().String()))
ct.span("http.getconn").SetAttribute(HTTPRemoteAddr.String(info.Conn.RemoteAddr().String()))
ct.span("http.getconn").SetAttribute(HTTPLocalAddr.String(info.Conn.LocalAddr().String()))

ct.close("http.getconn")
}

func (ct *clientTracer) putIdleConn(err error) {
if err != nil {
ct.current().SetAttribute(MessageKey.String(err.Error()))
ct.current().SetStatus(codes.Unknown)
ct.span("http.receive").SetAttribute(MessageKey.String(err.Error()))
ct.span("http.receive").SetStatus(codes.Unknown)
}
ct.close("http.receive")
}
Expand Down Expand Up @@ -134,10 +133,10 @@ func (ct *clientTracer) tlsHandshakeDone(tls.ConnectionState, error) {
}

func (ct *clientTracer) wroteHeaderField(k string, v []string) {
if ct.currentName() != "http.headers" {
if ct.span("http.headers") == nil {
ct.open("http.headers")
}
ct.levels[0].SetAttribute(key.New("http." + strings.ToLower(k)).String(sa2s(v)))
ct.root.SetAttribute(key.New("http." + strings.ToLower(k)).String(sa2s(v)))
}

func (ct *clientTracer) wroteHeaders() {
Expand All @@ -146,22 +145,22 @@ func (ct *clientTracer) wroteHeaders() {

func (ct *clientTracer) wroteRequest(info httptrace.WroteRequestInfo) {
if info.Err != nil {
ct.levels[0].SetAttribute(MessageKey.String(info.Err.Error()))
ct.levels[0].SetStatus(codes.Unknown)
ct.root.SetAttribute(MessageKey.String(info.Err.Error()))
ct.root.SetStatus(codes.Unknown)
}
ct.close("http.send")
}

func (ct *clientTracer) got100Continue() {
ct.current().AddEvent(ct.Context, "GOT 100 - Continue")
ct.span("http.receive").AddEvent(ct.Context, "GOT 100 - Continue")
}

func (ct *clientTracer) wait100Continue() {
ct.current().AddEvent(ct.Context, "GOT 100 - Wait")
ct.span("http.receive").AddEvent(ct.Context, "GOT 100 - Wait")
}

func (ct *clientTracer) got1xxResponse(code int, header textproto.MIMEHeader) error {
ct.current().AddEvent(ct.Context, "GOT 1xx",
ct.span("http.receive").AddEvent(ct.Context, "GOT 1xx",
HTTPStatus.Int(code),
HTTPHeaderMIME.String(sm2s(header)),
)
Expand Down

0 comments on commit 5df3c07

Please sign in to comment.