Skip to content

Commit

Permalink
Update grpctrace instrumentation span names (#922)
Browse files Browse the repository at this point in the history
* Update grpctrace instrumentation span names

Span names MUST not contain the leading slash (`/`) that the grpc
package prepends to all `FullMethod` values. This replaces the
`serviceFromFullMethod` function with a parsing function. This parsing
function returns an span name adhering to the OpenTelemetry semantic
conventions as well as formatted span attributes.

Additionally, the service name needs to include the package if one
exists. This updates that attribute accordingly.

Once #900 is merged the method attributes can be added by uncommenting.

Resolves #916

* Update Changelog

* Update comment to plural

* Switch from regexp to string parsing

* Consolidate attributes before creating span

* Update Changelog with addition of rpc.method in grpctrace

* Fix test spanMap lookup key

* Update instrumentation/grpctrace/interceptor.go

Co-authored-by: ET <[email protected]>

* Unify on explicit typed return value

* Fix copy paste error

Co-authored-by: ET <[email protected]>
  • Loading branch information
MrAlias and evantorrie authored Jul 9, 2020
1 parent d2913b7 commit 54fffd6
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 47 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add test for api.standard `HTTPClientAttributesFromHTTPRequest`. (#905)
- Bump github.com/golangci/golangci-lint from 1.27.0 to 1.28.1 in /tools. (#901, #913)
- Update otel-colector example to use the v0.5.0 collector. (#915)
- The `grpctrace` instrumentation uses a span name conforming to the OpenTelemetry semantic conventions (does not contain a leading slash (`/`)). (#922)
- The `grpctrace` instrumentation includes an `rpc.method` attribute now set to the gRPC method name. (#900, #922)
- The `grpctrace` instrumentation `rpc.service` attribute now contains the package name if one exists.
This is in accordance with OpenTelemetry semantic conventions. (#922)
- Correlation Context extractor will no longer insert an empty map into the returned context when no valid values are extracted. (#923)

## [0.7.0] - 2020-06-26
Expand Down
78 changes: 47 additions & 31 deletions instrumentation/grpctrace/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ func UnaryClientInterceptor(tracer trace.Tracer) grpc.UnaryClientInterceptor {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()

name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx, method,
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(peerInfoFromTarget(cc.Target())...),
trace.WithAttributes(standard.RPCSystemGRPC),
trace.WithAttributes(serviceAndMethodFromFullName(method)...),
trace.WithAttributes(attr...),
)
defer span.End()

Expand Down Expand Up @@ -260,13 +260,13 @@ func StreamClientInterceptor(tracer trace.Tracer) grpc.StreamClientInterceptor {
requestMetadata, _ := metadata.FromOutgoingContext(ctx)
metadataCopy := requestMetadata.Copy()

name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx, method,
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(peerInfoFromTarget(cc.Target())...),
trace.WithAttributes(standard.RPCSystemGRPC),
trace.WithAttributes(serviceAndMethodFromFullName(method)...),
trace.WithAttributes(attr...),
)

Inject(ctx, &metadataCopy)
Expand Down Expand Up @@ -315,13 +315,12 @@ func UnaryServerInterceptor(tracer trace.Tracer) grpc.UnaryServerInterceptor {
MultiKV: entries,
}))

name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
info.FullMethod,
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(peerInfoFromContext(ctx)...),
trace.WithAttributes(standard.RPCSystemGRPC),
trace.WithAttributes(serviceAndMethodFromFullName(info.FullMethod)...),
trace.WithAttributes(attr...),
)
defer span.End()

Expand Down Expand Up @@ -406,13 +405,12 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
MultiKV: entries,
}))

name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, spanCtx),
info.FullMethod,
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(peerInfoFromContext(ctx)...),
trace.WithAttributes(standard.RPCSystemGRPC),
trace.WithAttributes(serviceAndMethodFromFullName(info.FullMethod)...),
trace.WithAttributes(attr...),
)
defer span.End()

Expand All @@ -427,11 +425,21 @@ func StreamServerInterceptor(tracer trace.Tracer) grpc.StreamServerInterceptor {
}
}

