diff --git a/module/apmgrpc/server.go b/module/apmgrpc/server.go index 1e14472f7..21d234c46 100644 --- a/module/apmgrpc/server.go +++ b/module/apmgrpc/server.go @@ -36,8 +36,6 @@ import ( "go.elastic.co/apm" "go.elastic.co/apm/module/apmhttp" - - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" ) var ( @@ -136,8 +134,8 @@ func NewStreamServerInterceptor(o ...ServerOption) grpc.StreamServerInterceptor tx, ctx := startTransaction(ctx, opts.tracer, info.FullMethod) defer tx.End() - wrapped := grpc_middleware.WrapServerStream(stream) - wrapped.WrappedContext = ctx + wrapped := wrapServerStream(stream) + wrapped.wrappedContext = ctx // TODO(axw) define span context schema for RPC, // including at least the peer address. @@ -316,3 +314,20 @@ func WithServerStreamIgnorer(s StreamIgnorerFunc) ServerOption { o.streamIgnorer = s } } + +// wrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context. +type wrappedServerStream struct { + grpc.ServerStream + // wrappedContext is the wrapper's own Context. You can assign it. + wrappedContext context.Context +} + +// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context() +func (w *wrappedServerStream) Context() context.Context { + return w.wrappedContext +} + +// wrapServerStream returns a ServerStream that has the ability to overwrite context. +func wrapServerStream(stream grpc.ServerStream) *wrappedServerStream { + return &wrappedServerStream{ServerStream: stream, wrappedContext: stream.Context()} +}