From 0dc1263a138a235b4270423c91966ae79a47101e Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Sun, 24 Dec 2017 01:10:34 +0100 Subject: [PATCH 1/4] Added new Zipkin tracing middleware using the native zipkin-go library --- examples/addsvc/cmd/addcli/addcli.go | 57 +++++--- examples/addsvc/cmd/addsvc/addsvc.go | 48 +++++-- examples/addsvc/pkg/addendpoint/set.go | 10 +- examples/addsvc/pkg/addtransport/grpc.go | 32 +++-- examples/addsvc/pkg/addtransport/http.go | 32 +++-- tracing/README.md | 50 ++++--- tracing/zipkin/README.md | 158 +++++++---------------- tracing/zipkin/doc.go | 5 + tracing/zipkin/endpoint.go | 51 ++++++++ tracing/zipkin/endpoint_test.go | 125 ++++++++++++++++++ tracing/zipkin/grpc.go | 40 ++++++ tracing/zipkin/grpc_test.go | 74 +++++++++++ tracing/zipkin/http.go | 52 ++++++++ tracing/zipkin/http_test.go | 131 +++++++++++++++++++ 14 files changed, 684 insertions(+), 181 deletions(-) create mode 100644 tracing/zipkin/doc.go create mode 100644 tracing/zipkin/endpoint.go create mode 100644 tracing/zipkin/endpoint_test.go create mode 100644 tracing/zipkin/grpc.go create mode 100644 tracing/zipkin/grpc_test.go create mode 100644 tracing/zipkin/http.go create mode 100644 tracing/zipkin/http_test.go diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index fe24fc278..d25b080f1 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -14,7 +14,9 @@ import ( "github.com/apache/thrift/lib/go/thrift" lightstep "github.com/lightstep/lightstep-tracer-go" stdopentracing "github.com/opentracing/opentracing-go" - zipkin "github.com/openzipkin/zipkin-go-opentracing" + zipkin "github.com/openzipkin/zipkin-go" + zipkinot "github.com/openzipkin/zipkin-go-opentracing" + zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http" "sourcegraph.com/sourcegraph/appdash" appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" @@ -40,9 +42,10 @@ func main() { thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") - zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") - lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") - appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) via HTTP Reporter URL e.g. http://localhost:94111/api/v2/spans") + zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) via a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") method = fs.String("method", "sum", "sum, concat") ) fs.Usage = usageFor(fs, os.Args[0]+" [flags] ") @@ -54,10 +57,10 @@ func main() { // This is a demonstration client, which supports multiple tracers. // Your clients will probably just use one tracer. - var tracer stdopentracing.Tracer + var otTracer stdopentracing.Tracer { - if *zipkinURL != "" { - collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if *zipkinV1URL != "" { + collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) @@ -65,24 +68,44 @@ func main() { defer collector.Close() var ( debug = false - hostPort = "localhost:80" - serviceName = "addsvc" + hostPort = "localhost:0" + serviceName = "addsvc-cli" ) - recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) - tracer, err = zipkin.NewTracer(recorder) + recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName) + otTracer, err = zipkinot.NewTracer(recorder) if err != nil { fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) } } else if *lightstepToken != "" { - tracer = lightstep.NewTracer(lightstep.Options{ + otTracer = lightstep.NewTracer(lightstep.Options{ AccessToken: *lightstepToken, }) - defer lightstep.FlushLightStepTracer(tracer) + defer lightstep.FlushLightStepTracer(otTracer) } else if *appdashAddr != "" { - tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) + otTracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else { - tracer = stdopentracing.GlobalTracer() // no-op + otTracer = stdopentracing.GlobalTracer() // no-op + } + } + + var zipkinTracer *zipkin.Tracer + { + var ( + err error + hostPort = "" // if host:port is unknown we can keep this empty + serviceName = "addsvc-cli" + ) + noopTracer := (*zipkinV2URL == "") + reporter := zipkinhttp.NewReporter(*zipkinV2URL) + defer reporter.Close() + zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) + zipkinTracer, err = zipkin.NewTracer( + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + ) + if err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) } } @@ -93,7 +116,7 @@ func main() { err error ) if *httpAddr != "" { - svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger()) + svc, err = addtransport.NewHTTPClient(*httpAddr, otTracer, zipkinTracer, log.NewNopLogger()) } else if *grpcAddr != "" { conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second)) if err != nil { @@ -101,7 +124,7 @@ func main() { os.Exit(1) } defer conn.Close() - svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) + svc = addtransport.NewGRPCClient(conn, otTracer, zipkinTracer, log.NewNopLogger()) } else if *thriftAddr != "" { // It's necessary to do all of this construction in the func main, // because (among other reasons) we need to control the lifecycle of the diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index b1886e2f7..1adf84c44 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -14,7 +14,9 @@ import ( lightstep "github.com/lightstep/lightstep-tracer-go" "github.com/oklog/oklog/pkg/group" stdopentracing "github.com/opentracing/opentracing-go" - zipkin "github.com/openzipkin/zipkin-go-opentracing" + zipkin "github.com/openzipkin/zipkin-go" + zipkinot "github.com/openzipkin/zipkin-go-opentracing" + zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http" stdprometheus "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" @@ -45,9 +47,10 @@ func main() { thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") - zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") - lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") - appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) using a Reporter URL e.g. http://localhost:9411/api/v2/spans") + zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) using a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") ) fs.Usage = usageFor(fs, os.Args[0]+" [flags]") fs.Parse(os.Args[1:]) @@ -60,13 +63,13 @@ func main() { logger = log.With(logger, "caller", log.DefaultCaller) } - // Determine which tracer to use. We'll pass the tracer to all the + // Determine which OpenTracing tracer to use. We'll pass the tracer to all the // components that use it, as a dependency. var tracer stdopentracing.Tracer { - if *zipkinURL != "" { - logger.Log("tracer", "Zipkin", "URL", *zipkinURL) - collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if *zipkinV1URL != "" { + logger.Log("tracer", "Zipkin", "URL", *zipkinV1URL) + collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL) if err != nil { logger.Log("err", err) os.Exit(1) @@ -77,8 +80,8 @@ func main() { hostPort = "localhost:80" serviceName = "addsvc" ) - recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) - tracer, err = zipkin.NewTracer(recorder) + recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName) + tracer, err = zipkinot.NewTracer(recorder) if err != nil { logger.Log("err", err) os.Exit(1) @@ -98,6 +101,25 @@ func main() { } } + var zipkinTracer *zipkin.Tracer + { + var ( + err error + hostPort = "localhost:80" + serviceName = "addsvc" + ) + noopTracer := (*zipkinV2URL == "") + zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) + reporter := zipkinhttp.NewReporter(*zipkinV2URL) + zipkinTracer, err = zipkin.NewTracer( + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + ) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + } + // Create the (sparse) metrics we'll use in the service. They, too, are // dependencies that we pass to components that use them. var ints, chars metrics.Counter @@ -136,9 +158,9 @@ func main() { // them to ports or anything yet; we'll do that next. var ( service = addservice.New(logger, ints, chars) - endpoints = addendpoint.New(service, logger, duration, tracer) - httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) - grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) + endpoints = addendpoint.New(service, logger, duration, tracer, zipkinTracer) + httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger) + grpcServer = addtransport.NewGRPCServer(endpoints, tracer, zipkinTracer, logger) thriftServer = addtransport.NewThriftServer(endpoints) ) diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go index e4acaff47..b2c509142 100644 --- a/examples/addsvc/pkg/addendpoint/set.go +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -7,6 +7,7 @@ import ( "golang.org/x/time/rate" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -15,6 +16,7 @@ import ( "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" "github.com/go-kit/kit/examples/addsvc/pkg/addservice" ) @@ -29,13 +31,14 @@ type Set struct { // New returns a Set that wraps the provided server, and wires in all of the // expected endpoint middlewares via the various parameters. -func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set { +func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer) Set { var sumEndpoint endpoint.Endpoint { sumEndpoint = MakeSumEndpoint(svc) sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) - sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceServer(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) } @@ -44,7 +47,8 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, concatEndpoint = MakeConcatEndpoint(svc) concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) - concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceServer(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) } diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 6ec58d7f4..18a52dc3d 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -8,6 +8,7 @@ import ( "google.golang.org/grpc" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" oldcontext "golang.org/x/net/context" "golang.org/x/time/rate" @@ -17,6 +18,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" grpctransport "github.com/go-kit/kit/transport/grpc" "github.com/go-kit/kit/examples/addsvc/pb" @@ -30,7 +32,7 @@ type grpcServer struct { } // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. -func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { +func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { options := []grpctransport.ServerOption{ grpctransport.ServerErrorLogger(logger), } @@ -39,13 +41,19 @@ func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logg endpoints.SumEndpoint, decodeGRPCSumRequest, encodeGRPCSumResponse, - append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))..., + append(options, grpctransport.ServerBefore( + opentracing.GRPCToContext(otTracer, "Sum", logger), + zipkin.GRPCToContext(zipkinTracer, "Sum", logger), + ))..., ), concat: grpctransport.NewServer( endpoints.ConcatEndpoint, decodeGRPCConcatRequest, encodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))..., + append(options, grpctransport.ServerBefore( + opentracing.GRPCToContext(otTracer, "Concat", logger), + zipkin.GRPCToContext(zipkinTracer, "Concat", logger), + ))..., ), } } @@ -70,7 +78,7 @@ func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb. // of the conn. The caller is responsible for constructing the conn, and // eventually closing the underlying transport. We bake-in certain middlewares, // implementing the client library pattern. -func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { +func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service { // We construct a single ratelimiter middleware, to limit the total outgoing // QPS from this client to all methods on the remote instance. We also // construct per-endpoint circuitbreaker middlewares to demonstrate how @@ -91,9 +99,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l encodeGRPCSumRequest, decodeGRPCSumResponse, pb.SumReply{}, - grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)), + grpctransport.ClientBefore( + opentracing.ContextToGRPC(otTracer, logger), + zipkin.ContextToGRPC(zipkinTracer, logger), + ), ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -112,9 +124,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l encodeGRPCConcatRequest, decodeGRPCConcatResponse, pb.ConcatReply{}, - grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)), + grpctransport.ClientBefore( + opentracing.ContextToGRPC(otTracer, logger), + zipkin.ContextToGRPC(zipkinTracer, logger), + ), ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go index 3819c6d87..ebc5cc74e 100644 --- a/examples/addsvc/pkg/addtransport/http.go +++ b/examples/addsvc/pkg/addtransport/http.go @@ -14,6 +14,7 @@ import ( "golang.org/x/time/rate" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "github.com/sony/gobreaker" "github.com/go-kit/kit/circuitbreaker" @@ -21,6 +22,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" + "github.com/go-kit/kit/tracing/zipkin" httptransport "github.com/go-kit/kit/transport/http" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" @@ -29,7 +31,7 @@ import ( // NewHTTPHandler returns an HTTP handler that makes a set of endpoints // available on predefined paths. -func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { +func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { options := []httptransport.ServerOption{ httptransport.ServerErrorEncoder(errorEncoder), httptransport.ServerErrorLogger(logger), @@ -39,13 +41,19 @@ func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, log endpoints.SumEndpoint, decodeHTTPSumRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Sum", logger)))..., + append(options, httptransport.ServerBefore( + opentracing.HTTPToContext(otTracer, "Sum", logger), + zipkin.HTTPToContext(zipkinTracer, "Sum", logger), + ))..., )) m.Handle("/concat", httptransport.NewServer( endpoints.ConcatEndpoint, decodeHTTPConcatRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.HTTPToContext(tracer, "Concat", logger)))..., + append(options, httptransport.ServerBefore( + opentracing.HTTPToContext(otTracer, "Concat", logger), + zipkin.HTTPToContext(zipkinTracer, "Concat", logger), + ))..., )) return m } @@ -54,7 +62,7 @@ func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, log // remote instance. We expect instance to come from a service discovery system, // so likely of the form "host:port". We bake-in certain middlewares, // implementing the client library pattern. -func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { +func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) { // Quickly sanitize the instance string. if !strings.HasPrefix(instance, "http") { instance = "http://" + instance @@ -82,9 +90,13 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log copyURL(u, "/sum"), encodeHTTPGenericRequest, decodeHTTPSumResponse, - httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)), + httptransport.ClientBefore( + opentracing.ContextToHTTP(otTracer, logger), + zipkin.ContextToHTTP(zipkinTracer, logger), + ), ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -101,9 +113,13 @@ func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Log copyURL(u, "/concat"), encodeHTTPGenericRequest, decodeHTTPConcatResponse, - httptransport.ClientBefore(opentracing.ContextToHTTP(tracer, logger)), + httptransport.ClientBefore( + opentracing.ContextToHTTP(otTracer, logger), + zipkin.ContextToHTTP(zipkinTracer, logger), + ), ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/tracing/README.md b/tracing/README.md index d5e60e826..87242e4cc 100644 --- a/tracing/README.md +++ b/tracing/README.md @@ -1,6 +1,6 @@ # package tracing -`package tracing` provides [Dapper][]-style request tracing to services. +`package tracing` provides [Dapper]-style request tracing to services. ## Rationale @@ -10,37 +10,47 @@ hot spots, and diagnosing errors. All microservice infrastructures will benefit from request tracing; sufficiently large infrastructures will require it. -## OpenTracing +## Zipkin -Go kit builds on top of the [OpenTracing] API and uses the [opentracing-go] -package to provide tracing middlewares for its servers and clients. Currently -`kit/transport/http` and `kit/transport/grpc` transports are supported. +[Zipkin] is the most used OSS distributed tracing platform available with +support for many different languages and frameworks. Go kit provides bindings +to the native Go tracing implementation [zipkin-go]. If using Zipkin with Go +kit in a polyglot microservices environment, this is the preferred binding to +use. Instrumentation exists for `kit/transport/http` and `kit/transport/grpc`. +The bindings are highlighted in the [addsvc] example. For more information +regarding Zipkin feel free to visit [Zipkin's Gitter]. -Since [OpenTracing] is an upcoming standard API, Go kit should support a -multitude of tracing backends. If a Tracer implementation in Go for your -back-end exists, it should work out of the box. The following tracing back-ends -are known to work with Go kit through the OpenTracing interface and are -highlighted in the [addsvc] example. +## OpenTracing +Go kit supports the [OpenTracing] API and uses the [opentracing-go] package to +provide tracing middlewares for its servers and clients. Currently OpenTracing +instrumentation exists for `kit/transport/http` and `kit/transport/grpc`. -### LightStep +Since [OpenTracing] is an effort to provide a generic API, Go kit should support +a multitude of tracing backends. If a Tracer implementation or OpenTracing +bridge in Go for your back-end exists, it should work out of the box. -[LightStep] support is available through their standard Go package -[lightstep-tracer-go]. +Please note that the "world view" of existing tracing systems do differ. +OpenTracing can not guarantee you that tracing alignment is perfect in a +polyglot microservice environment or switching from one tracing backend to +another truly entails just a change in configuration. + +The following tracing back-ends are known to work with Go kit through the +OpenTracing interface and are highlighted in the [addsvc] example. ### AppDash [Appdash] support is available straight from their system repository in the [appdash/opentracing] directory. +### LightStep + +[LightStep] support is available through their standard Go package +[lightstep-tracer-go]. + ### Zipkin -[Zipkin] support is now available from the [zipkin-go-opentracing] package which -can be found at the [Open Zipkin GitHub] page. This means our old custom -`tracing/zipkin` package is now deprecated. In the `kit/tracing/zipkin` -directory you can still find the `docker-compose` script to bootstrap a Zipkin -development environment and a [README] detailing how to transition from the -old package to the new. +[Zipkin] support is available through the [zipkin-go-opentracing] package. [Dapper]: http://research.google.com/pubs/pub36356.html [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc @@ -52,6 +62,8 @@ old package to the new. [Zipkin]: http://zipkin.io/ [Open Zipkin GitHub]: https://github.com/openzipkin [zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing +[zipkin-go]: https://github.com/openzipkin/zipkin-go +[Zipkin's Gitter]: https://gitter.im/openzipkin/zipkin [Appdash]: https://github.com/sourcegraph/appdash [appdash/opentracing]: https://github.com/sourcegraph/appdash/tree/master/opentracing diff --git a/tracing/zipkin/README.md b/tracing/zipkin/README.md index 36579b576..25549acd8 100644 --- a/tracing/zipkin/README.md +++ b/tracing/zipkin/README.md @@ -25,149 +25,81 @@ Collector. Follow the [addsvc] example to check out how to wire the Zipkin Middleware. The changes should be relatively minor. -The [zipkin-go-opentracing] package has support for HTTP, Kafka and Scribe -collectors as well as using Go Kit's [Log] package for logging. +The [zipkin-go] package has Reporters to send Spans to the Zipkin +HTTP and Kafka Collectors. -### Configuring for the Zipkin HTTP Collector +### Configuring the Zipkin HTTP Reporter -To select the transport for the HTTP Collector, you configure the `Recorder` -with the appropriate collector like this: +To use the HTTP Reporter with a Zipkin instance running on localhost you +bootstrap zipkin-go like this: ```go var ( - debugMode = false serviceName = "MyService" serviceHostPort = "localhost:8000" - zipkinHTTPEndpoint = "localhost:9411" + zipkinHTTPEndpoint = "http://localhost:9411/api/v2/spans" ) -collector, err = zipkin.NewHTTPCollector(zipkinHTTPEndpoint) -if err != nil { - // handle error -} -tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, debugMode, serviceHostPort, serviceName), - ... -) -``` - -### Span per Node vs. Span per RPC -By default Zipkin V1 considers either side of an RPC to have the same identity -and differs in that respect from many other tracing systems which consider the -caller to be the parent and the receiver to be the child. The OpenTracing -specification does not dictate one model over the other, but the Zipkin team is -looking into these [single-host-spans] to potentially bring Zipkin more in-line -with the other tracing systems. - -[single-host-spans]: https://github.com/openzipkin/zipkin/issues/963 - -In case of a `span per node` the receiver will create a child span from the -propagated parent span like this: - -``` -Span per Node propagation and identities - -CALLER: RECEIVER: ---------------------------------- -traceId -> traceId - spanId (new) -spanId -> parentSpanId -parentSpanId -``` -**Note:** most tracing implementations supporting the `span per node` model -therefore do not propagate their `parentSpanID` as its not needed. +// create an instance of the HTTP Reporter. +reporter := zipkin.NewReporter(zipkinHTTPEndpoint) -A typical Zipkin implementation will use the `span per RPC` model and recreate -the span identity from the caller on the receiver's end and then annotates its -values on top of it. Propagation will happen like this: +// create our tracer's local endpoint (how the service is identified in Zipkin). +localEndpoint, _ := zipkin.NewEndpoint(serviceName, serviceHostPort) -``` -Span per RPC propagation and identities - -CALLER: RECEIVER: ---------------------------------- -traceId -> traceId -spanId -> spanId -parentSpanId -> parentSpanId -``` - -The [zipkin-go-opentracing] implementation allows you to choose which model you -wish to use. Make sure you select the same model consistently for all your -services that are required to communicate with each other or you will have trace -propagation issues. If using non OpenTracing / legacy instrumentation, it's -probably best to use the `span per RPC call` model. - -To adhere to the more common tracing philosophy of `span per node`, the Tracer -defaults to `span per node`. To set the `span per RPC call` mode start your -tracer like this: +// create our tracer instance. +tracer, err = zipkin.NewTracer(reporter, zipkin.WithLocalEndpoint(localEndpoint)) + ... -```go -tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(...), - zipkin.ClientServerSameSpan(true), -) ``` -[zipkin-go-opentracing]: https://github.com/openzipkin/zipkin-go-opentracing +[zipkin-go]: https://github.com/openzipkin/zipkin-go [addsvc]:https://github.com/go-kit/kit/tree/master/examples/addsvc [Log]: https://github.com/go-kit/kit/tree/master/log ### Tracing Resources -In our legacy implementation we had the `NewChildSpan` method to allow -annotation of resources such as databases, caches and other services that do not -have server side tracing support. Since OpenTracing has no specific method of -dealing with these items explicitely that is compatible with Zipkin's `SA` -annotation, the [zipkin-go-opentracing] has implemented support using the -OpenTracing Tags system. Here is an example of how one would be able to record -a resource span compatible with standard OpenTracing and triggering an `SA` -annotation in [zipkin-go-opentracing]: - +Here is an example of how you could trace resources and work with local spans. ```go -// you need to import the ext package for the Tag helper functions import ( - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" + zipkin "github.com/openzipkin/zipkin-go" ) func (svc *Service) GetMeSomeExamples(ctx context.Context, ...) ([]Examples, error) { - // Example of annotating a database query: - var ( - serviceName = "MySQL" - serviceHost = "mysql.example.com" - servicePort = uint16(3306) - queryLabel = "GetExamplesByParam" - query = "select * from example where param = 'value'" - ) - - // retrieve the parent span, if not found create a new trace - parentSpan := opentracing.SpanFromContext(ctx) - if parentSpan == nil { - parentSpan = opentracing.StartSpan(queryLabel) - defer parentSpan.Finish() - } - - // create a new span to record the resource interaction - span := opentracing.StartChildSpan(parentSpan, queryLabel) - - // span.kind "resource" triggers SA annotation - ext.SpanKind.Set(span, "resource") - - // this will label the span's service & hostPort (called Endpoint in Zipkin) - ext.PeerService.Set(span, serviceName) - ext.PeerHostname.Set(span, serviceHost) - ext.PeerPort.Set(span, servicePort) - - // a Tag is the equivalent of a Zipkin Binary Annotation (key:value pair) + // Example of annotating a database query: + var ( + spanContext model.SpanContext + serviceName = "MySQL" + serviceHost = "mysql.example.com:3306" + queryLabel = "GetExamplesByParam" + query = "select * from example where param = :value" + ) + + // retrieve the parent span from context to use as parent, if not found we + // start a new trace + if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { + spanContext = parentSpan.Context() + } + + // create the remote Zipkin endpoint + ep, _ := zipkin.NewEndpoint(serviceName, serviceHost) + + // create a new span to record the resource interaction + span := zipkin.StartSpan( + queryLabel, + zipkin.Parent(parentSpan.Context()), + zipkin.WithRemoteEndpoint(ep), + ) + + // add interesting key/value pair to our span span.SetTag("query", query) - // a LogEvent is the equivalent of a Zipkin Annotation (timestamped) - span.LogEvent("query:start") + // add interesting timed event to our span + span.Annotate(time.Now(), "query:start") // do the actual query... // let's annotate the end... - span.LogEvent("query:end") + span.Annotate(time.Now(), "query:end") // we're done with this span. span.Finish() diff --git a/tracing/zipkin/doc.go b/tracing/zipkin/doc.go new file mode 100644 index 000000000..82f6d6c57 --- /dev/null +++ b/tracing/zipkin/doc.go @@ -0,0 +1,5 @@ +// Package zipkin provides Go kit integration to the OpenZipkin project through +// the use of zipkin-go, the official OpenZipkin tracer implementation for Go. +// OpenZipkin is the most used open source distributed tracing ecosystem with +// many different libraries and interoperability options. +package zipkin diff --git a/tracing/zipkin/endpoint.go b/tracing/zipkin/endpoint.go new file mode 100644 index 000000000..324de66c1 --- /dev/null +++ b/tracing/zipkin/endpoint.go @@ -0,0 +1,51 @@ +package zipkin + +import ( + "context" + + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + + "github.com/go-kit/kit/endpoint" +) + +// TraceServer returns a Middleware that wraps the `next` Endpoint in a Zipkin +// Span called `operationName`. +// +// If `ctx` already has a Span, it is re-used and the operation name is +// overwritten. If `ctx` does not yet have a Span, one is created here. +func TraceServer(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + var sp zipkin.Span + // try to retrieve Span from Go context, create new Span if not found. + if sp = zipkin.SpanFromContext(ctx); sp == nil { + sp = tracer.StartSpan(operationName, zipkin.Kind(model.Server)) + ctx = zipkin.NewContext(ctx, sp) + } else { + sp.SetName(operationName) + } + defer sp.Finish() + return next(ctx, request) + } + } +} + +// TraceClient returns a Middleware that wraps the `next` Endpoint in a Zipkin +// Span called `operationName`. +func TraceClient(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + var spanOpts = []zipkin.SpanOption{zipkin.Kind(model.Client)} + // try to retrieve Span from Go context, use its SpanContext if found. + if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { + spanOpts = append(spanOpts, zipkin.Parent(parentSpan.Context())) + } + // create new client span (if sc is empty, Parent is a noop) + sp := tracer.StartSpan(operationName, spanOpts...) + defer sp.Finish() + ctx = zipkin.NewContext(ctx, sp) + return next(ctx, request) + } + } +} diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go new file mode 100644 index 000000000..b3a817b08 --- /dev/null +++ b/tracing/zipkin/endpoint_test.go @@ -0,0 +1,125 @@ +package zipkin_test + +import ( + "context" + "testing" + + "github.com/go-kit/kit/endpoint" + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" + + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceServer(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Initialize the ctx with a nameless Span. + contextSpan := tracer.StartSpan("") + ctx := zipkin.NewContext(context.Background(), contextSpan) + + tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + // Test that the op name is updated + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + contextContext := contextSpan.Context() + endpointContext := endpointSpan.SpanContext + // ...and that the ID is unmodified. + if want, have := contextContext.ID, endpointContext.ID; want != have { + t.Errorf("Want SpanID %q, have %q", want, have) + } +} + +func TestTraceServerNoContextSpan(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Empty/background context. + tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } +} + +func TestTraceClient(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Initialize the ctx with a parent Span. + parentSpan := tracer.StartSpan("parent") + defer parentSpan.Finish() + ctx := zipkin.NewContext(context.Background(), parentSpan) + + tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } + + parentContext := parentSpan.Context() + endpointContext := parentSpan.Context() + + // ... and that the parent ID is set appropriately. + if want, have := parentContext.ID, endpointContext.ID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } +} + +func TestTraceClientNoContextSpan(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + // Empty/background context. + tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) + if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { + t.Fatal(err) + } + + // tracedEndpoint created a new Span. + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + endpointSpan := finishedSpans[0] + if want, have := "testOp", endpointSpan.Name; want != have { + t.Fatalf("Want %q, have %q", want, have) + } +} diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go new file mode 100644 index 000000000..18a66937f --- /dev/null +++ b/tracing/zipkin/grpc.go @@ -0,0 +1,40 @@ +package zipkin + +import ( + "context" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/propagation/b3" + "google.golang.org/grpc/metadata" + + "github.com/go-kit/kit/log" +) + +// ContextToGRPC returns a grpc RequestFunc that injects a Zipkin Span found in +// `ctx` into the grpc Metadata. If no such Span can be found, the RequestFunc +// is a noop. +func ContextToGRPC(tracer *zipkin.Tracer, logger log.Logger) func(context.Context, *metadata.MD) context.Context { + return func(ctx context.Context, md *metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + // There's nothing we can do with an error here. + if err := b3.InjectGRPC(md)(span.Context()); err != nil { + logger.Log("err", err) + } + } + return ctx + } +} + +// GRPCToContext returns a grpc RequestFunc that tries to join with a Zipkin +// trace found in `req` and starts a new Span called `operationName` +// accordingly. If no trace could be found in `req`, the Span +// will be a trace root. The Span is incorporated in the returned Context and +// can be retrieved with zipkin.SpanFromContext(ctx). +func GRPCToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md metadata.MD) context.Context { + return func(ctx context.Context, md metadata.MD) context.Context { + spanContext := tracer.Extract(b3.ExtractGRPC(&md)) + span := tracer.StartSpan(operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext)) + return zipkin.NewContext(ctx, span) + } +} diff --git a/tracing/zipkin/grpc_test.go b/tracing/zipkin/grpc_test.go new file mode 100644 index 000000000..3b2326f71 --- /dev/null +++ b/tracing/zipkin/grpc_test.go @@ -0,0 +1,74 @@ +package zipkin_test + +import ( + "context" + "testing" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/reporter/recorder" + "google.golang.org/grpc/metadata" + + "github.com/go-kit/kit/log" + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceGRPCRequestRoundtrip(t *testing.T) { + logger := log.NewNopLogger() + reporter := recorder.NewReporter() + defer reporter.Close() + + // we disable shared rpc spans so we can test parent-child relationship + tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) + + // Initialize the ctx with a Span to inject. + beforeSpan := tracer.StartSpan("to_inject") + beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) + + toGRPCFunc := kitzipkin.ContextToGRPC(tracer, logger) + md := metadata.Pairs() + // Call the RequestFunc. + afterCtx := toGRPCFunc(beforeCtx, &md) + + // The Span should not have changed. + afterSpan := zipkin.SpanFromContext(afterCtx) + if beforeSpan != afterSpan { + t.Error("Should not swap in a new span") + } + + // No spans should have finished yet. + finishedSpans := reporter.Flush() + if want, have := 0, len(finishedSpans); want != have { + t.Errorf("Want %v span(s), found %v", want, have) + } + + // Use GRPCToContext to verify that we can join with the trace given MD. + fromGRPCFunc := kitzipkin.GRPCToContext(tracer, "joined", logger) + joinCtx := fromGRPCFunc(afterCtx, md) + joinedSpan := zipkin.SpanFromContext(joinCtx) + + joinedSpan.Finish() + beforeSpan.Finish() + + finishedSpans = reporter.Flush() + if want, have := 2, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + joined := finishedSpans[0] + before := finishedSpans[1] + + if joined.SpanContext.ID == before.SpanContext.ID { + t.Error("Span.ID should have changed", joined.SpanContext.ID, before.SpanContext.ID) + } + + // Check that the parent/child relationship is as expected for the joined span. + if joined.SpanContext.ParentID == nil { + t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) + } + if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } + if want, have := "joined", joined.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go new file mode 100644 index 000000000..c1cc0a197 --- /dev/null +++ b/tracing/zipkin/http.go @@ -0,0 +1,52 @@ +package zipkin + +import ( + "context" + "net/http" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/propagation/b3" + + "github.com/go-kit/kit/log" + kithttp "github.com/go-kit/kit/transport/http" +) + +// ContextToHTTP returns an http RequestFunc that injects a Zipkin Span found +// in `ctx` into the http headers. If no such Span can be found, the RequestFunc +// is a noop. +func ContextToHTTP(tracer *zipkin.Tracer, logger log.Logger) kithttp.RequestFunc { + return func(ctx context.Context, req *http.Request) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + // add some common Zipkin Tags + zipkin.TagHTTPMethod.Set(span, req.Method) + zipkin.TagHTTPUrl.Set(span, req.URL.String()) + if endpoint, err := zipkin.NewEndpoint("", req.URL.Host); err == nil { + span.SetRemoteEndpoint(endpoint) + } + // There's nothing we can do with any errors here. + if err := b3.InjectHTTP(req)(span.Context()); err != nil { + logger.Log("err", err) + } + } + return ctx + } +} + +// HTTPToContext returns an http RequestFunc that tries to join with a Zipkin +// trace found in `req` and starts a new Span called `operationName` +// accordingly. If no trace could be found in `req`, the Span will be a trace +// root. The Span is incorporated in the returned Context and can be retrieved +// with zipkin.SpanFromContext(ctx). +func HTTPToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { + return func(ctx context.Context, req *http.Request) context.Context { + spanContext := tracer.Extract(b3.ExtractHTTP(req)) + span := tracer.StartSpan( + operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext), + ) + // add some common Zipkin Tags + zipkin.TagHTTPMethod.Set(span, req.Method) + zipkin.TagHTTPUrl.Set(span, req.URL.String()) + return zipkin.NewContext(ctx, span) + } +} diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go new file mode 100644 index 000000000..99afacfdc --- /dev/null +++ b/tracing/zipkin/http_test.go @@ -0,0 +1,131 @@ +package zipkin_test + +import ( + "context" + "net/http" + "reflect" + "testing" + + zipkin "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/propagation/b3" + "github.com/openzipkin/zipkin-go/reporter/recorder" + + "github.com/go-kit/kit/log" + kitzipkin "github.com/go-kit/kit/tracing/zipkin" +) + +func TestTraceHTTPRequestRoundtrip(t *testing.T) { + logger := log.NewNopLogger() + reporter := recorder.NewReporter() + defer reporter.Close() + + // we disable shared rpc spans so we can test parent-child relationship + tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) + + // Initialize the ctx with a Span to inject. + beforeSpan := tracer.StartSpan("to_inject") + beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) + + toHTTPFunc := kitzipkin.ContextToHTTP(tracer, logger) + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + // Call the RequestFunc. + afterCtx := toHTTPFunc(beforeCtx, req) + + // The Span should not have changed. + afterSpan := zipkin.SpanFromContext(afterCtx) + if beforeSpan != afterSpan { + t.Error("Should not swap in a new span") + } + + // No spans should have finished yet. + finishedSpans := reporter.Flush() + if want, have := 0, len(finishedSpans); want != have { + t.Errorf("Want %v span(s), found %v", want, have) + } + + // Use HTTPToContext to verify that we can join with the trace given a req. + fromHTTPFunc := kitzipkin.HTTPToContext(tracer, "joined", logger) + joinCtx := fromHTTPFunc(afterCtx, req) + joinedSpan := zipkin.SpanFromContext(joinCtx) + + joinedSpan.Finish() + beforeSpan.Finish() + + finishedSpans = reporter.Flush() + if want, have := 2, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + joined := finishedSpans[0] + before := finishedSpans[1] + + if joined.SpanContext.ID == before.SpanContext.ID { + t.Error("SpanID should have changed", joined.SpanContext.ID, before.SpanContext.ID) + } + + // Check that the parent/child relationship is as expected for the joined span. + if joined.SpanContext.ParentID == nil { + t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) + } + if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { + t.Errorf("Want ParentID %q, have %q", want, have) + } + if want, have := "joined", joined.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} + +func TestContextToHTTPTags(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + span := tracer.StartSpan("to_inject") + ctx := zipkin.NewContext(context.Background(), span) + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + + kitzipkin.ContextToHTTP(tracer, log.NewNopLogger())(ctx, req) + + expectedTags := map[string]string{ + string(zipkin.TagHTTPMethod): "GET", + string(zipkin.TagHTTPUrl): "http://test.biz/path", + } + + span.Finish() + + finishedSpans := reporter.Flush() + if want, have := 1, len(finishedSpans); want != have { + t.Fatalf("Want %v span(s), found %v", want, have) + } + + if !reflect.DeepEqual(expectedTags, finishedSpans[0].Tags) { + t.Errorf("Want %q, have %q", expectedTags, finishedSpans[0].Tags) + } +} + +func TestHTTPToContextTags(t *testing.T) { + reporter := recorder.NewReporter() + defer reporter.Close() + tracer, _ := zipkin.NewTracer(reporter) + + parentSpan := tracer.StartSpan("to_extract") + defer parentSpan.Finish() + + req, _ := http.NewRequest("GET", "http://test.biz/path", nil) + b3.InjectHTTP(req)(parentSpan.Context()) + + ctx := kitzipkin.HTTPToContext(tracer, "op", log.NewNopLogger())(context.Background(), req) + zipkin.SpanFromContext(ctx).Finish() + + childSpan := reporter.Flush()[0] + expectedTags := map[string]string{ + string(zipkin.TagHTTPMethod): "GET", + string(zipkin.TagHTTPUrl): "http://test.biz/path", + } + if !reflect.DeepEqual(expectedTags, childSpan.Tags) { + t.Errorf("Want %q, have %q", expectedTags, childSpan.Tags) + } + if want, have := "op", childSpan.Name; want != have { + t.Errorf("Want %q, have %q", want, have) + } +} From 367c60a418ccfde522d1d729962e99d1fbc2d37a Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Tue, 9 Jan 2018 20:42:15 +0100 Subject: [PATCH 2/4] new approach to Zipkin tracing middleware for Go kit --- examples/addsvc/cmd/addsvc/addsvc.go | 5 +- examples/addsvc/cmd/addsvc/wiring_test.go | 6 +- examples/addsvc/pkg/addendpoint/set.go | 4 +- examples/addsvc/pkg/addtransport/grpc.go | 52 ++++-- examples/addsvc/pkg/addtransport/http.go | 44 +++-- examples/apigateway/main.go | 12 +- tracing/zipkin/endpoint.go | 39 +--- tracing/zipkin/endpoint_test.go | 125 ------------- tracing/zipkin/grpc.go | 195 +++++++++++++++++--- tracing/zipkin/grpc_test.go | 74 -------- tracing/zipkin/http.go | 206 ++++++++++++++++++---- tracing/zipkin/http_test.go | 131 -------------- tracing/zipkin/options.go | 54 ++++++ transport/grpc/client.go | 35 +++- transport/grpc/request_response_funcs.go | 6 + transport/grpc/server.go | 58 ++++-- transport/http/client.go | 10 +- transport/http/server.go | 10 +- 18 files changed, 582 insertions(+), 484 deletions(-) delete mode 100644 tracing/zipkin/endpoint_test.go delete mode 100644 tracing/zipkin/grpc_test.go delete mode 100644 tracing/zipkin/http_test.go create mode 100644 tracing/zipkin/options.go diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index 1adf84c44..b9701ef60 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/prometheus" + kitgrpc "github.com/go-kit/kit/transport/grpc" addpb "github.com/go-kit/kit/examples/addsvc/pb" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" @@ -216,7 +217,9 @@ func main() { } g.Add(func() error { logger.Log("transport", "gRPC", "addr", *grpcAddr) - baseServer := grpc.NewServer() + // we add the Go Kit gRPC Interceptor to our gRPC service as it is used by + // the here demonstrated zipkin tracing middleware. + baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor)) addpb.RegisterAddServer(baseServer, grpcServer) return baseServer.Serve(grpcListener) }, func(error) { diff --git a/examples/addsvc/cmd/addsvc/wiring_test.go b/examples/addsvc/cmd/addsvc/wiring_test.go index ca64bac1f..f9b3551b2 100644 --- a/examples/addsvc/cmd/addsvc/wiring_test.go +++ b/examples/addsvc/cmd/addsvc/wiring_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics/discard" @@ -18,9 +19,10 @@ import ( ) func TestHTTP(t *testing.T) { + zkt, _ := zipkin.NewTracer(nil, zipkin.WithNoopTracer(true)) svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter()) - eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer()) - mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger()) + eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer(), zkt) + mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), zkt, log.NewNopLogger()) srv := httptest.NewServer(mux) defer srv.Close() diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go index b2c509142..8e42d6f31 100644 --- a/examples/addsvc/pkg/addendpoint/set.go +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -38,7 +38,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceServer(zipkinTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) } @@ -48,7 +48,7 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceServer(zipkinTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) } diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 18a52dc3d..6f54219ed 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -33,27 +33,31 @@ type grpcServer struct { // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { + // Zipkin GRPC Server Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing service can be instantiated + // without an operation name and fed to each Go kit endpoint as ServerOption. + // In the latter case, the operation name will be the endpoint's grpc method + // path. + // We demonstrate a global tracing service here. + zipkinServer := zipkin.GRPCServerTrace(zipkinTracer) + options := []grpctransport.ServerOption{ grpctransport.ServerErrorLogger(logger), + zipkinServer, } + return &grpcServer{ sum: grpctransport.NewServer( endpoints.SumEndpoint, decodeGRPCSumRequest, encodeGRPCSumResponse, - append(options, grpctransport.ServerBefore( - opentracing.GRPCToContext(otTracer, "Sum", logger), - zipkin.GRPCToContext(zipkinTracer, "Sum", logger), - ))..., + append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))..., ), concat: grpctransport.NewServer( endpoints.ConcatEndpoint, decodeGRPCConcatRequest, encodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore( - opentracing.GRPCToContext(otTracer, "Concat", logger), - zipkin.GRPCToContext(zipkinTracer, "Concat", logger), - ))..., + append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))..., ), } } @@ -86,6 +90,18 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) + // Zipkin GRPC Client Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing client can be instantiated + // without an operation name and fed to each Go kit endpoint as ClientOption. + // In the latter case, the operation name will be the endpoint's grpc method + // path. + zipkinClient := zipkin.GRPCClientTrace(zipkinTracer) + + // global client middlewares + options := []grpctransport.ClientOption{ + zipkinClient, + } + // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you // made your own client library, you'd do this work there, so your server @@ -99,13 +115,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin encodeGRPCSumRequest, decodeGRPCSumResponse, pb.SumReply{}, - grpctransport.ClientBefore( - opentracing.ContextToGRPC(otTracer, logger), - zipkin.ContextToGRPC(zipkinTracer, logger), - ), + append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) + // For additional information TraceEndpoint is added as endpoint middleware. + // If instantiating per endpoint ClientTracers on the Go kit gRPC client, + // you might not want this additional middleware and thus could be omitted. + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -124,13 +140,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin encodeGRPCConcatRequest, decodeGRPCConcatResponse, pb.ConcatReply{}, - grpctransport.ClientBefore( - opentracing.ContextToGRPC(otTracer, logger), - zipkin.ContextToGRPC(zipkinTracer, logger), - ), + append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) + // For additional information TraceEndpoint is added as endpoint middleware. + // If instantiating per endpoint ClientTracers on the Go kit gRPC client, + // you might not want this additional middleware and thus could be omitted. + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go index ebc5cc74e..0eb4e7358 100644 --- a/examples/addsvc/pkg/addtransport/http.go +++ b/examples/addsvc/pkg/addtransport/http.go @@ -32,28 +32,31 @@ import ( // NewHTTPHandler returns an HTTP handler that makes a set of endpoints // available on predefined paths. func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler { + // Zipkin HTTP Server Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing service can be instantiated + // without an operation name and fed to each Go kit endpoint as ServerOption. + // In the latter case, the operation name will be the endpoint's http method. + // We demonstrate a global tracing service here. + zipkinServer := zipkin.HTTPServerTrace(zipkinTracer) + options := []httptransport.ServerOption{ httptransport.ServerErrorEncoder(errorEncoder), httptransport.ServerErrorLogger(logger), + zipkinServer, } + m := http.NewServeMux() m.Handle("/sum", httptransport.NewServer( endpoints.SumEndpoint, decodeHTTPSumRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore( - opentracing.HTTPToContext(otTracer, "Sum", logger), - zipkin.HTTPToContext(zipkinTracer, "Sum", logger), - ))..., + append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))..., )) m.Handle("/concat", httptransport.NewServer( endpoints.ConcatEndpoint, decodeHTTPConcatRequest, encodeHTTPGenericResponse, - append(options, httptransport.ServerBefore( - opentracing.HTTPToContext(otTracer, "Concat", logger), - zipkin.HTTPToContext(zipkinTracer, "Concat", logger), - ))..., + append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))..., )) return m } @@ -79,6 +82,17 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) + // Zipkin HTTP Client Trace can either be instantiated per endpoint with a + // provided operation name or a global tracing client can be instantiated + // without an operation name and fed to each Go kit endpoint as ClientOption. + // In the latter case, the operation name will be the endpoint's http method. + zipkinClient := zipkin.HTTPClientTrace(zipkinTracer) + + // global client middlewares + options := []httptransport.ClientOption{ + zipkinClient, + } + // Each individual endpoint is an http/transport.Client (which implements // endpoint.Endpoint) that gets wrapped with various middlewares. If you // made your own client library, you'd do this work there, so your server @@ -90,13 +104,10 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer copyURL(u, "/sum"), encodeHTTPGenericRequest, decodeHTTPSumResponse, - httptransport.ClientBefore( - opentracing.ContextToHTTP(otTracer, logger), - zipkin.ContextToHTTP(zipkinTracer, logger), - ), + append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - sumEndpoint = zipkin.TraceClient(zipkinTracer, "Sum")(sumEndpoint) + sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -113,13 +124,10 @@ func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer copyURL(u, "/concat"), encodeHTTPGenericRequest, decodeHTTPConcatResponse, - httptransport.ClientBefore( - opentracing.ContextToHTTP(otTracer, logger), - zipkin.ContextToHTTP(zipkinTracer, logger), - ), + append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - concatEndpoint = zipkin.TraceClient(zipkinTracer, "Concat")(concatEndpoint) + concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index 3b4cd675e..5891241d1 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" "github.com/hashicorp/consul/api" stdopentracing "github.com/opentracing/opentracing-go" + stdzipkin "github.com/openzipkin/zipkin-go" "google.golang.org/grpc" "github.com/go-kit/kit/endpoint" @@ -67,6 +68,7 @@ func main() { // Transport domain. tracer := stdopentracing.GlobalTracer() // no-op + zipkinTracer, _ := stdzipkin.NewTracer(nil, stdzipkin.WithNoopTracer(true)) ctx := context.Background() r := mux.NewRouter() @@ -88,14 +90,14 @@ func main() { instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly) ) { - factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, zipkinTracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) endpoints.SumEndpoint = retry } { - factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, zipkinTracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) @@ -106,7 +108,7 @@ func main() { // HTTP handler, and just install it under a particular path prefix in // our router. - r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, logger))) + r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger))) } // stringsvc routes. @@ -165,7 +167,7 @@ func main() { logger.Log("exit", <-errc) } -func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory { +func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) sd.Factory { return func(instance string) (endpoint.Endpoint, io.Closer, error) { // We could just as easily use the HTTP or Thrift client package to make // the connection to addsvc. We've chosen gRPC arbitrarily. Note that @@ -176,7 +178,7 @@ func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, trac if err != nil { return nil, nil, err } - service := addtransport.NewGRPCClient(conn, tracer, logger) + service := addtransport.NewGRPCClient(conn, tracer, zipkinTracer, logger) endpoint := makeEndpoint(service) // Notice that the addsvc gRPC client converts the connection to a diff --git a/tracing/zipkin/endpoint.go b/tracing/zipkin/endpoint.go index 324de66c1..e004bf24f 100644 --- a/tracing/zipkin/endpoint.go +++ b/tracing/zipkin/endpoint.go @@ -9,41 +9,20 @@ import ( "github.com/go-kit/kit/endpoint" ) -// TraceServer returns a Middleware that wraps the `next` Endpoint in a Zipkin -// Span called `operationName`. -// -// If `ctx` already has a Span, it is re-used and the operation name is -// overwritten. If `ctx` does not yet have a Span, one is created here. -func TraceServer(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { +// TraceEndpoint returns an Endpoint middleware, tracing a Go kit endpoint. +// This endpoint tracer should be used in combination with a Go kit Transport +// tracing middleware or custom before and after transport functions as +// propagation of SpanContext is not provided in this middleware. +func TraceEndpoint(tracer *zipkin.Tracer, name string) endpoint.Middleware { return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { - var sp zipkin.Span - // try to retrieve Span from Go context, create new Span if not found. - if sp = zipkin.SpanFromContext(ctx); sp == nil { - sp = tracer.StartSpan(operationName, zipkin.Kind(model.Server)) - ctx = zipkin.NewContext(ctx, sp) - } else { - sp.SetName(operationName) - } - defer sp.Finish() - return next(ctx, request) - } - } -} - -// TraceClient returns a Middleware that wraps the `next` Endpoint in a Zipkin -// Span called `operationName`. -func TraceClient(tracer *zipkin.Tracer, operationName string) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - var spanOpts = []zipkin.SpanOption{zipkin.Kind(model.Client)} - // try to retrieve Span from Go context, use its SpanContext if found. + var sc model.SpanContext if parentSpan := zipkin.SpanFromContext(ctx); parentSpan != nil { - spanOpts = append(spanOpts, zipkin.Parent(parentSpan.Context())) + sc = parentSpan.Context() } - // create new client span (if sc is empty, Parent is a noop) - sp := tracer.StartSpan(operationName, spanOpts...) + sp := tracer.StartSpan(name, zipkin.Parent(sc)) defer sp.Finish() + ctx = zipkin.NewContext(ctx, sp) return next(ctx, request) } diff --git a/tracing/zipkin/endpoint_test.go b/tracing/zipkin/endpoint_test.go deleted file mode 100644 index b3a817b08..000000000 --- a/tracing/zipkin/endpoint_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package zipkin_test - -import ( - "context" - "testing" - - "github.com/go-kit/kit/endpoint" - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/reporter/recorder" - - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceServer(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Initialize the ctx with a nameless Span. - contextSpan := tracer.StartSpan("") - ctx := zipkin.NewContext(context.Background(), contextSpan) - - tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { - t.Fatal(err) - } - - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - // Test that the op name is updated - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } - contextContext := contextSpan.Context() - endpointContext := endpointSpan.SpanContext - // ...and that the ID is unmodified. - if want, have := contextContext.ID, endpointContext.ID; want != have { - t.Errorf("Want SpanID %q, have %q", want, have) - } -} - -func TestTraceServerNoContextSpan(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Empty/background context. - tracedEndpoint := kitzipkin.TraceServer(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } -} - -func TestTraceClient(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Initialize the ctx with a parent Span. - parentSpan := tracer.StartSpan("parent") - defer parentSpan.Finish() - ctx := zipkin.NewContext(context.Background(), parentSpan) - - tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(ctx, struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } - - parentContext := parentSpan.Context() - endpointContext := parentSpan.Context() - - // ... and that the parent ID is set appropriately. - if want, have := parentContext.ID, endpointContext.ID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } -} - -func TestTraceClientNoContextSpan(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - // Empty/background context. - tracedEndpoint := kitzipkin.TraceClient(tracer, "testOp")(endpoint.Nop) - if _, err := tracedEndpoint(context.Background(), struct{}{}); err != nil { - t.Fatal(err) - } - - // tracedEndpoint created a new Span. - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - endpointSpan := finishedSpans[0] - if want, have := "testOp", endpointSpan.Name; want != have { - t.Fatalf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index 18a66937f..e582ad0e7 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -2,39 +2,190 @@ package zipkin import ( "context" + "strconv" zipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" "github.com/openzipkin/zipkin-go/propagation/b3" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/go-kit/kit/log" + kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// ContextToGRPC returns a grpc RequestFunc that injects a Zipkin Span found in -// `ctx` into the grpc Metadata. If no such Span can be found, the RequestFunc -// is a noop. -func ContextToGRPC(tracer *zipkin.Tracer, logger log.Logger) func(context.Context, *metadata.MD) context.Context { - return func(ctx context.Context, md *metadata.MD) context.Context { - if span := zipkin.SpanFromContext(ctx); span != nil { - // There's nothing we can do with an error here. - if err := b3.InjectGRPC(md)(span.Context()); err != nil { - logger.Log("err", err) - } - } - return ctx +// GRPCClientTrace enables Zipkin tracing of a Go kit gRPC Client Transport. +func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, } + + for _, option := range options { + option(&config) + } + + clientBefore := kitgrpc.ClientBefore( + func(ctx context.Context, md *metadata.MD) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = ctx.Value(kitgrpc.ContextKeyRequestMethod).(string) + } + + if parent := zipkin.SpanFromContext(ctx); parent != nil { + spanContext = parent.Context() + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Client), + zipkin.Tags(config.tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + if config.propagate { + if err := b3.InjectGRPC(md)(span.Context()); err != nil { + config.logger.Log("err", err) + } + } + + return zipkin.NewContext(ctx, span) + }, + ) + + clientAfter := kitgrpc.ClientAfter( + func(ctx context.Context, _ metadata.MD, _ metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + clientFinalizer := kitgrpc.ClientFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + zipkin.TagError.Set(span, err.Error()) + } + // calling span.Finish() a second time is a noop, if we didn't get to + // ClientAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(c *kitgrpc.Client) { + clientBefore(c) + clientAfter(c) + clientFinalizer(c) + } + } -// GRPCToContext returns a grpc RequestFunc that tries to join with a Zipkin -// trace found in `req` and starts a new Span called `operationName` -// accordingly. If no trace could be found in `req`, the Span -// will be a trace root. The Span is incorporated in the returned Context and -// can be retrieved with zipkin.SpanFromContext(ctx). -func GRPCToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) func(ctx context.Context, md metadata.MD) context.Context { - return func(ctx context.Context, md metadata.MD) context.Context { - spanContext := tracer.Extract(b3.ExtractGRPC(&md)) - span := tracer.StartSpan(operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext)) - return zipkin.NewContext(ctx, span) +// GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. +func GRPCServerTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ServerOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + serverBefore := kitgrpc.ServerBefore( + func(ctx context.Context, md metadata.MD) context.Context { + var ( + spanContext model.SpanContext + name string + tags = make(map[string]string) + ) + + rpcMethod, ok := ctx.Value(kitgrpc.ContextKeyRequestMethod).(string) + if !ok { + config.logger.Log("unable to retrieve method name: missing gRPC interceptor hook") + } else { + tags["grpc.method"] = rpcMethod + } + + if config.name != "" { + name = config.name + } else { + name = rpcMethod + } + + if config.propagate { + spanContext = tracer.Extract(b3.ExtractGRPC(&md)) + if spanContext.Err != nil { + config.logger.Log("err", spanContext.Err) + } + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Server), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + return zipkin.NewContext(ctx, span) + }, + ) + + serverAfter := kitgrpc.ServerAfter( + func(ctx context.Context, _ *metadata.MD, _ *metadata.MD) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + serverFinalizer := kitgrpc.ServerFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + if status, ok := status.FromError(err); ok { + statusCode := strconv.FormatUint(uint64(status.Code()), 10) + zipkin.TagGRPCStatusCode.Set(span, statusCode) + zipkin.TagError.Set(span, status.Message()) + } else { + zipkin.TagError.Set(span, err.Error()) + } + } + + // calling span.Finish() a second time is a noop, if we didn't get to + // ServerAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(s *kitgrpc.Server) { + serverBefore(s) + serverAfter(s) + serverFinalizer(s) } } diff --git a/tracing/zipkin/grpc_test.go b/tracing/zipkin/grpc_test.go deleted file mode 100644 index 3b2326f71..000000000 --- a/tracing/zipkin/grpc_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package zipkin_test - -import ( - "context" - "testing" - - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/reporter/recorder" - "google.golang.org/grpc/metadata" - - "github.com/go-kit/kit/log" - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceGRPCRequestRoundtrip(t *testing.T) { - logger := log.NewNopLogger() - reporter := recorder.NewReporter() - defer reporter.Close() - - // we disable shared rpc spans so we can test parent-child relationship - tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) - - // Initialize the ctx with a Span to inject. - beforeSpan := tracer.StartSpan("to_inject") - beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) - - toGRPCFunc := kitzipkin.ContextToGRPC(tracer, logger) - md := metadata.Pairs() - // Call the RequestFunc. - afterCtx := toGRPCFunc(beforeCtx, &md) - - // The Span should not have changed. - afterSpan := zipkin.SpanFromContext(afterCtx) - if beforeSpan != afterSpan { - t.Error("Should not swap in a new span") - } - - // No spans should have finished yet. - finishedSpans := reporter.Flush() - if want, have := 0, len(finishedSpans); want != have { - t.Errorf("Want %v span(s), found %v", want, have) - } - - // Use GRPCToContext to verify that we can join with the trace given MD. - fromGRPCFunc := kitzipkin.GRPCToContext(tracer, "joined", logger) - joinCtx := fromGRPCFunc(afterCtx, md) - joinedSpan := zipkin.SpanFromContext(joinCtx) - - joinedSpan.Finish() - beforeSpan.Finish() - - finishedSpans = reporter.Flush() - if want, have := 2, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - joined := finishedSpans[0] - before := finishedSpans[1] - - if joined.SpanContext.ID == before.SpanContext.ID { - t.Error("Span.ID should have changed", joined.SpanContext.ID, before.SpanContext.ID) - } - - // Check that the parent/child relationship is as expected for the joined span. - if joined.SpanContext.ParentID == nil { - t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) - } - if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } - if want, have := "joined", joined.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index c1cc0a197..d17f39d56 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -3,6 +3,7 @@ package zipkin import ( "context" "net/http" + "strconv" zipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" @@ -12,41 +13,180 @@ import ( kithttp "github.com/go-kit/kit/transport/http" ) -// ContextToHTTP returns an http RequestFunc that injects a Zipkin Span found -// in `ctx` into the http headers. If no such Span can be found, the RequestFunc -// is a noop. -func ContextToHTTP(tracer *zipkin.Tracer, logger log.Logger) kithttp.RequestFunc { - return func(ctx context.Context, req *http.Request) context.Context { - if span := zipkin.SpanFromContext(ctx); span != nil { - // add some common Zipkin Tags - zipkin.TagHTTPMethod.Set(span, req.Method) - zipkin.TagHTTPUrl.Set(span, req.URL.String()) - if endpoint, err := zipkin.NewEndpoint("", req.URL.Host); err == nil { - span.SetRemoteEndpoint(endpoint) - } - // There's nothing we can do with any errors here. - if err := b3.InjectHTTP(req)(span.Context()); err != nil { - logger.Log("err", err) - } - } - return ctx +// HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. +func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + clientBefore := kithttp.ClientBefore( + func(ctx context.Context, req *http.Request) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = req.Method + } + + if parent := zipkin.SpanFromContext(ctx); parent != nil { + spanContext = parent.Context() + } + + tags := map[string]string{ + string(zipkin.TagHTTPMethod): req.Method, + string(zipkin.TagHTTPUrl): req.URL.String(), + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Client), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + if config.propagate { + if err := b3.InjectHTTP(req)(span.Context()); err != nil { + config.logger.Log("err", err) + } + } + + return zipkin.NewContext(ctx, span) + }, + ) + + clientAfter := kithttp.ClientAfter( + func(ctx context.Context, res *http.Response) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(res.ContentLength, 10)) + zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(res.StatusCode)) + if res.StatusCode > 399 { + zipkin.TagError.Set(span, strconv.Itoa(res.StatusCode)) + } + span.Finish() + } + + return ctx + }, + ) + + clientFinalizer := kithttp.ClientFinalizer( + func(ctx context.Context, err error) { + if span := zipkin.SpanFromContext(ctx); span != nil { + if err != nil { + zipkin.TagError.Set(span, err.Error()) + } + // calling span.Finish() a second time is a noop, if we didn't get to + // ClientAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(c *kithttp.Client) { + clientBefore(c) + clientAfter(c) + clientFinalizer(c) } } -// HTTPToContext returns an http RequestFunc that tries to join with a Zipkin -// trace found in `req` and starts a new Span called `operationName` -// accordingly. If no trace could be found in `req`, the Span will be a trace -// root. The Span is incorporated in the returned Context and can be retrieved -// with zipkin.SpanFromContext(ctx). -func HTTPToContext(tracer *zipkin.Tracer, operationName string, logger log.Logger) kithttp.RequestFunc { - return func(ctx context.Context, req *http.Request) context.Context { - spanContext := tracer.Extract(b3.ExtractHTTP(req)) - span := tracer.StartSpan( - operationName, zipkin.Kind(model.Server), zipkin.Parent(spanContext), - ) - // add some common Zipkin Tags - zipkin.TagHTTPMethod.Set(span, req.Method) - zipkin.TagHTTPUrl.Set(span, req.URL.String()) - return zipkin.NewContext(ctx, span) +// HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. +func HTTPServerTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ServerOption { + config := tracerOptions{ + tags: make(map[string]string), + name: "", + logger: log.NewNopLogger(), + propagate: true, + } + + for _, option := range options { + option(&config) + } + + serverBefore := kithttp.ServerBefore( + func(ctx context.Context, req *http.Request) context.Context { + var ( + spanContext model.SpanContext + name string + ) + + if config.name != "" { + name = config.name + } else { + name = req.Method + } + + if config.propagate { + spanContext = tracer.Extract(b3.ExtractHTTP(req)) + if spanContext.Err != nil { + config.logger.Log("err", spanContext.Err) + } + } + + tags := map[string]string{ + string(zipkin.TagHTTPMethod): req.Method, + string(zipkin.TagHTTPPath): req.URL.Path, + } + + span := tracer.StartSpan( + name, + zipkin.Kind(model.Server), + zipkin.Tags(config.tags), + zipkin.Tags(tags), + zipkin.Parent(spanContext), + zipkin.FlushOnFinish(false), + ) + + return zipkin.NewContext(ctx, span) + }, + ) + + serverAfter := kithttp.ServerAfter( + func(ctx context.Context, _ http.ResponseWriter) context.Context { + if span := zipkin.SpanFromContext(ctx); span != nil { + span.Finish() + } + + return ctx + }, + ) + + serverFinalizer := kithttp.ServerFinalizer( + func(ctx context.Context, code int, r *http.Request) { + if span := zipkin.SpanFromContext(ctx); span != nil { + zipkin.TagHTTPStatusCode.Set(span, strconv.Itoa(code)) + if rs, ok := ctx.Value(kithttp.ContextKeyResponseSize).(int64); ok { + zipkin.TagHTTPResponseSize.Set(span, strconv.FormatInt(rs, 10)) + } + + // calling span.Finish() a second time is a noop, if we didn't get to + // ServerAfter we can at least time the early bail out by calling it + // here. + span.Finish() + // send span to the Reporter + span.Flush() + } + }, + ) + + return func(s *kithttp.Server) { + serverBefore(s) + serverAfter(s) + serverFinalizer(s) } } diff --git a/tracing/zipkin/http_test.go b/tracing/zipkin/http_test.go deleted file mode 100644 index 99afacfdc..000000000 --- a/tracing/zipkin/http_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package zipkin_test - -import ( - "context" - "net/http" - "reflect" - "testing" - - zipkin "github.com/openzipkin/zipkin-go" - "github.com/openzipkin/zipkin-go/propagation/b3" - "github.com/openzipkin/zipkin-go/reporter/recorder" - - "github.com/go-kit/kit/log" - kitzipkin "github.com/go-kit/kit/tracing/zipkin" -) - -func TestTraceHTTPRequestRoundtrip(t *testing.T) { - logger := log.NewNopLogger() - reporter := recorder.NewReporter() - defer reporter.Close() - - // we disable shared rpc spans so we can test parent-child relationship - tracer, _ := zipkin.NewTracer(reporter, zipkin.WithSharedSpans(false)) - - // Initialize the ctx with a Span to inject. - beforeSpan := tracer.StartSpan("to_inject") - beforeCtx := zipkin.NewContext(context.Background(), beforeSpan) - - toHTTPFunc := kitzipkin.ContextToHTTP(tracer, logger) - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - // Call the RequestFunc. - afterCtx := toHTTPFunc(beforeCtx, req) - - // The Span should not have changed. - afterSpan := zipkin.SpanFromContext(afterCtx) - if beforeSpan != afterSpan { - t.Error("Should not swap in a new span") - } - - // No spans should have finished yet. - finishedSpans := reporter.Flush() - if want, have := 0, len(finishedSpans); want != have { - t.Errorf("Want %v span(s), found %v", want, have) - } - - // Use HTTPToContext to verify that we can join with the trace given a req. - fromHTTPFunc := kitzipkin.HTTPToContext(tracer, "joined", logger) - joinCtx := fromHTTPFunc(afterCtx, req) - joinedSpan := zipkin.SpanFromContext(joinCtx) - - joinedSpan.Finish() - beforeSpan.Finish() - - finishedSpans = reporter.Flush() - if want, have := 2, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - joined := finishedSpans[0] - before := finishedSpans[1] - - if joined.SpanContext.ID == before.SpanContext.ID { - t.Error("SpanID should have changed", joined.SpanContext.ID, before.SpanContext.ID) - } - - // Check that the parent/child relationship is as expected for the joined span. - if joined.SpanContext.ParentID == nil { - t.Fatalf("Want ParentID %q, have nil", before.SpanContext.ID) - } - if want, have := before.SpanContext.ID, *joined.SpanContext.ParentID; want != have { - t.Errorf("Want ParentID %q, have %q", want, have) - } - if want, have := "joined", joined.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} - -func TestContextToHTTPTags(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - span := tracer.StartSpan("to_inject") - ctx := zipkin.NewContext(context.Background(), span) - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - - kitzipkin.ContextToHTTP(tracer, log.NewNopLogger())(ctx, req) - - expectedTags := map[string]string{ - string(zipkin.TagHTTPMethod): "GET", - string(zipkin.TagHTTPUrl): "http://test.biz/path", - } - - span.Finish() - - finishedSpans := reporter.Flush() - if want, have := 1, len(finishedSpans); want != have { - t.Fatalf("Want %v span(s), found %v", want, have) - } - - if !reflect.DeepEqual(expectedTags, finishedSpans[0].Tags) { - t.Errorf("Want %q, have %q", expectedTags, finishedSpans[0].Tags) - } -} - -func TestHTTPToContextTags(t *testing.T) { - reporter := recorder.NewReporter() - defer reporter.Close() - tracer, _ := zipkin.NewTracer(reporter) - - parentSpan := tracer.StartSpan("to_extract") - defer parentSpan.Finish() - - req, _ := http.NewRequest("GET", "http://test.biz/path", nil) - b3.InjectHTTP(req)(parentSpan.Context()) - - ctx := kitzipkin.HTTPToContext(tracer, "op", log.NewNopLogger())(context.Background(), req) - zipkin.SpanFromContext(ctx).Finish() - - childSpan := reporter.Flush()[0] - expectedTags := map[string]string{ - string(zipkin.TagHTTPMethod): "GET", - string(zipkin.TagHTTPUrl): "http://test.biz/path", - } - if !reflect.DeepEqual(expectedTags, childSpan.Tags) { - t.Errorf("Want %q, have %q", expectedTags, childSpan.Tags) - } - if want, have := "op", childSpan.Name; want != have { - t.Errorf("Want %q, have %q", want, have) - } -} diff --git a/tracing/zipkin/options.go b/tracing/zipkin/options.go new file mode 100644 index 000000000..073a94040 --- /dev/null +++ b/tracing/zipkin/options.go @@ -0,0 +1,54 @@ +package zipkin + +import "github.com/go-kit/kit/log" + +// Option allows for functional options to our Zipkin tracing middleware. +type Option func(o *tracerOptions) + +// Name sets the name for an instrumented transport endpoint. If name is omitted +// at tracing middleware creation, the method of the transport or transport rpc +// name is used. +func Name(name string) Option { + return func(o *tracerOptions) { + o.name = name + } +} + +// Tags adds default tags to our Zipkin transport spans. +func Tags(tags map[string]string) Option { + return func(o *tracerOptions) { + for k, v := range tags { + o.tags[k] = v + } + } +} + +// Logger adds a Go kit logger to our Zipkin Middleware to log SpanContext +// extract / inject errors if they occur. Default is Noop. +func Logger(logger log.Logger) Option { + return func(o *tracerOptions) { + if logger != nil { + o.logger = logger + } + } +} + +// AllowPropagation instructs the tracer to allow or deny propagation of the +// span context between this instrumented client or service and its peers. If +// the instrumented client connects to services outside its own platform or if +// the instrumented service receives requests from untrusted clients it is +// strongly advised to disallow propagation. Propagation between services inside +// your own platform benefit from propagation. Default for both TraceClient and +// TraceServer is to allow propagation. +func AllowPropagation(propagate bool) Option { + return func(o *tracerOptions) { + o.propagate = propagate + } +} + +type tracerOptions struct { + tags map[string]string + name string + logger log.Logger + propagate bool +} diff --git a/transport/grpc/client.go b/transport/grpc/client.go index 28c203f82..5d96c6b4d 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -22,6 +22,7 @@ type Client struct { grpcReply reflect.Type before []ClientRequestFunc after []ClientResponseFunc + finalizer []ClientFinalizerFunc } // NewClient constructs a usable Client for a single remote endpoint. @@ -75,13 +76,29 @@ func ClientAfter(after ...ClientResponseFunc) ClientOption { return func(c *Client) { c.after = append(c.after, after...) } } +// ClientFinalizer is executed at the end of every gRPC request. +// By default, no finalizer is registered. +func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = append(s.finalizer, f...) } +} + // Endpoint returns a usable endpoint that will invoke the gRPC specified by the // client. func (c Client) Endpoint() endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + if c.finalizer != nil { + defer func() { + for _, f := range c.finalizer { + f(ctx, err) + } + }() + } + + ctx = context.WithValue(ctx, ContextKeyRequestMethod, c.method) + req, err := c.enc(ctx, request) if err != nil { return nil, err @@ -95,9 +112,9 @@ func (c Client) Endpoint() endpoint.Endpoint { var header, trailer metadata.MD grpcReply := reflect.New(c.grpcReply).Interface() - if err = grpc.Invoke( - ctx, c.method, req, grpcReply, c.client, - grpc.Header(&header), grpc.Trailer(&trailer), + if err = c.client.Invoke( + ctx, c.method, req, grpcReply, grpc.Header(&header), + grpc.Trailer(&trailer), ); err != nil { return nil, err } @@ -106,10 +123,18 @@ func (c Client) Endpoint() endpoint.Endpoint { ctx = f(ctx, header, trailer) } - response, err := c.dec(ctx, grpcReply) + response, err = c.dec(ctx, grpcReply) if err != nil { return nil, err } return response, nil } } + +// ClientFinalizerFunc can be used to perform work at the end of a client gRPC +// request, after the response is returned. The principal +// intended use is for error logging. Additional response parameters are +// provided in the context under keys with the ContextKeyResponse prefix. +// Note: err may be nil. There maybe also no additional response parameters depending on +// when an error occurs. +type ClientFinalizerFunc func(ctx context.Context, err error) diff --git a/transport/grpc/request_response_funcs.go b/transport/grpc/request_response_funcs.go index 8d072ede7..a797d3df3 100644 --- a/transport/grpc/request_response_funcs.go +++ b/transport/grpc/request_response_funcs.go @@ -74,3 +74,9 @@ func EncodeKeyValue(key, val string) (string, string) { } return key, val } + +type contextKey int + +const ( + ContextKeyRequestMethod contextKey = iota +) diff --git a/transport/grpc/server.go b/transport/grpc/server.go index 6da4bdb97..178c3f3b8 100644 --- a/transport/grpc/server.go +++ b/transport/grpc/server.go @@ -1,6 +1,8 @@ package grpc import ( + "context" + oldcontext "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -18,12 +20,13 @@ type Handler interface { // Server wraps an endpoint and implements grpc.Handler. type Server struct { - e endpoint.Endpoint - dec DecodeRequestFunc - enc EncodeResponseFunc - before []ServerRequestFunc - after []ServerResponseFunc - logger log.Logger + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + finalizer []ServerFinalizerFunc + logger log.Logger } // NewServer constructs a new server, which implements wraps the provided @@ -70,25 +73,45 @@ func ServerErrorLogger(logger log.Logger) ServerOption { return func(s *Server) { s.logger = logger } } +// ServerFinalizer is executed at the end of every gRPC request. +// By default, no finalizer is registered. +func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { + return func(s *Server) { s.finalizer = append(s.finalizer, f...) } +} + // ServeGRPC implements the Handler interface. -func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.Context, interface{}, error) { +func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (retctx oldcontext.Context, resp interface{}, err error) { // Retrieve gRPC metadata. md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.MD{} } + if s.finalizer != nil { + defer func() { + for _, f := range s.finalizer { + f(ctx, err) + } + }() + } + for _, f := range s.before { ctx = f(ctx, md) } - request, err := s.dec(ctx, req) + var ( + request interface{} + response interface{} + grpcResp interface{} + ) + + request, err = s.dec(ctx, req) if err != nil { s.logger.Log("err", err) return ctx, nil, err } - response, err := s.e(ctx, request) + response, err = s.e(ctx, request) if err != nil { s.logger.Log("err", err) return ctx, nil, err @@ -99,7 +122,7 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.C ctx = f(ctx, &mdHeader, &mdTrailer) } - grpcResp, err := s.enc(ctx, response) + grpcResp, err = s.enc(ctx, response) if err != nil { s.logger.Log("err", err) return ctx, nil, err @@ -121,3 +144,18 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (oldcontext.C return ctx, grpcResp, nil } + +// ServerFinalizerFunc can be used to perform work at the end of an gRPC +// request, after the response has been written to the client. +type ServerFinalizerFunc func(ctx context.Context, err error) + +// Interceptor is a grpc UnaryInterceptor that injects the method name into +// context so it can be consumed by Go kit gRPC middlewares. The Interceptor +// typically is added at creation time of the grpc-go server. +// Like this: `grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))` +func Interceptor( + ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, +) (resp interface{}, err error) { + ctx = context.WithValue(ctx, ContextKeyRequestMethod, info.FullMethod) + return handler(ctx, req) +} diff --git a/transport/http/client.go b/transport/http/client.go index f1ca9c3a4..6814c4a32 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -21,7 +21,7 @@ type Client struct { dec DecodeResponseFunc before []RequestFunc after []ClientResponseFunc - finalizer ClientFinalizerFunc + finalizer []ClientFinalizerFunc bufferedStream bool } @@ -73,8 +73,8 @@ func ClientAfter(after ...ClientResponseFunc) ClientOption { // ClientFinalizer is executed at the end of every HTTP request. // By default, no finalizer is registered. -func ClientFinalizer(f ClientFinalizerFunc) ClientOption { - return func(s *Client) { s.finalizer = f } +func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = append(s.finalizer, f...) } } // BufferedStream sets whether the Response.Body is left open, allowing it @@ -99,7 +99,9 @@ func (c Client) Endpoint() endpoint.Endpoint { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, resp.Header) ctx = context.WithValue(ctx, ContextKeyResponseSize, resp.ContentLength) } - c.finalizer(ctx, err) + for _, f := range c.finalizer { + f(ctx, err) + } }() } diff --git a/transport/http/server.go b/transport/http/server.go index 311f76c81..e2d2850ca 100644 --- a/transport/http/server.go +++ b/transport/http/server.go @@ -17,7 +17,7 @@ type Server struct { before []RequestFunc after []ServerResponseFunc errorEncoder ErrorEncoder - finalizer ServerFinalizerFunc + finalizer []ServerFinalizerFunc logger log.Logger } @@ -76,8 +76,8 @@ func ServerErrorLogger(logger log.Logger) ServerOption { // ServerFinalizer is executed at the end of every HTTP request. // By default, no finalizer is registered. -func ServerFinalizer(f ServerFinalizerFunc) ServerOption { - return func(s *Server) { s.finalizer = f } +func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { + return func(s *Server) { s.finalizer = append(s.finalizer, f...) } } // ServeHTTP implements http.Handler. @@ -89,7 +89,9 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer func() { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header()) ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written) - s.finalizer(ctx, iw.code, r) + for _, f := range s.finalizer { + f(ctx, iw.code, r) + } }() w = iw } From c633c2809bb34e0218c0b807bf51b2316d1c7fa4 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Thu, 11 Jan 2018 16:09:00 +0100 Subject: [PATCH 3/4] improvements to zipkin tracer docs and code --- examples/addsvc/pkg/addtransport/grpc.go | 25 +++++++++++------------- tracing/zipkin/grpc.go | 17 +++++++++++++--- tracing/zipkin/http.go | 4 ++-- tracing/zipkin/options.go | 12 ++++++------ transport/grpc/server.go | 2 +- transport/http/server.go | 2 +- 6 files changed, 35 insertions(+), 27 deletions(-) diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go index 6f54219ed..5bb8f91aa 100644 --- a/examples/addsvc/pkg/addtransport/grpc.go +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -33,12 +33,15 @@ type grpcServer struct { // NewGRPCServer makes a set of endpoints available as a gRPC AddServer. func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer { - // Zipkin GRPC Server Trace can either be instantiated per endpoint with a + // Zipkin GRPC Server Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing service can be instantiated - // without an operation name and fed to each Go kit endpoint as ServerOption. + // without an operation name and fed to each Go kit gRPC server as a + // ServerOption. // In the latter case, the operation name will be the endpoint's grpc method - // path. - // We demonstrate a global tracing service here. + // path if used in combination with the Go kit gRPC Interceptor. + // + // In this example, we demonstrate a global Zipkin tracing service with + // Go kit gRPC Interceptor. zipkinServer := zipkin.GRPCServerTrace(zipkinTracer) options := []grpctransport.ServerOption{ @@ -90,11 +93,13 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin // for the entire remote instance, too. limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) - // Zipkin GRPC Client Trace can either be instantiated per endpoint with a + // Zipkin GRPC Client Trace can either be instantiated per gRPC method with a // provided operation name or a global tracing client can be instantiated - // without an operation name and fed to each Go kit endpoint as ClientOption. + // without an operation name and fed to each Go kit client as ClientOption. // In the latter case, the operation name will be the endpoint's grpc method // path. + // + // In this example, we demonstrace a global tracing client. zipkinClient := zipkin.GRPCClientTrace(zipkinTracer) // global client middlewares @@ -118,10 +123,6 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint) - // For additional information TraceEndpoint is added as endpoint middleware. - // If instantiating per endpoint ClientTracers on the Go kit gRPC client, - // you might not want this additional middleware and thus could be omitted. - sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Sum", @@ -143,10 +144,6 @@ func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkin append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))..., ).Endpoint() concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint) - // For additional information TraceEndpoint is added as endpoint middleware. - // If instantiating per endpoint ClientTracers on the Go kit gRPC client, - // you might not want this additional middleware and thus could be omitted. - concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ Name: "Concat", diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index e582ad0e7..5a54616d9 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -14,8 +14,19 @@ import ( kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// GRPCClientTrace enables Zipkin tracing of a Go kit gRPC Client Transport. -func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOption { +// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC Client +// Transport invocation. +// +// Go kit creates client transports per gRPC method. This middleware can be +// set-up individually per service method by adding the method name for each of +// the Go kit method clients using the Name() TracerOption. +// If wanting to use the gRPC FullMethod (/service/method) as Span name you can +// create a global client tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method client. +// If instrumenting a client to an external (not on your platform) service, you +// might want to disallow propagation of SpanContext using the AllowPropagation +// TracerOption and setting it to false. +func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ClientOption { config := tracerOptions{ tags: make(map[string]string), name: "", @@ -97,7 +108,7 @@ func GRPCClientTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ClientOpt } // GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. -func GRPCServerTrace(tracer *zipkin.Tracer, options ...Option) kitgrpc.ServerOption { +func GRPCServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ServerOption { config := tracerOptions{ tags: make(map[string]string), name: "", diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index d17f39d56..c8f2c03ef 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -14,7 +14,7 @@ import ( ) // HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. -func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOption { +func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ClientOption { config := tracerOptions{ tags: make(map[string]string), name: "", @@ -106,7 +106,7 @@ func HTTPClientTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ClientOpt } // HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. -func HTTPServerTrace(tracer *zipkin.Tracer, options ...Option) kithttp.ServerOption { +func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption { config := tracerOptions{ tags: make(map[string]string), name: "", diff --git a/tracing/zipkin/options.go b/tracing/zipkin/options.go index 073a94040..bdb4f97ee 100644 --- a/tracing/zipkin/options.go +++ b/tracing/zipkin/options.go @@ -2,20 +2,20 @@ package zipkin import "github.com/go-kit/kit/log" -// Option allows for functional options to our Zipkin tracing middleware. -type Option func(o *tracerOptions) +// TracerOption allows for functional options to our Zipkin tracing middleware. +type TracerOption func(o *tracerOptions) // Name sets the name for an instrumented transport endpoint. If name is omitted // at tracing middleware creation, the method of the transport or transport rpc // name is used. -func Name(name string) Option { +func Name(name string) TracerOption { return func(o *tracerOptions) { o.name = name } } // Tags adds default tags to our Zipkin transport spans. -func Tags(tags map[string]string) Option { +func Tags(tags map[string]string) TracerOption { return func(o *tracerOptions) { for k, v := range tags { o.tags[k] = v @@ -25,7 +25,7 @@ func Tags(tags map[string]string) Option { // Logger adds a Go kit logger to our Zipkin Middleware to log SpanContext // extract / inject errors if they occur. Default is Noop. -func Logger(logger log.Logger) Option { +func Logger(logger log.Logger) TracerOption { return func(o *tracerOptions) { if logger != nil { o.logger = logger @@ -40,7 +40,7 @@ func Logger(logger log.Logger) Option { // strongly advised to disallow propagation. Propagation between services inside // your own platform benefit from propagation. Default for both TraceClient and // TraceServer is to allow propagation. -func AllowPropagation(propagate bool) Option { +func AllowPropagation(propagate bool) TracerOption { return func(o *tracerOptions) { o.propagate = propagate } diff --git a/transport/grpc/server.go b/transport/grpc/server.go index 178c3f3b8..ccf4f0947 100644 --- a/transport/grpc/server.go +++ b/transport/grpc/server.go @@ -87,7 +87,7 @@ func (s Server) ServeGRPC(ctx oldcontext.Context, req interface{}) (retctx oldco md = metadata.MD{} } - if s.finalizer != nil { + if len(s.finalizer) > 0 { defer func() { for _, f := range s.finalizer { f(ctx, err) diff --git a/transport/http/server.go b/transport/http/server.go index e2d2850ca..7b4b00bf5 100644 --- a/transport/http/server.go +++ b/transport/http/server.go @@ -84,7 +84,7 @@ func ServerFinalizer(f ...ServerFinalizerFunc) ServerOption { func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if s.finalizer != nil { + if len(s.finalizer) > 0 { iw := &interceptingWriter{w, http.StatusOK, 0} defer func() { ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header()) From 2593fe75b29fd4b5cc055f2776ad997a16187899 Mon Sep 17 00:00:00 2001 From: Bas van Beek Date: Wed, 17 Jan 2018 14:46:18 +0100 Subject: [PATCH 4/4] source comments changes --- examples/addsvc/cmd/addcli/addcli.go | 16 ++++++++------- examples/addsvc/cmd/addsvc/addsvc.go | 13 ++++++------ tracing/zipkin/grpc.go | 30 +++++++++++++++++++--------- tracing/zipkin/http.go | 27 +++++++++++++++++++++++-- 4 files changed, 62 insertions(+), 24 deletions(-) diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index d25b080f1..a162dfbbc 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -89,22 +89,24 @@ func main() { } } + // This is a demonstration of the native Zipkin tracing client. If using + // Zipkin this is the more idiomatic client over OpenTracing. var zipkinTracer *zipkin.Tracer { var ( - err error - hostPort = "" // if host:port is unknown we can keep this empty - serviceName = "addsvc-cli" + err error + hostPort = "" // if host:port is unknown we can keep this empty + serviceName = "addsvc-cli" + useNoopTracer = (*zipkinV2URL == "") + reporter = zipkinhttp.NewReporter(*zipkinV2URL) ) - noopTracer := (*zipkinV2URL == "") - reporter := zipkinhttp.NewReporter(*zipkinV2URL) defer reporter.Close() zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) zipkinTracer, err = zipkin.NewTracer( - reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer), ) if err != nil { - fmt.Fprintln(os.Stderr, err.Error()) + fmt.Fprintf(os.Stderr, "unable to create zipkin tracer: %s\n", err.Error()) os.Exit(1) } } diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index b9701ef60..65901472c 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -105,15 +105,16 @@ func main() { var zipkinTracer *zipkin.Tracer { var ( - err error - hostPort = "localhost:80" - serviceName = "addsvc" + err error + hostPort = "localhost:80" + serviceName = "addsvc" + useNoopTracer = (*zipkinV2URL == "") + reporter = zipkinhttp.NewReporter(*zipkinV2URL) ) - noopTracer := (*zipkinV2URL == "") + defer reporter.Close() zEP, _ := zipkin.NewEndpoint(serviceName, hostPort) - reporter := zipkinhttp.NewReporter(*zipkinV2URL) zipkinTracer, err = zipkin.NewTracer( - reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer), + reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer), ) if err != nil { logger.Log("err", err) diff --git a/tracing/zipkin/grpc.go b/tracing/zipkin/grpc.go index 5a54616d9..adfa1745b 100644 --- a/tracing/zipkin/grpc.go +++ b/tracing/zipkin/grpc.go @@ -14,18 +14,18 @@ import ( kitgrpc "github.com/go-kit/kit/transport/grpc" ) -// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC Client -// Transport invocation. +// GRPCClientTrace enables native Zipkin tracing of a Go kit gRPC transport +// Client. // -// Go kit creates client transports per gRPC method. This middleware can be -// set-up individually per service method by adding the method name for each of -// the Go kit method clients using the Name() TracerOption. +// Go kit creates gRPC transport clients per remote endpoint. This middleware +// can be set-up individually by adding the endpoint name for each of the Go kit +// transport clients using the Name() TracerOption. // If wanting to use the gRPC FullMethod (/service/method) as Span name you can // create a global client tracer omitting the Name() TracerOption, which you can -// then feed to each Go kit method client. +// then feed to each Go kit gRPC transport client. // If instrumenting a client to an external (not on your platform) service, you -// might want to disallow propagation of SpanContext using the AllowPropagation -// TracerOption and setting it to false. +// will probably want to disallow propagation of SpanContext using the +// AllowPropagation TracerOption and setting it to false. func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ClientOption { config := tracerOptions{ tags: make(map[string]string), @@ -107,7 +107,19 @@ func GRPCClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.Cli } -// GRPCServerTrace enables Zipkin tracing of a Go kit gRPC Server Transport. +// GRPCServerTrace enables native Zipkin tracing of a Go kit gRPC transport +// Server. +// +// Go kit creates gRPC transport servers per gRPC method. This middleware can be +// set-up individually by adding the method name for each of the Go kit method +// servers using the Name() TracerOption. +// If wanting to use the gRPC FullMethod (/service/method) as Span name you can +// create a global server tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method server. For this to work you will need to +// wire the Go kit gRPC Interceptor too. +// If instrumenting a service to external (not on your platform) clients, you +// will probably want to disallow propagation of a client SpanContext using +// the AllowPropagation TracerOption and setting it to false. func GRPCServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kitgrpc.ServerOption { config := tracerOptions{ tags: make(map[string]string), diff --git a/tracing/zipkin/http.go b/tracing/zipkin/http.go index c8f2c03ef..aa5ff32ae 100644 --- a/tracing/zipkin/http.go +++ b/tracing/zipkin/http.go @@ -13,7 +13,18 @@ import ( kithttp "github.com/go-kit/kit/transport/http" ) -// HTTPClientTrace enables Zipkin tracing of a Go kit HTTP Client Transport. +// HTTPClientTrace enables native Zipkin tracing of a Go kit HTTP transport +// Client. +// +// Go kit creates HTTP transport clients per remote endpoint. This middleware +// can be set-up individually by adding the endpoint name for each of the Go kit +// transport clients using the Name() TracerOption. +// If wanting to use the HTTP Method (Get, Post, Put, etc.) as Span name you can +// create a global client tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit transport client. +// If instrumenting a client to an external (not on your platform) service, you +// will probably want to disallow propagation of SpanContext using the +// AllowPropagation TracerOption and setting it to false. func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ClientOption { config := tracerOptions{ tags: make(map[string]string), @@ -105,7 +116,19 @@ func HTTPClientTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.Cli } } -// HTTPServerTrace enables Zipkin tracing of a Go kit HTTP Server Transport. +// HTTPServerTrace enables native Zipkin tracing of a Go kit HTTP transport +// Server. +// +// Go kit creates HTTP transport servers per HTTP endpoint. This middleware can +// be set-up individually by adding the method name for each of the Go kit +// method servers using the Name() TracerOption. +// If wanting to use the HTTP method (Get, Post, Put, etc.) as Span name you can +// create a global server tracer omitting the Name() TracerOption, which you can +// then feed to each Go kit method server. +// +// If instrumenting a service to external (not on your platform) clients, you +// will probably want to disallow propagation of a client SpanContext using +// the AllowPropagation TracerOption and setting it to false. func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption { config := tracerOptions{ tags: make(map[string]string),