func peerInfoFromTarget(target string) []kv.KeyValue {
host, port, err := net.SplitHostPort(target)
// spanInfo returns a span name and all appropriate attributes from the gRPC
// method and peer address.
func spanInfo(fullMethod, peerAddress string) (string, []kv.KeyValue) {
attrs := []kv.KeyValue{standard.RPCSystemGRPC}
name, mAttrs := parseFullMethod(fullMethod)
attrs = append(attrs, mAttrs...)
attrs = append(attrs, peerAttr(peerAddress)...)
return name, attrs
}

// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []kv.KeyValue {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return []kv.KeyValue{}
return []kv.KeyValue(nil)
}

if host == "" {
Expand All @@ -444,24 +452,32 @@ func peerInfoFromTarget(target string) []kv.KeyValue {
}
}

func peerInfoFromContext(ctx context.Context) []kv.KeyValue {
// peerFromCtx returns a peer address from a context, if one exists.
func peerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx)

if !ok {
return []kv.KeyValue{}
return ""
}

return peerInfoFromTarget(p.Addr.String())
return p.Addr.String()
}

func serviceAndMethodFromFullName(method string) []kv.KeyValue {
l := strings.LastIndexByte(method, '/')
if l == -1 {
return []kv.KeyValue{}
// parseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span kv.KeyValue attributes based
// on a gRPC's FullMethod.
func parseFullMethod(fullMethod string) (string, []kv.KeyValue) {
name := strings.TrimLeft(fullMethod, "/")
parts := strings.SplitN(name, "/", 2)
if len(parts) != 2 {
// Invalid format, does not follow `/package.service/method`.
return name, []kv.KeyValue(nil)
}

return []kv.KeyValue{
standard.RPCServiceKey.String(method[:l]),
standard.RPCMethodKey.String(method[l+1:]),
var attrs []kv.KeyValue
if service := parts[0]; service != "" {
attrs = append(attrs, standard.RPCServiceKey.String(service))
}
if method := parts[1]; method != "" {
attrs = append(attrs, standard.RPCMethodKey.String(method))
}
return name, attrs
}
100 changes: 84 additions & 16 deletions instrumentation/grpctrace/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,17 @@ func TestUnaryClientInterceptor(t *testing.T) {
uniInterceptorInvoker := &mockUICInvoker{}

checks := []struct {
method string
name string
expectedAttr map[kv.Key]value.Value
eventsAttr []map[kv.Key]value.Value
}{
{
name: "/github.com.serviceName/bar",
method: "/github.com.serviceName/bar",
name: "github.com.serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
standard.RPCSystemKey: value.String("grpc"),
standard.RPCServiceKey: value.String("/github.com.serviceName"),
standard.RPCServiceKey: value.String("github.com.serviceName"),
standard.RPCMethodKey: value.String("bar"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
Expand All @@ -117,10 +119,11 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
name: "/serviceName/bar",
method: "/serviceName/bar",
name: "serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
standard.RPCSystemKey: value.String("grpc"),
standard.RPCServiceKey: value.String("/serviceName"),
standard.RPCServiceKey: value.String("serviceName"),
standard.RPCMethodKey: value.String("bar"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
Expand All @@ -139,7 +142,8 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
name: "serviceName/bar",
method: "serviceName/bar",
name: "serviceName/bar",
expectedAttr: map[kv.Key]value.Value{
standard.RPCSystemKey: value.String("grpc"),
standard.RPCServiceKey: value.String("serviceName"),
Expand All @@ -161,7 +165,8 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
name: "invalidName",
method: "invalidName",
name: "invalidName",
expectedAttr: map[kv.Key]value.Value{
standard.RPCSystemKey: value.String("grpc"),
standard.NetPeerIPKey: value.String("fake"),
Expand All @@ -181,10 +186,11 @@ func TestUnaryClientInterceptor(t *testing.T) {
},
},
{
name: "/github.com.foo.serviceName_123/method",
method: "/github.com.foo.serviceName_123/method",
name: "github.com.foo.serviceName_123/method",
expectedAttr: map[kv.Key]value.Value{
standard.RPCSystemKey: value.String("grpc"),
standard.RPCServiceKey: value.String("/github.com.foo.serviceName_123"),
standard.RPCServiceKey: value.String("github.com.foo.serviceName_123"),
standard.RPCMethodKey: value.String("method"),
standard.NetPeerIPKey: value.String("fake"),
standard.NetPeerPortKey: value.String("connection"),
Expand All @@ -205,7 +211,7 @@ func TestUnaryClientInterceptor(t *testing.T) {
}

for _, check := range checks {
err = unaryInterceptor(context.Background(), check.name, req, reply, clientConn, uniInterceptorInvoker.invoker)
err = unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)
if err != nil {
t.Errorf("failed to run unary interceptor: %v", err)
continue
Expand Down Expand Up @@ -298,12 +304,13 @@ func TestStreamClientInterceptor(t *testing.T) {
streamCI := StreamClientInterceptor(tracer)

var mockClStr mockClientStream
methodName := "/github.com.serviceName/bar"
method := "/github.com.serviceName/bar"
name := "github.com.serviceName/bar"

streamClient, err := streamCI(context.Background(),
&grpc.StreamDesc{ServerStreams: true},
clientConn,
methodName,
method,
func(ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
Expand All @@ -318,7 +325,7 @@ func TestStreamClientInterceptor(t *testing.T) {
}

// no span exported while stream is open
if _, ok := exp.spanMap[methodName]; ok {
if _, ok := exp.spanMap[name]; ok {
t.Fatalf("span shouldn't end while stream is open")
}

Expand All @@ -341,21 +348,21 @@ func TestStreamClientInterceptor(t *testing.T) {
for retry := 0; retry < 5; retry++ {
ok := false
exp.mu.Lock()
spanData, ok = exp.spanMap[methodName]
spanData, ok = exp.spanMap[name]
exp.mu.Unlock()
if ok {
break
}
time.Sleep(time.Second * 1)
}
if spanData == nil {
t.Fatalf("no span data found for name < %s >", methodName)
t.Fatalf("no span data found for name < %s >", name)
}

attrs := spanData.Attributes
expectedAttr := map[kv.Key]string{
standard.RPCSystemKey: "grpc",
standard.RPCServiceKey: "/github.com.serviceName",
standard.RPCServiceKey: "github.com.serviceName",
standard.RPCMethodKey: "bar",
standard.NetPeerIPKey: "fake",
standard.NetPeerPortKey: "connection",
Expand All @@ -365,7 +372,7 @@ func TestStreamClientInterceptor(t *testing.T) {
expected, ok := expectedAttr[attr.Key]
if ok {
if expected != attr.Value.AsString() {
t.Errorf("name: %s invalid %s found. expected %s, actual %s", methodName, string(attr.Key),
t.Errorf("name: %s invalid %s found. expected %s, actual %s", name, string(attr.Key),
expected, attr.Value.AsString())
}
}
Expand Down Expand Up @@ -428,3 +435,64 @@ func TestServerInterceptorError(t *testing.T) {
kv.Int("message.uncompressed_size", 26),
}, span.MessageEvents[1].Attributes)
}

func TestParseFullMethod(t *testing.T) {
tests := []struct {
fullMethod string
name string
attr []kv.KeyValue
}{
{
fullMethod: "/grpc.test.EchoService/Echo",
name: "grpc.test.EchoService/Echo",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("grpc.test.EchoService"),
standard.RPCMethodKey.String("Echo"),
},
}, {
fullMethod: "/com.example.ExampleRmiService/exampleMethod",
name: "com.example.ExampleRmiService/exampleMethod",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("com.example.ExampleRmiService"),
standard.RPCMethodKey.String("exampleMethod"),
},
}, {
fullMethod: "/MyCalcService.Calculator/Add",
name: "MyCalcService.Calculator/Add",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("MyCalcService.Calculator"),
standard.RPCMethodKey.String("Add"),
},
}, {
fullMethod: "/MyServiceReference.ICalculator/Add",
name: "MyServiceReference.ICalculator/Add",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("MyServiceReference.ICalculator"),
standard.RPCMethodKey.String("Add"),
},
}, {
fullMethod: "/MyServiceWithNoPackage/theMethod",
name: "MyServiceWithNoPackage/theMethod",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("MyServiceWithNoPackage"),
standard.RPCMethodKey.String("theMethod"),
},
}, {
fullMethod: "/pkg.srv",
name: "pkg.srv",
attr: []kv.KeyValue(nil),
}, {
fullMethod: "/pkg.srv/",
name: "pkg.srv/",
attr: []kv.KeyValue{
standard.RPCServiceKey.String("pkg.srv"),
},
},
}

for _, test := range tests {
n, a := parseFullMethod(test.fullMethod)
assert.Equal(t, test.name, n)
assert.Equal(t, test.attr, a)
}
}

0 comments on commit 54fffd6

Please sign in to comment.