Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(grpc): adds support for grpc parsing. #177

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
Expand Down Expand Up @@ -88,6 +89,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down
45 changes: 43 additions & 2 deletions middleware/grpc/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@ import (
type clientHandler struct {
tracer *zipkin.Tracer
remoteServiceName string
handleRPCParser handleRPCParser
}

// A ClientOption can be passed to NewClientHandler to customize the returned handler.
Expand All @@ -41,6 +42,46 @@ func WithRemoteServiceName(name string) ClientOption {
}
}

// WithClientOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the outgoing request payload
func WithClientOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outPayload = parser
}
}

// WithClientOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the outgoing request payload
func WithClientOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outHeader = parser
}
}

// WithClientInPayloadParser adds a parser for the stats.InPayload to be able to access
// the incoming response payload
func WithClientInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithClientInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the incoming response trailer
func WithClientInTrailerParser(parser func(*stats.InTrailer, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inTrailer = parser
}
}

// WithClientInHeaderParser adds a parser for the stats.InHeader to be able to access
// the incoming response header
func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.inHeader = parser
}
}

// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails.
Expand All @@ -67,7 +108,7 @@ func (c *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (c *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, c.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
105 changes: 105 additions & 0 deletions middleware/grpc/client_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_test

import (
"context"
"testing"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
service "github.com/openzipkin/zipkin-go/proto/testing"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

func TestGRPCClientCanAccessToPayloadAndMetadata(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer()
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{
responseHeader: metadata.Pairs("test_key", "test_value_1"),
responseTrailer: metadata.Pairs("test_key", "test_value_2"),
})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
grpc.WithStatsHandler(zipkingrpc.NewClientHandler(
tracer,
zipkingrpc.WithClientOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.SpanCustomizer) {
m, ok := outPayload.Payload.(*service.HelloRequest)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloResponse")
}
if want, have := "Hello", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value", outHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInPayloadParser(func(inPayload *stats.InPayload, span zipkin.SpanCustomizer) {
m, ok := inPayload.Payload.(*service.HelloResponse)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloRequest")
}
if want, have := "World", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInHeaderParser(func(inHeader *stats.InHeader, span zipkin.SpanCustomizer) {
if want, have := "test_value_1", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.SpanCustomizer) {
if want, have := "test_value_2", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
)),
)

if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}
}
45 changes: 43 additions & 2 deletions middleware/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@ package grpc_test
import (
"context"
"errors"
"log"
"net"
"testing"

Expand All @@ -26,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
Expand Down Expand Up @@ -121,8 +123,10 @@ func (g *sequentialIdGenerator) reset() {
g.nextSpanId = g.start
}

type TestHelloService struct{
type TestHelloService struct {
service.UnimplementedHelloServiceServer
responseHeader metadata.MD
responseTrailer metadata.MD
}

func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {
Expand Down Expand Up @@ -156,5 +160,42 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)
}
}

grpc.SetTrailer(ctx, s.responseTrailer)
grpc.SendHeader(ctx, s.responseHeader)

return resp, nil
}

func initListener(s *grpc.Server) func(context.Context, string) (net.Conn, error) {
const bufSize = 1024 * 1024

listener := bufconn.Listen(bufSize)
bufDialer := func(context.Context, string) (net.Conn, error) {
return listener.Dial()
}

go func() {
if err := s.Serve(listener); err != nil {
log.Fatalf("Server exited with error: %v", err)
}
}()

return bufDialer
}

func createTracer(joinSpans bool) (*zipkin.Tracer, func() []model.SpanModel) {
recorder := recorder.NewReporter()
ep, _ := zipkin.NewEndpoint("grpc-server", "")

serverIdGenerator = newSequentialIdGenerator(0x1000000)

tracer, _ := zipkin.NewTracer(
recorder,
zipkin.WithLocalEndpoint(ep),
zipkin.WithSharedSpans(joinSpans),
zipkin.WithIDGenerator(serverIdGenerator),
)
return tracer, func() []model.SpanModel {
return recorder.Flush()
}
}
49 changes: 45 additions & 4 deletions middleware/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,8 +25,9 @@ import (
)

type serverHandler struct {
tracer *zipkin.Tracer
defaultTags map[string]string
tracer *zipkin.Tracer
defaultTags map[string]string
handleRPCParser handleRPCParser
}

// A ServerOption can be passed to NewServerHandler to customize the returned handler.
Expand All @@ -39,6 +40,46 @@ func ServerTags(tags map[string]string) ServerOption {
}
}

// WithServerInPayloadParser adds a parser for the stats.InPayload to be able to access
// the incoming request payload
func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inPayload = parser
}
}

// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the incoming request header
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inHeader = parser
}
}

// WithServerOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the outgoing response payload
func WithServerOutPayloadParser(parser func(*stats.OutPayload, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outPayload = parser
}
}

// WithServerOutTrailerParser adds a parser for the stats.OutTrailer to be able to access
// the outgoing response trailer
func WithServerOutTrailerParser(parser func(*stats.OutTrailer, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outTrailer = parser
}
}

// WithServerOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the outgoing response payload
func WithServerOutHeaderParser(parser func(*stats.OutHeader, zipkin.SpanCustomizer)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outHeader = parser
}
}

// NewServerHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC server. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails. Use ServerTags to add additional tags that
Expand Down Expand Up @@ -66,7 +107,7 @@ func (s *serverHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) con

// HandleRPC implements per-RPC tracing and stats instrumentation.
func (s *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
handleRPC(ctx, rs)
handleRPC(ctx, rs, s.handleRPCParser)
}

// TagRPC implements per-RPC context management.
Expand Down
Loading