Skip to content

Commit

Permalink
Add propagator option for gRPC instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed Jul 29, 2020
1 parent 452256c commit 89fcd1a
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions instrumentation/grpctrace/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var (
// s := grpc.NewServer(
// grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(tracer)),
// ..., // (existing DialOptions))
func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
func UnaryClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
callOpts ...grpc.CallOption,
) error {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
Expand All @@ -91,12 +91,12 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
)
defer span.End()

Inject(ctx, &metadataCopy)
Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

messageSent.Event(ctx, 1, req)

err := invoker(ctx, method, req, reply, cc, opts...)
err := invoker(ctx, method, req, reply, cc, callOpts...)

messageReceived.Event(ctx, 1, reply)

Expand Down Expand Up @@ -248,14 +248,14 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// s := grpc.Dial(
// grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(tracer)),
// ..., // (existing DialOptions))
func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
func StreamClientInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
callOpts ...grpc.CallOption,
) (grpc.ClientStream, error) {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()
Expand All @@ -269,10 +269,10 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
trace.WithAttributes(attr...),
)

Inject(ctx, &metadataCopy)
Inject(ctx, &metadataCopy, opts...)
ctx = metadata.NewOutgoingContext(ctx, metadataCopy)

s, err := streamer(ctx, desc, cc, method, opts...)
s, err := streamer(ctx, desc, cc, method, callOpts...)
stream := wrapClientStream(s, desc)

go func() {
Expand Down Expand Up @@ -300,7 +300,7 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
// s := grpc.Dial(
// grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(tracer)),
// ..., // (existing ServerOptions))
func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
func UnaryServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
Expand All @@ -310,7 +310,7 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

entries, spanCtx := Extract(ctx, &metadataCopy)
entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
Expand Down Expand Up @@ -388,7 +388,7 @@ func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
// s := grpc.Dial(
// grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(tracer)),
// ..., // (existing ServerOptions))
func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
func StreamServerInterceptor(tracer trace.Tracer, opts ...Option) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
Expand All @@ -400,7 +400,7 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
requestMetadata, _ := metadata.FromIncomingContext(ctx)
metadataCopy := requestMetadata.Copy()

entries, spanCtx := Extract(ctx, &metadataCopy)
entries, spanCtx := Extract(ctx, &metadataCopy, opts...)
ctx = correlation.ContextWithMap(ctx, correlation.NewMap(correlation.MapUpdate{
MultiKV: entries,
}))
Expand Down

0 comments on commit 89fcd1a

Please sign in to comment.