Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

module/apmgrpc: add stream interceptors #918

Merged
merged 2 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions docs/instrumenting.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ import (
)

func main() {
server := grpc.NewServer(grpc.UnaryInterceptor(apmgrpc.NewUnaryServerInterceptor()))
server := grpc.NewServer(
grpc.UnaryInterceptor(apmgrpc.NewUnaryServerInterceptor()),
grpc.StreamInterceptor(apmgrpc.NewStreamServerInterceptor()),
)
...
conn, err := grpc.Dial(addr, grpc.WithUnaryInterceptor(apmgrpc.NewUnaryClientInterceptor()))
conn, err := grpc.Dial(addr,
grpc.WithUnaryInterceptor(apmgrpc.NewUnaryClientInterceptor()),
gprc.WithStreamInterceptor(apmgrpc.NewStreamClientInterceptor()),
)
...
}
----
Expand All @@ -172,8 +178,11 @@ server := grpc.NewServer(grpc.UnaryInterceptor(
...
----

There is currently no support for intercepting at the stream level. Please file an issue and/or
send a pull request if this is something you need.
Stream interceptors emit transactions and spans that represent the entire stream,
and not individual messages. For client streams, spans will be ended when the request
fails; when any of `grpc.ClientStream.RecvMsg`, `grpc.ClientStream.SendMsg`, or
`grpc.ClientStream.Header` return with an error; or when `grpc.ClientStream.RecvMsg`
returns for a non-streaming server method.

[[builtin-modules-apmhttp]]
==== module/apmhttp
Expand Down
6 changes: 3 additions & 3 deletions docs/supported-tech.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ the MongoDB Go Driver instrumentation.

We support https://grpc.io/[gRPC]
https://github.com/grpc/grpc-go/releases/tag/v1.3.0[v1.3.0] and greater.
We provide unary interceptors for both the client and server. The server
interceptor will create a transaction for each incoming request, and
the client interceptor will create a span for each outgoing request.
We provide unary and stream interceptors for both the client and server.
The server interceptor will create a transaction for each incoming request,
and the client interceptor will create a span for each outgoing request.

See <<builtin-modules-apmgrpc, module/apmgrpc>> for more information
about gRPC instrumentation.
Expand Down
119 changes: 116 additions & 3 deletions module/apmgrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package apmgrpc // import "go.elastic.co/apm/module/apmgrpc"

import (
"net"
"sync"

"golang.org/x/net/context"
"google.golang.org/grpc"
Expand All @@ -35,9 +36,9 @@ import (
// NewUnaryClientInterceptor returns a grpc.UnaryClientInterceptor that
// traces gRPC requests with the given options.
//
// The interceptor will trace spans with the "grpc" type for each request
// made, for any client method presented with a context containing a sampled
// apm.Transaction.
// The interceptor will trace spans with the "external.grpc" type for each
// request made, for any client method presented with a context containing
// a sampled apm.Transaction.
func NewUnaryClientInterceptor(o ...ClientOption) grpc.UnaryClientInterceptor {
opts := clientOptions{}
for _, o := range o {
Expand Down Expand Up @@ -75,6 +76,105 @@ func NewUnaryClientInterceptor(o ...ClientOption) grpc.UnaryClientInterceptor {
}
}

// NewStreamClientInterceptor returns a grpc.UnaryClientInterceptor that
// traces gRPC requests with the given options.
//
// The interceptor will trace spans with the "external.grpc" type for each
// stream request made, for any client method presented with a context
// containing a sampled apm.Transaction.
//
// Spans are ended when the stream is closed, which can happen in various
// ways: the initial stream setup request fails, Header, SendMsg or RecvMsg
// return with an error, or RecvMsg returns for a non-streaming server.
func NewStreamClientInterceptor(o ...ClientOption) grpc.StreamClientInterceptor {
opts := clientOptions{}
for _, o := range o {
o(&opts)
}
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
var peer peer.Peer
span, ctx := startSpan(ctx, method)
if span != nil {
opts = append(opts, grpc.Peer(&peer))
}
stream, err := streamer(ctx, desc, cc, method, opts...)
if span != nil {
if err != nil {
setSpanOutcome(span, err)
setSpanContext(span, peer)
span.End()
} else if stream != nil {
wrapped := &clientStream{ClientStream: stream}
go func() {
defer span.End()
// Header blocks until headers are available
// or the stream is ended. Either way, after
// Header returns, it is safe to call Context().
stream.Header()
<-stream.Context().Done()
err := wrapped.getError()
setSpanOutcome(span, err)
setSpanContext(span, peer)
}()
stream = wrapped
}
}
return stream, err
}
}

// clientStream wraps grpc.ClientStream to intercept errors.
type clientStream struct {
grpc.ClientStream
mu sync.RWMutex
err error
}

func (s *clientStream) CloseSend() error {
err := s.ClientStream.CloseSend()
s.setError(err)
return err
}

func (s *clientStream) Header() (metadata.MD, error) {
md, err := s.ClientStream.Header()
s.setError(err)
return md, err
}

func (s *clientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
s.setError(err)
return err
}

func (s *clientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
s.setError(err)
return err
}

func (s *clientStream) getError() error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.err
}

func (s *clientStream) setError(err error) {
if err != nil {
s.mu.Lock()
s.err = err
s.mu.Unlock()
}
}

func startSpan(ctx context.Context, name string) (*apm.Span, context.Context) {
tx := apm.TransactionFromContext(ctx)
if tx == nil {
Expand All @@ -93,6 +193,19 @@ func startSpan(ctx context.Context, name string) (*apm.Span, context.Context) {
return span, outgoingContextWithTraceContext(ctx, traceContext, propagateLegacyHeader)
}

func setSpanContext(span *apm.Span, peer peer.Peer) {
if peer.Addr != nil {
if tcpAddr, ok := peer.Addr.(*net.TCPAddr); ok {
span.Context.SetDestinationAddress(tcpAddr.IP.String(), tcpAddr.Port)
}
addrString := peer.Addr.String()
span.Context.SetDestinationService(apm.DestinationServiceSpanContext{
Name: addrString,
Resource: addrString,
})
}
}

func outgoingContextWithTraceContext(
ctx context.Context,
traceContext apm.TraceContext,
Expand Down
80 changes: 72 additions & 8 deletions module/apmgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package apmgrpc_test

import (
"io"
"net"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -34,6 +36,7 @@ import (
"go.elastic.co/apm"
"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/model"
"go.elastic.co/apm/module/apmgrpc/internal/testservice"
"go.elastic.co/apm/module/apmhttp"
"go.elastic.co/apm/transport/transporttest"
)
Expand All @@ -55,11 +58,11 @@ func testClientSpan(t *testing.T, traceparentHeaders ...string) {

serverTracer, serverTransport := transporttest.NewRecorderTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer)
s, _, addr := newGreeterServer(t, serverTracer)
defer s.GracefulStop()
tcpAddr := addr.(*net.TCPAddr)

conn, client := newClient(t, addr)
conn, client := newGreeterClient(t, addr)
defer conn.Close()
resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "birita"})
require.NoError(t, err)
Expand Down Expand Up @@ -131,10 +134,10 @@ func testClientSpan(t *testing.T, traceparentHeaders ...string) {
func TestClientSpanDropped(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
s, _, addr := newGreeterServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
conn, client := newGreeterClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
Expand Down Expand Up @@ -162,10 +165,10 @@ func TestClientSpanDropped(t *testing.T) {
func TestClientTransactionUnsampled(t *testing.T) {
serverTracer := apmtest.NewRecordingTracer()
defer serverTracer.Close()
s, _, addr := newServer(t, serverTracer.Tracer)
s, _, addr := newGreeterServer(t, serverTracer.Tracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
conn, client := newGreeterClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
Expand All @@ -186,10 +189,10 @@ func TestClientTransactionUnsampled(t *testing.T) {
}

func TestClientOutcome(t *testing.T) {
s, helloworldServer, addr := newServer(t, apmtest.DiscardTracer)
s, helloworldServer, addr := newGreeterServer(t, apmtest.DiscardTracer)
defer s.GracefulStop()

conn, client := newClient(t, addr)
conn, client := newGreeterClient(t, addr)
defer conn.Close()

clientTracer := apmtest.NewRecordingTracer()
Expand All @@ -210,3 +213,64 @@ func TestClientOutcome(t *testing.T) {
assert.Equal(t, "failure", spans[1].Outcome) // unknown error
assert.Equal(t, "failure", spans[2].Outcome)
}

func TestStreamClientSpan(t *testing.T) {
clientTracer, clientTransport := transporttest.NewRecorderTracer()
defer clientTracer.Close()

serverTracer, serverTransport := transporttest.NewRecorderTracer()
defer serverTracer.Close()
s, _, addr := newAccumulatorServer(t, serverTracer)
defer s.GracefulStop()

conn, client := newAccumulatorClient(t, addr)
defer conn.Close()

clientTransaction := clientTracer.StartTransaction("name", "type")
ctx := apm.ContextWithTransaction(context.Background(), clientTransaction)

stream, err := client.Accumulate(ctx)
require.NoError(t, err)
err = stream.Send(&testservice.AccumulateRequest{Value: 123})
require.NoError(t, err)
reply, err := stream.Recv()
require.NoError(t, err)
assert.Equal(t, int64(123), reply.Value)

err = stream.CloseSend()
require.NoError(t, err)
_, err = stream.Recv()
assert.Equal(t, io.EOF, err)

timeout := time.NewTimer(10 * time.Second)
ticker := time.NewTicker(100 * time.Millisecond)
defer timeout.Stop()
defer ticker.Stop()
var done bool
for !done {
select {
case <-ticker.C:
clientTracer.Flush(nil)
if len(clientTransport.Payloads().Spans) > 0 {
done = true
}
case <-timeout.C:
t.Fatal("timed out waiting for client span to end")
}
}
clientTransaction.End()

clientTracer.Flush(nil)
clientPayloads := clientTransport.Payloads()
require.Len(t, clientPayloads.Transactions, 1)
require.Len(t, clientPayloads.Spans, 1)
assert.Equal(t, "/go.elastic.co.apm.module.apmgrpc.testservice.Accumulator/Accumulate", clientPayloads.Spans[0].Name)
assert.Equal(t, "external", clientPayloads.Spans[0].Type)
assert.Equal(t, "grpc", clientPayloads.Spans[0].Subtype)

serverTracer.Flush(nil)
serverPayloads := serverTransport.Payloads()
require.Len(t, serverPayloads.Transactions, 1)
assert.Equal(t, clientPayloads.Spans[0].ID, serverPayloads.Transactions[0].ParentID)
assert.Equal(t, clientPayloads.Spans[0].TraceID, serverPayloads.Transactions[0].TraceID)
}
1 change: 1 addition & 0 deletions module/apmgrpc/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module go.elastic.co/apm/module/apmgrpc

require (
github.com/golang/protobuf v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/stretchr/testify v1.4.0
go.elastic.co/apm v1.11.0
Expand Down
18 changes: 14 additions & 4 deletions module/apmgrpc/ignorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@ package apmgrpc // import "go.elastic.co/apm/module/apmgrpc"

import (
"regexp"
"sync"

"google.golang.org/grpc"
)

var (
defaultServerRequestIgnorerOnce sync.Once
defaultServerRequestIgnorer RequestIgnorerFunc = IgnoreNone
defaultServerRequestIgnorer RequestIgnorerFunc = IgnoreNone
defaultServerStreamIgnorer StreamIgnorerFunc = IgnoreNoneStream
)

// DefaultServerRequestIgnorer returns the default RequestIgnorer to use in
// DefaultServerRequestIgnorer returns the default RequestIgnorerFunc to use in
// handlers.
func DefaultServerRequestIgnorer() RequestIgnorerFunc {
return defaultServerRequestIgnorer
}

// DefaultServerStreamIgnorer returns the default StreamIgnorerFunc to use in
// handlers.
func DefaultServerStreamIgnorer() StreamIgnorerFunc {
return defaultServerStreamIgnorer
}

// NewRegexpRequestIgnorer returns a RequestIgnorerFunc which matches requests'
// URLs against re. Note that for server requests, typically only Path and
// possibly RawQuery will be set, so the regular expression should take this
Expand All @@ -54,3 +59,8 @@ func NewRegexpRequestIgnorer(re *regexp.Regexp) RequestIgnorerFunc {
func IgnoreNone(*grpc.UnaryServerInfo) bool {
return false
}

// IgnoreNoneStream is a StreamIgnorerFunc which ignores no stream requests.
func IgnoreNoneStream(*grpc.StreamServerInfo) bool {
return false
}
20 changes: 20 additions & 0 deletions module/apmgrpc/internal/testservice/generate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:generate protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative testservice.proto

package testservice
Loading