Skip to content

Commit

Permalink
feat(grpc): adds support for grpc parsing.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcchavezs committed Sep 9, 2020
1 parent f2e3397 commit 1c762e0
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 6 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
Expand Down Expand Up @@ -88,6 +89,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
27 changes: 26 additions & 1 deletion middleware/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
type clientHandler struct {
tracer *zipkin.Tracer
remoteServiceName string
handleRPCParser handleRPCParser
}

// A ClientOption can be passed to NewClientHandler to customize the returned handler.
Expand All @@ -41,6 +42,30 @@ func WithRemoteServiceName(name string) ClientOption {
}
}

// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inHeader = parser
}
}

// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails.
Expand All @@ -67,7 +92,7 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, c.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
38 changes: 37 additions & 1 deletion middleware/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package grpc_test
import (
"context"
"errors"
"log"
"net"
"testing"

Expand All @@ -26,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
Expand Down Expand Up @@ -121,7 +123,7 @@ func (g *sequentialIdGenerator) reset() {
g.nextSpanId = g.start
}

type TestHelloService struct{
type TestHelloService struct {
service.UnimplementedHelloServiceServer
}

Expand Down Expand Up @@ -158,3 +160,37 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)

return resp, nil
}

func initListener(s *grpc.Server) func(context.Context, string) (net.Conn, error) {
const bufSize = 1024 * 1024

listener := bufconn.Listen(bufSize)
bufDialer := func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}

go func() {
if err := s.Serve(listener); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()

return bufDialer
}

func createTracer(joinSpans bool) (*zipkin.Tracer, func() []model.SpanModel) {
recorder := recorder.NewReporter()
ep, _ := zipkin.NewEndpoint("grpc-server", "")

serverIdGenerator = newSequentialIdGenerator(0x1000000)

tracer, _ := zipkin.NewTracer(
recorder,
zipkin.WithLocalEndpoint(ep),
zipkin.WithSharedSpans(joinSpans),
zipkin.WithIDGenerator(serverIdGenerator),
)
return tracer, func() []model.SpanModel {
return recorder.Flush()
}
}
31 changes: 28 additions & 3 deletions middleware/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (
)

type serverHandler struct {
tracer *zipkin.Tracer
defaultTags map[string]string
tracer *zipkin.Tracer
defaultTags map[string]string
handleRPCParser handleRPCParser
}

// A ServerOption can be passed to NewServerHandler to customize the returned handler.
Expand All @@ -39,6 +40,30 @@ func ServerTags(tags map[string]string) ServerOption {
}
}

// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access
// the request payload
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inHeader = parser
}
}

// NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC server. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that
Expand Down Expand Up @@ -66,7 +91,7 @@ func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, s.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
138 changes: 138 additions & 0 deletions middleware/grpc/server_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2019 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_test

import (
"context"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
"github.com/openzipkin/zipkin-go/model"
service "github.com/openzipkin/zipkin-go/proto/testing"
)

func TestGRPCServerCreatesASpanAndContext(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
)
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}

func TestGRPCServerCanAccessToHeaders(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
}
}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
)
if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value")
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}
20 changes: 19 additions & 1 deletion middleware/grpc/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
"github.com/openzipkin/zipkin-go/model"
)

type handleRPCParser struct {
inPayload func(*stats.InPayload, zipkin.Span)
inTrailer func(*stats.InTrailer, zipkin.Span)
inHeader func(*stats.InHeader, zipkin.Span)
}

// A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of
// a handler for additional span customization.
type RPCHandler func(span zipkin.Span, rpcStats stats.RPCStats)
Expand All @@ -37,10 +43,22 @@ func spanName(rti *stats.RPCTagInfo) string {
return name
}

func handleRPC(ctx context.Context, rs stats.RPCStats) {
func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) {
span := zipkin.SpanFromContext(ctx)

switch rs := rs.(type) {
case *stats.InPayload:
if h.inPayload != nil {
h.inPayload(rs, span)
}
case *stats.InHeader:
if h.inHeader != nil {
h.inHeader(rs, span)
}
case *stats.InTrailer:
if h.inTrailer != nil {
h.inTrailer(rs, span)
}
case *stats.End:
s, ok := status.FromError(rs.Error)
// rs.Error should always be convertable to a status, this is just a defensive check.
Expand Down

0 comments on commit 1c762e0

Please sign in to comment.