Skip to content

Commit

Permalink
Merge pull request #2 from basvanbeek/zipkin
Browse files Browse the repository at this point in the history
Zipkin native instrumentation using zipkin-go
  • Loading branch information
basvanbeek authored Jan 17, 2018
2 parents 953e747 + 2593fe7 commit 335191f
Show file tree
Hide file tree
Showing 19 changed files with 860 additions and 213 deletions.
59 changes: 42 additions & 17 deletions examples/addsvc/cmd/addcli/addcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/apache/thrift/lib/go/thrift"
lightstep "github.com/lightstep/lightstep-tracer-go"
stdopentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
zipkin "github.com/openzipkin/zipkin-go"
zipkinot "github.com/openzipkin/zipkin-go-opentracing"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
"sourcegraph.com/sourcegraph/appdash"
appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"

Expand All @@ -40,9 +42,10 @@ func main() {
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) via HTTP Reporter URL e.g. http://localhost:94111/api/v2/spans")
zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
method = fs.String("method", "sum", "sum, concat")
)
fs.Usage = usageFor(fs, os.Args[0]+" [flags] <a> <b>")
Expand All @@ -54,35 +57,57 @@ func main() {

// This is a demonstration client, which supports multiple tracers.
// Your clients will probably just use one tracer.
var tracer stdopentracing.Tracer
var otTracer stdopentracing.Tracer
{
if *zipkinURL != "" {
collector, err := zipkin.NewHTTPCollector(*zipkinURL)
if *zipkinV1URL != "" {
collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
defer collector.Close()
var (
debug = false
hostPort = "localhost:80"
serviceName = "addsvc"
hostPort = "localhost:0"
serviceName = "addsvc-cli"
)
recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkin.NewTracer(recorder)
recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
otTracer, err = zipkinot.NewTracer(recorder)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *lightstepToken != "" {
tracer = lightstep.NewTracer(lightstep.Options{
otTracer = lightstep.NewTracer(lightstep.Options{
AccessToken: *lightstepToken,
})
defer lightstep.FlushLightStepTracer(tracer)
defer lightstep.FlushLightStepTracer(otTracer)
} else if *appdashAddr != "" {
tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
otTracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
} else {
tracer = stdopentracing.GlobalTracer() // no-op
otTracer = stdopentracing.GlobalTracer() // no-op
}
}

// This is a demonstration of the native Zipkin tracing client. If using
// Zipkin this is the more idiomatic client over OpenTracing.
var zipkinTracer *zipkin.Tracer
{
var (
err error
hostPort = "" // if host:port is unknown we can keep this empty
serviceName = "addsvc-cli"
useNoopTracer = (*zipkinV2URL == "")
reporter = zipkinhttp.NewReporter(*zipkinV2URL)
)
defer reporter.Close()
zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)
zipkinTracer, err = zipkin.NewTracer(
reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer),
)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to create zipkin tracer: %s\n", err.Error())
os.Exit(1)
}
}

Expand All @@ -93,15 +118,15 @@ func main() {
err error
)
if *httpAddr != "" {
svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger())
svc, err = addtransport.NewHTTPClient(*httpAddr, otTracer, zipkinTracer, log.NewNopLogger())
} else if *grpcAddr != "" {
conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v", err)
os.Exit(1)
}
defer conn.Close()
svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
svc = addtransport.NewGRPCClient(conn, otTracer, zipkinTracer, log.NewNopLogger())
} else if *thriftAddr != "" {
// It's necessary to do all of this construction in the func main,
// because (among other reasons) we need to control the lifecycle of the
Expand Down
54 changes: 40 additions & 14 deletions examples/addsvc/cmd/addsvc/addsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
lightstep "github.com/lightstep/lightstep-tracer-go"
"github.com/oklog/oklog/pkg/group"
stdopentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
zipkin "github.com/openzipkin/zipkin-go"
zipkinot "github.com/openzipkin/zipkin-go-opentracing"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
kitgrpc "github.com/go-kit/kit/transport/grpc"

addpb "github.com/go-kit/kit/examples/addsvc/pb"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
Expand All @@ -45,9 +48,10 @@ func main() {
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) using a Reporter URL e.g. http://localhost:9411/api/v2/spans")
zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) using a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
)
fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
fs.Parse(os.Args[1:])
Expand All @@ -60,13 +64,13 @@ func main() {
logger = log.With(logger, "caller", log.DefaultCaller)
}

