Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zipkin native instrumentation using zipkin-go #2

Merged
merged 4 commits into from
Jan 17, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions examples/addsvc/cmd/addcli/addcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/apache/thrift/lib/go/thrift"
lightstep "github.com/lightstep/lightstep-tracer-go"
stdopentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
zipkin "github.com/openzipkin/zipkin-go"
zipkinot "github.com/openzipkin/zipkin-go-opentracing"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
"sourcegraph.com/sourcegraph/appdash"
appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"

Expand All @@ -40,9 +42,10 @@ func main() {
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) via HTTP Reporter URL e.g. http://localhost:94111/api/v2/spans")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thx for making this explicit, as people have unfortunately sent v2 to the v1 endpoint in the past!

zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
method = fs.String("method", "sum", "sum, concat")
)
fs.Usage = usageFor(fs, os.Args[0]+" [flags] <a> <b>")
Expand All @@ -54,35 +57,55 @@ func main() {

// This is a demonstration client, which supports multiple tracers.
// Your clients will probably just use one tracer.
var tracer stdopentracing.Tracer
var otTracer stdopentracing.Tracer
{
if *zipkinURL != "" {
collector, err := zipkin.NewHTTPCollector(*zipkinURL)
if *zipkinV1URL != "" {
collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
defer collector.Close()
var (
debug = false
hostPort = "localhost:80"
serviceName = "addsvc"
hostPort = "localhost:0"
serviceName = "addsvc-cli"
)
recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkin.NewTracer(recorder)
recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
otTracer, err = zipkinot.NewTracer(recorder)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *lightstepToken != "" {
tracer = lightstep.NewTracer(lightstep.Options{
otTracer = lightstep.NewTracer(lightstep.Options{
AccessToken: *lightstepToken,
})
defer lightstep.FlushLightStepTracer(tracer)
defer lightstep.FlushLightStepTracer(otTracer)
} else if *appdashAddr != "" {
tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
otTracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr))
} else {
tracer = stdopentracing.GlobalTracer() // no-op
otTracer = stdopentracing.GlobalTracer() // no-op
}
}

var zipkinTracer *zipkin.Tracer
{
var (
err error
hostPort = "" // if host:port is unknown we can keep this empty
serviceName = "addsvc-cli"
)
noopTracer := (*zipkinV2URL == "")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for readability purposes I'd call it useNoopTracer. It took me a couple of seconds to realize what was the intention in line 104.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed... will update

reporter := zipkinhttp.NewReporter(*zipkinV2URL)
defer reporter.Close()
zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pronounced "zee endpoint"

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol

zipkinTracer, err = zipkin.NewTracer(
reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer),
)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
Copy link

@jcchavezs jcchavezs Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could add more context to this error. Something like

fmt.Fprintf(os.Stderr, "error when creating the zipkin tracer: %s" err.Error()).

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do

os.Exit(1)
}
}

Expand All @@ -93,15 +116,15 @@ func main() {
err error
)
if *httpAddr != "" {
svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger())
svc, err = addtransport.NewHTTPClient(*httpAddr, otTracer, zipkinTracer, log.NewNopLogger())
} else if *grpcAddr != "" {
conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second))
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v", err)
os.Exit(1)
}
defer conn.Close()
svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
svc = addtransport.NewGRPCClient(conn, otTracer, zipkinTracer, log.NewNopLogger())
} else if *thriftAddr != "" {
// It's necessary to do all of this construction in the func main,
// because (among other reasons) we need to control the lifecycle of the
Expand Down
53 changes: 39 additions & 14 deletions examples/addsvc/cmd/addsvc/addsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
lightstep "github.com/lightstep/lightstep-tracer-go"
"github.com/oklog/oklog/pkg/group"
stdopentracing "github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
zipkin "github.com/openzipkin/zipkin-go"
zipkinot "github.com/openzipkin/zipkin-go-opentracing"
zipkinhttp "github.com/openzipkin/zipkin-go/reporter/http"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
kitgrpc "github.com/go-kit/kit/transport/grpc"

addpb "github.com/go-kit/kit/examples/addsvc/pb"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
Expand All @@ -45,9 +48,10 @@ func main() {
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
zipkinV2URL = fs.String("zipkin-url", "", "Enable Zipkin v2 tracing (zipkin-go) using a Reporter URL e.g. http://localhost:9411/api/v2/spans")
zipkinV1URL = fs.String("zipkin-v1-url", "", "Enable Zipkin v1 tracing (zipkin-go-opentracing) using a collector URL e.g. http://localhost:9411/api/v1/spans")
lightstepToken = fs.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token")
appdashAddr = fs.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port")
)
fs.Usage = usageFor(fs, os.Args[0]+" [flags]")
fs.Parse(os.Args[1:])
Expand All @@ -60,13 +64,13 @@ func main() {
logger = log.With(logger, "caller", log.DefaultCaller)
}

// Determine which tracer to use. We'll pass the tracer to all the
// Determine which OpenTracing tracer to use. We'll pass the tracer to all the
// components that use it, as a dependency.
var tracer stdopentracing.Tracer
{
if *zipkinURL != "" {
logger.Log("tracer", "Zipkin", "URL", *zipkinURL)
collector, err := zipkin.NewHTTPCollector(*zipkinURL)
if *zipkinV1URL != "" {
logger.Log("tracer", "Zipkin", "URL", *zipkinV1URL)
collector, err := zipkinot.NewHTTPCollector(*zipkinV1URL)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand All @@ -77,8 +81,8 @@ func main() {
hostPort = "localhost:80"
serviceName = "addsvc"
)
recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkin.NewTracer(recorder)
recorder := zipkinot.NewRecorder(collector, debug, hostPort, serviceName)
tracer, err = zipkinot.NewTracer(recorder)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand All @@ -98,6 +102,25 @@ func main() {
}
}

var zipkinTracer *zipkin.Tracer
{
var (
err error
hostPort = "localhost:80"
serviceName = "addsvc"
)
noopTracer := (*zipkinV2URL == "")
zEP, _ := zipkin.NewEndpoint(serviceName, hostPort)
reporter := zipkinhttp.NewReporter(*zipkinV2URL)
zipkinTracer, err = zipkin.NewTracer(
reporter, zipkin.WithLocalEndpoint(zEP), zipkin.WithNoopTracer(noopTracer),
)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
}

// Create the (sparse) metrics we'll use in the service. They, too, are
// dependencies that we pass to components that use them.
var ints, chars metrics.Counter
Expand Down Expand Up @@ -136,9 +159,9 @@ func main() {
// them to ports or anything yet; we'll do that next.
var (
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
endpoints = addendpoint.New(service, logger, duration, tracer, zipkinTracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, zipkinTracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, zipkinTracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
)

Expand Down Expand Up @@ -194,7 +217,9 @@ func main() {
}
g.Add(func() error {
logger.Log("transport", "gRPC", "addr", *grpcAddr)
baseServer := grpc.NewServer()
// we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
// the here demonstrated zipkin tracing middleware.
baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
addpb.RegisterAddServer(baseServer, grpcServer)
return baseServer.Serve(grpcListener)
}, func(error) {
Expand Down
6 changes: 4 additions & 2 deletions examples/addsvc/cmd/addsvc/wiring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"github.com/opentracing/opentracing-go"
zipkin "github.com/openzipkin/zipkin-go"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/discard"
Expand All @@ -18,9 +19,10 @@ import (
)

func TestHTTP(t *testing.T) {
zkt, _ := zipkin.NewTracer(nil, zipkin.WithNoopTracer(true))
svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter())
eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer())
mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger())
eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer(), zkt)
mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), zkt, log.NewNopLogger())
srv := httptest.NewServer(mux)
defer srv.Close()

Expand Down
10 changes: 7 additions & 3 deletions examples/addsvc/pkg/addendpoint/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"golang.org/x/time/rate"

stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"

"github.com/go-kit/kit/circuitbreaker"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"

"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
)
Expand All @@ -29,13 +31,14 @@ type Set struct {

// New returns a Set that wraps the provided server, and wires in all of the
// expected endpoint middlewares via the various parameters.
func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set {
func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer) Set {
var sumEndpoint endpoint.Endpoint
{
sumEndpoint = MakeSumEndpoint(svc)
sumEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 1))(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint)
sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint)
sumEndpoint = opentracing.TraceServer(otTracer, "Sum")(sumEndpoint)
sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint)
sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint)
}
Expand All @@ -44,7 +47,8 @@ func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram,
concatEndpoint = MakeConcatEndpoint(svc)
concatEndpoint = ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint)
concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint)
concatEndpoint = opentracing.TraceServer(otTracer, "Concat")(concatEndpoint)
concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint)
concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint)
}
Expand Down
48 changes: 40 additions & 8 deletions examples/addsvc/pkg/addtransport/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/grpc"

stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"
oldcontext "golang.org/x/net/context"
"golang.org/x/time/rate"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
grpctransport "github.com/go-kit/kit/transport/grpc"

"github.com/go-kit/kit/examples/addsvc/pb"
Expand All @@ -30,22 +32,32 @@ type grpcServer struct {
}

// NewGRPCServer makes a set of endpoints available as a gRPC AddServer.
func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer {
func NewGRPCServer(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) pb.AddServer {
// Zipkin GRPC Server Trace can either be instantiated per endpoint with a
// provided operation name or a global tracing service can be instantiated
// without an operation name and fed to each Go kit endpoint as ServerOption.
// In the latter case, the operation name will be the endpoint's grpc method
// path.
// We demonstrate a global tracing service here.
zipkinServer := zipkin.GRPCServerTrace(zipkinTracer)

options := []grpctransport.ServerOption{
grpctransport.ServerErrorLogger(logger),
zipkinServer,
}

return &grpcServer{
sum: grpctransport.NewServer(
endpoints.SumEndpoint,
decodeGRPCSumRequest,
encodeGRPCSumResponse,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Sum", logger)))...,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Sum", logger)))...,
),
concat: grpctransport.NewServer(
endpoints.ConcatEndpoint,
decodeGRPCConcatRequest,
encodeGRPCConcatResponse,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(tracer, "Concat", logger)))...,
append(options, grpctransport.ServerBefore(opentracing.GRPCToContext(otTracer, "Concat", logger)))...,
),
}
}
Expand All @@ -70,14 +82,26 @@ func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.
// of the conn. The caller is responsible for constructing the conn, and
// eventually closing the underlying transport. We bake-in certain middlewares,
// implementing the client library pattern.
func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service {
func NewGRPCClient(conn *grpc.ClientConn, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) addservice.Service {
// We construct a single ratelimiter middleware, to limit the total outgoing
// QPS from this client to all methods on the remote instance. We also
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

// Zipkin GRPC Client Trace can either be instantiated per endpoint with a
// provided operation name or a global tracing client can be instantiated
// without an operation name and fed to each Go kit endpoint as ClientOption.
// In the latter case, the operation name will be the endpoint's grpc method
// path.
zipkinClient := zipkin.GRPCClientTrace(zipkinTracer)

// global client middlewares
options := []grpctransport.ClientOption{
zipkinClient,
}

// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
// made your own client library, you'd do this work there, so your server
Expand All @@ -91,9 +115,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l
encodeGRPCSumRequest,
decodeGRPCSumResponse,
pb.SumReply{},
grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
).Endpoint()
sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
// For additional information TraceEndpoint is added as endpoint middleware.
// If instantiating per endpoint ClientTracers on the Go kit gRPC client,
// you might not want this additional middleware and thus could be omitted.
sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
sumEndpoint = limiter(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Sum",
Expand All @@ -112,9 +140,13 @@ func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger l
encodeGRPCConcatRequest,
decodeGRPCConcatResponse,
pb.ConcatReply{},
grpctransport.ClientBefore(opentracing.ContextToGRPC(tracer, logger)),
append(options, grpctransport.ClientBefore(opentracing.ContextToGRPC(otTracer, logger)))...,
).Endpoint()
concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
// For additional information TraceEndpoint is added as endpoint middleware.
// If instantiating per endpoint ClientTracers on the Go kit gRPC client,
// you might not want this additional middleware and thus could be omitted.
concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
concatEndpoint = limiter(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Concat",
Expand Down
Loading