Skip to content

Commit

Permalink
otelgrpc: StreamClientInterceptor ends spans synchronously (#4537)
Browse files Browse the repository at this point in the history
* otelgrpc: StreamClientInterceptor ends spans synchronously

* Update changelog

* Update grpc_test.go

* Add BenchmarkStreamClientInterceptor
  • Loading branch information
pellared authored Nov 14, 2023
1 parent 5ba7d1e commit 7469f61
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 95 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)

### Fixed

- Fix `StreamClientInterceptor` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc` to end the spans synchronously. (#4537)

## [1.21.0/0.46.0/0.15.0/0.1.0] - 2023-11-10

### Added
Expand Down
91 changes: 20 additions & 71 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,13 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
}
}

type streamEventType int

type streamEvent struct {
Type streamEventType
Err error
}

const (
receiveEndEvent streamEventType = iota
errorEvent
)

// clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
// SendMsg method call.
type clientStream struct {
grpc.ClientStream
desc *grpc.StreamDesc

desc *grpc.StreamDesc
events chan streamEvent
eventsDone chan struct{}
finished chan error
span trace.Span

receivedEvent bool
sentEvent bool
Expand All @@ -160,11 +146,11 @@ func (w *clientStream) RecvMsg(m interface{}) error {
err := w.ClientStream.RecvMsg(m)

if err == nil && !w.desc.ServerStreams {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err == io.EOF {
w.sendStreamEvent(receiveEndEvent, nil)
w.endSpan(nil)
} else if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
} else {
w.receivedMessageID++

Expand All @@ -186,7 +172,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
}

if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return err
Expand All @@ -195,7 +181,7 @@ func (w *clientStream) SendMsg(m interface{}) error {
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return md, err
Expand All @@ -204,54 +190,32 @@ func (w *clientStream) Header() (metadata.MD, error) {
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
w.endSpan(err)
}

return err
}

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)

go func() {
defer close(eventsDone)

for {
select {
case event := <-events:
switch event.Type {
case receiveEndEvent:
finished <- nil
return
case errorEvent:
finished <- event.Err
return
}
case <-ctx.Done():
finished <- ctx.Err()
return
}
}
}()

func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
return &clientStream{
ClientStream: s,
span: span,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}

func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
select {
case <-w.eventsDone:
case w.events <- streamEvent{Type: eventType, Err: err}:
func (w *clientStream) endSpan(err error) {
if err != nil {
s, _ := status.FromError(err)
w.span.SetStatus(codes.Error, s.Message())
w.span.SetAttributes(statusCodeAttr(s.Code()))
} else {
w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

w.span.End()
}

// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
Expand Down Expand Up @@ -306,22 +270,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
span.End()
return s, err
}
stream := wrapClientStream(ctx, s, desc, cfg)

go func() {
err := <-stream.finished

if err != nil {
s, _ := status.FromError(err)
span.SetStatus(codes.Error, s.Message())
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

span.End()
}()

stream := wrapClientStream(ctx, s, desc, span, cfg)
return stream, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func TestStatsHandler(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(
listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(
otelgrpc.WithTracerProvider(clientTP),
Expand All @@ -66,7 +65,7 @@ func TestStatsHandler(t *testing.T) {
),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
Expand Down
34 changes: 13 additions & 21 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -46,14 +45,18 @@ var wantInstrumentationScope = instrumentation.Scope{
Version: otelgrpc.Version(),
}

// newGrpcTest creats a grpc server, starts it, and executes all the calls, closes everything down.
func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) error {
// newGrpcTest creats a grpc server, starts it, and returns the client, closes everything down during test cleanup.
func newGrpcTest(t testing.TB, listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.ServerOption) pb.TestServiceClient {
grpcServer := grpc.NewServer(sOpt...)
pb.RegisterTestServiceServer(grpcServer, interop.NewTestServer())
errCh := make(chan error)
go func() {
errCh <- grpcServer.Serve(listener)
}()
t.Cleanup(func() {
grpcServer.Stop()
assert.NoError(t, <-errCh)
})
ctx := context.Background()

cOpt = append(cOpt, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -68,17 +71,12 @@ func newGrpcTest(listener net.Listener, cOpt []grpc.DialOption, sOpt []grpc.Serv
listener.Addr().String(),
cOpt...,
)
if err != nil {
return err
}
client := pb.NewTestServiceClient(conn)

doCalls(client)

conn.Close()
grpcServer.Stop()
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, conn.Close())
})

return <-errCh
return pb.NewTestServiceClient(conn)
}

func doCalls(client pb.TestServiceClient) {
Expand Down Expand Up @@ -106,7 +104,7 @@ func TestInterceptors(t *testing.T) {

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
err = newGrpcTest(listener,
client := newGrpcTest(t, listener,
[]grpc.DialOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(
Expand All @@ -133,19 +131,13 @@ func TestInterceptors(t *testing.T) {
)),
},
)
require.NoError(t, err)
doCalls(client)

t.Run("UnaryClientSpans", func(t *testing.T) {
checkUnaryClientSpans(t, clientUnarySR.Ended(), listener.Addr().String())
})

t.Run("StreamClientSpans", func(t *testing.T) {
// StreamClientInterceptor ends the spans asynchronously.
// We need to wait for all spans before asserting them.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Len(c, clientStreamSR.Ended(), 3)
}, 5*time.Second, 100*time.Millisecond)

checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"google.golang.org/grpc"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/interop/grpc_testing"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -1128,3 +1129,20 @@ func assertServerMetrics(t *testing.T, reader metric.Reader, serviceName, name s
require.Len(t, rm.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, want, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
}

func BenchmarkStreamClientInterceptor(b *testing.B) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(b, err, "failed to open port")
client := newGrpcTest(b, listener,
[]grpc.DialOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
},
[]grpc.ServerOption{},
)

b.ResetTimer()
for i := 0; i < b.N; i++ {
interop.DoClientStreaming(client)
}
}

0 comments on commit 7469f61

Please sign in to comment.