// Determine which tracer to use. We'll pass the tracer to all the
// Determine which OpenTracing tracer to use. We'll pass the tracer to all the
// components that use it, as a dependency.
var tracer stdopentracing.Tracer
{
if *zipkinURL != "" {
logger.Log("tracer", "Zipkin", "URL", *zipkinURL)
collector, err := zipkin.NewHTTPCollector(*zipkinURL)
if *zipkinV1URL != "" {
logger.Log("tracer", "Zipkin", "URL", *zipkinV1URL)
collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand All @@ -77,8 +81,8 @@ func main() {
hostPort = "localhost:80"
serviceName = "addsvc"
)
recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkin.NewTracer(recorder)
recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkinot.NewTracer(recorder)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand All @@ -98,6 +102,26 @@ func main() {
}
}

var zipkinTracer *zipkin.Tracer
{
var (
err error
hostPort = "localhost:80"
serviceName = "addsvc"
useNoopTracer = (*zipkinV2URL == "")
reporter = zipkinhttp.NewReporter(*zipkinV2URL)
)
defer reporter.Close()
zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)
zipkinTracer, err = zipkin.NewTracer(
reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(useNoopTracer),
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
}

// Create the (sparse) metrics we'll use in the service. They, too, are
// dependencies that we pass to components that use them.
var ints, chars metrics.Counter
Expand Down Expand Up @@ -136,9 +160,9 @@ func main() {
// them to ports or anything yet; we'll do that next.
var (
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
endpoints = addendpoint.New(service, logger, duration, tracer, zipkinTracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, zipkinTracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
)

Expand Down Expand Up @@ -194,7 +218,9 @@ func main() {
}
g.Add(func() error {
logger.Log("transport", "gRPC", "addr", *grpcAddr)
baseServer := grpc.NewServer()
// we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
// the here demonstrated zipkin tracing middleware.
baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
addpb.RegisterAddServer(baseServer, grpcServer)
return baseServer.Serve(grpcListener)
}, func(error) {
Expand Down
6 changes: 4 additions & 2 deletions examples/addsvc/cmd/addsvc/wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/discard"
Expand All @@ -18,9 +19,10 @@ import (
)

func TestHTTP(t *testing.T) {
zkt, _ := zipkin.NewTracer(nil, zipkin.WithNoopTracer(true))
svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter())
eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer())
mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger())
eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer(), zkt)
mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), zkt, log.NewNopLogger())
srv := httptest.NewServer(mux)
defer srv.Close()

Expand Down
10 changes: 7 additions & 3 deletions examples/addsvc/pkg/addendpoint/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"golang.org/x/time/rate"

stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"

"github.com/go-kit/kit/circuitbreaker"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"

"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
)
Expand All @@ -29,13 +31,14 @@ type Set struct {

// New returns a Set that wraps the provided server, and wires in all of the
// expected endpoint middlewares via the various parameters.
func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer) Set {
var sumEndpoint endpoint.Endpoint
{
sumEndpoint = MakeSumEndpoint(svc)
sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint)
sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
}
Expand All @@ -44,7 +47,8 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram,
concatEndpoint = MakeConcatEndpoint(svc)
concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint)
concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
}
Expand Down
45 changes: 37 additions & 8 deletions examples/addsvc/pkg/addtransport/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc"

stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"
oldcontext "golang.org/x/net/context"
"golang.org/x/time/rate"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
grpctransport "github.com/go-kit/kit/transport/grpc"

"github.com/go-kit/kit/examples/addsvc/pb"
Expand All @@ -30,22 +32,35 @@ type grpcServer struct {
}

// NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer {
// Zipkin GRPC Server Trace can either be instantiated per gRPC method with a
// provided operation name or a global tracing service can be instantiated
// without an operation name and fed to each Go kit gRPC server as a
// ServerOption.
// In the latter case, the operation name will be the endpoint's grpc method
// path if used in combination with the Go kit gRPC Interceptor.
//
// In this example, we demonstrate a global Zipkin tracing service with
// Go kit gRPC Interceptor.
zipkinServer := zipkin.GRPCServerTrace(zipkinTracer)

options := []grpctransport.ServerOption{
grpctransport.ServerErrorLogger(logger),
zipkinServer,
}

return &grpcServer{
sum: grpctransport.NewServer(
endpoints.SumEndpoint,
decodeGRPCSumRequest,
encodeGRPCSumResponse,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))...,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))...,
),
concat: grpctransport.NewServer(
endpoints.ConcatEndpoint,
decodeGRPCConcatRequest,
encodeGRPCConcatResponse,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))...,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))...,
),
}
}
Expand All @@ -70,14 +85,28 @@ func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.
// of the conn. The caller is responsible for constructing the conn, and
// eventually closing the underlying transport. We bake-in certain middlewares,
// implementing the client library pattern.
func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service {
// We construct a single ratelimiter middleware, to limit the total outgoing
// QPS from this client to all methods on the remote instance. We also
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

// Zipkin GRPC Client Trace can either be instantiated per gRPC method with a
// provided operation name or a global tracing client can be instantiated
// without an operation name and fed to each Go kit client as ClientOption.
// In the latter case, the operation name will be the endpoint's grpc method
// path.
//
// In this example, we demonstrace a global tracing client.
zipkinClient := zipkin.GRPCClientTrace(zipkinTracer)

// global client middlewares
options := []grpctransport.ClientOption{
zipkinClient,
}

// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
// made your own client library, you'd do this work there, so your server
Expand All @@ -91,9 +120,9 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l
encodeGRPCSumRequest,
decodeGRPCSumResponse,
pb.SumReply{},
grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
).Endpoint()
sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
sumEndpoint = limiter(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Sum",
Expand All @@ -112,9 +141,9 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l
encodeGRPCConcatRequest,
decodeGRPCConcatResponse,
pb.ConcatReply{},
grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
).Endpoint()
concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
concatEndpoint = limiter(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Concat",
Expand Down
Loading

0 comments on commit 335191f

Please sign in to comment.