Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple instrumentation from SDK #983

Merged
merged 7 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Use `global.Handle` for span export errors in the OTLP exporter. (#946)
- Correct Go language formatting in the README documentation. (#961)
- Remove default SDK dependencies from the `go.opentelemetry.io/otel/api` package. (#977)
- Remove default SDK dependencies from the `go.opentelemetry.io/otel/instrumentation` package. (#983)
- Move documented examples for `go.opentelemetry.io/otel/instrumentation/grpctrace` interceptors into Go example tests. (#984)

## [0.9.0] - 2020-07-20
Expand Down
219 changes: 75 additions & 144 deletions instrumentation/grpctrace/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace/testtrace"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -32,19 +33,30 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel/api/kv"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type testExporter struct {
mu sync.Mutex
spanMap map[string]*export.SpanData
type SpanRecorder struct {
mu sync.RWMutex
spans map[string]*testtrace.Span
}

func (t *testExporter) ExportSpan(ctx context.Context, s *export.SpanData) {
t.mu.Lock()
defer t.mu.Unlock()
t.spanMap[s.Name] = s
func NewSpanRecorder() *SpanRecorder {
return &SpanRecorder{spans: make(map[string]*testtrace.Span)}
}

func (sr *SpanRecorder) OnStart(span *testtrace.Span) {}

func (sr *SpanRecorder) OnEnd(span *testtrace.Span) {
sr.mu.Lock()
defer sr.mu.Unlock()
sr.spans[span.Name()] = span
}

func (sr *SpanRecorder) Get(name string) (*testtrace.Span, bool) {
sr.mu.RLock()
defer sr.mu.RUnlock()
s, ok := sr.spans[name]
return s, ok
}

type mockUICInvoker struct {
Expand All @@ -69,18 +81,13 @@ func (mm *mockProtoMessage) ProtoMessage() {
}

func TestUnaryClientInterceptor(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
},
))

clientConn, err := grpc.Dial("fake:connection", grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to create client connection: %v", err)
}

sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/client")
unaryInterceptor := UnaryClientInterceptor(tracer)

Expand Down Expand Up @@ -210,68 +217,24 @@ func TestUnaryClientInterceptor(t *testing.T) {
}

for _, check := range checks {
err = unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)
if err != nil {
t.Errorf("failed to run unary interceptor: %v", err)
if !assert.NoError(t, unaryInterceptor(context.Background(), check.method, req, reply, clientConn, uniInterceptorInvoker.invoker)) {
continue
}

spanData, ok := exp.spanMap[check.name]
if !ok {
t.Errorf("no span data found for name < %s >", check.name)
span, ok := sr.Get(check.name)
if !assert.True(t, ok, "missing span %q", check.name) {
continue
}
assert.Equal(t, check.expectedAttr, span.Attributes())
assert.Equal(t, check.eventsAttr, eventAttrMap(span.Events()))
}
}

attrs := spanData.Attributes
if len(check.expectedAttr) > len(attrs) {
t.Errorf("attributes received are less than expected attributes, received %d, expected %d",
len(attrs), len(check.expectedAttr))
}
for _, attr := range attrs {
expectedAttr, ok := check.expectedAttr[attr.Key]
if ok {
if expectedAttr != attr.Value {
t.Errorf("name: %s invalid %s found. expected %s, actual %s", check.name, string(attr.Key),
expectedAttr.AsString(), attr.Value.AsString())
}
delete(check.expectedAttr, attr.Key)
} else {
t.Errorf("attribute %s not found in expected attributes map", string(attr.Key))
}
}

// Check if any expected attr not seen
if len(check.expectedAttr) > 0 {
for attr := range check.expectedAttr {
t.Errorf("missing attribute %s in span", string(attr))
}
}

events := spanData.MessageEvents
if len(check.eventsAttr) > len(events) {
t.Errorf("events received are less than expected events, received %d, expected %d",
len(events), len(check.eventsAttr))
}
for event := 0; event < len(check.eventsAttr); event++ {
for _, attr := range events[event].Attributes {
expectedAttr, ok := check.eventsAttr[event][attr.Key]
if ok {
if attr.Value != expectedAttr {
t.Errorf("invalid value for attribute %s in events, expected %s actual %s",
string(attr.Key), attr.Value.AsString(), expectedAttr.AsString())
}
delete(check.eventsAttr[event], attr.Key)
} else {
t.Errorf("attribute in event %s not found in expected attributes map", string(attr.Key))
}
}
if len(check.eventsAttr[event]) > 0 {
for attr := range check.eventsAttr[event] {
t.Errorf("missing attribute %s in span event", string(attr))
}
}
}
func eventAttrMap(events []testtrace.Event) []map[kv.Key]kv.Value {
maps := make([]map[kv.Key]kv.Value, len(events))
for i, event := range events {
maps[i] = event.Attributes
}
return maps
}

type mockClientStream struct {
Expand All @@ -287,26 +250,23 @@ func (mockClientStream) Header() (metadata.MD, error) { return nil, nil }
func (mockClientStream) Trailer() metadata.MD { return nil }

func TestStreamClientInterceptor(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, _ := sdktrace.NewProvider(sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
},
))
clientConn, err := grpc.Dial("fake:connection", grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to create client connection: %v", err)
}

// tracer
sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/Server")
streamCI := StreamClientInterceptor(tracer)

var mockClStr mockClientStream
method := "/github.com.serviceName/bar"
name := "github.com.serviceName/bar"

streamClient, err := streamCI(context.Background(),
streamClient, err := streamCI(
context.Background(),
&grpc.StreamDesc{ServerStreams: true},
clientConn,
method,
Expand All @@ -317,16 +277,11 @@ func TestStreamClientInterceptor(t *testing.T) {
opts ...grpc.CallOption) (grpc.ClientStream, error) {
mockClStr = mockClientStream{Desc: desc, Ctx: ctx}
return mockClStr, nil
})

if err != nil {
t.Fatalf("failed to initialize grpc stream client: %v", err)
}

// no span exported while stream is open
if _, ok := exp.spanMap[name]; ok {
t.Fatalf("span shouldn't end while stream is open")
}
},
)
require.NoError(t, err, "initialize grpc stream client")
_, ok := sr.Get(name)
require.False(t, ok, "span should ended while stream is open")

req := &mockProtoMessage{}
reply := &mockProtoMessage{}
Expand All @@ -343,53 +298,36 @@ func TestStreamClientInterceptor(t *testing.T) {
_ = streamClient.RecvMsg(reply)

// added retry because span end is called in separate go routine
var spanData *export.SpanData
var span *testtrace.Span
for retry := 0; retry < 5; retry++ {
ok := false
exp.mu.Lock()
spanData, ok = exp.spanMap[name]
exp.mu.Unlock()
span, ok = sr.Get(name)
if ok {
break
}
time.Sleep(time.Second * 1)
}
if spanData == nil {
t.Fatalf("no span data found for name < %s >", name)
require.True(t, ok, "missing span %s", name)

expectedAttr := map[kv.Key]kv.Value{
standard.RPCSystemKey: kv.StringValue("grpc"),
standard.RPCServiceKey: kv.StringValue("github.com.serviceName"),
standard.RPCMethodKey: kv.StringValue("bar"),
standard.NetPeerIPKey: kv.StringValue("fake"),
standard.NetPeerPortKey: kv.StringValue("connection"),
}
assert.Equal(t, expectedAttr, span.Attributes())

attrs := spanData.Attributes
expectedAttr := map[kv.Key]string{
standard.RPCSystemKey: "grpc",
standard.RPCServiceKey: "github.com.serviceName",
standard.RPCMethodKey: "bar",
standard.NetPeerIPKey: "fake",
standard.NetPeerPortKey: "connection",
}

for _, attr := range attrs {
expected, ok := expectedAttr[attr.Key]
if ok {
if expected != attr.Value.AsString() {
t.Errorf("name: %s invalid %s found. expected %s, actual %s", name, string(attr.Key),
expected, attr.Value.AsString())
}
}
}

events := spanData.MessageEvents
if len(events) != 20 {
t.Fatalf("incorrect number of events expected 20 got %d", len(events))
}
events := span.Events()
require.Len(t, events, 20)
for i := 0; i < 20; i += 2 {
msgID := i/2 + 1
validate := func(eventName string, attrs []kv.KeyValue) {
for _, attr := range attrs {
if attr.Key == standard.RPCMessageTypeKey && attr.Value.AsString() != eventName {
t.Errorf("invalid event on index: %d expecting %s event, receive %s event", i, eventName, attr.Value.AsString())
validate := func(eventName string, attrs map[kv.Key]kv.Value) {
for k, v := range attrs {
if k == standard.RPCMessageTypeKey && v.AsString() != eventName {
t.Errorf("invalid event on index: %d expecting %s event, receive %s event", i, eventName, v.AsString())
}
if attr.Key == standard.RPCMessageIDKey && attr.Value != kv.IntValue(msgID) {
t.Errorf("invalid id for message event expected %d received %d", msgID, attr.Value.AsInt32())
if k == standard.RPCMessageIDKey && v != kv.IntValue(msgID) {
t.Errorf("invalid id for message event expected %d received %d", msgID, v.AsInt32())
}
}
}
Expand All @@ -402,37 +340,30 @@ func TestStreamClientInterceptor(t *testing.T) {
}

func TestServerInterceptorError(t *testing.T) {
exp := &testExporter{spanMap: make(map[string]*export.SpanData)}
tp, err := sdktrace.NewProvider(
sdktrace.WithSyncer(exp),
sdktrace.WithConfig(sdktrace.Config{
DefaultSampler: sdktrace.AlwaysSample(),
}),
)
require.NoError(t, err)

sr := NewSpanRecorder()
tp := testtrace.NewProvider(testtrace.WithSpanRecorder(sr))
tracer := tp.Tracer("grpctrace/Server")
usi := UnaryServerInterceptor(tracer)
deniedErr := status.Error(codes.PermissionDenied, "PERMISSION_DENIED_TEXT")
handler := func(_ context.Context, _ interface{}) (interface{}, error) {
return nil, deniedErr
}
_, err = usi(context.Background(), &mockProtoMessage{}, &grpc.UnaryServerInfo{}, handler)
_, err := usi(context.Background(), &mockProtoMessage{}, &grpc.UnaryServerInfo{}, handler)
require.Error(t, err)
assert.Equal(t, err, deniedErr)

span, ok := exp.spanMap[""]
span, ok := sr.Get("")
if !ok {
t.Fatalf("failed to export error span")
}
assert.Equal(t, span.StatusCode, codes.PermissionDenied)
assert.Contains(t, deniedErr.Error(), span.StatusMessage)
assert.Len(t, span.MessageEvents, 2)
assert.Equal(t, []kv.KeyValue{
kv.String("message.type", "SENT"),
kv.Int("message.id", 1),
kv.Int("message.uncompressed_size", 26),
}, span.MessageEvents[1].Attributes)
assert.Equal(t, span.StatusCode(), codes.PermissionDenied)
assert.Contains(t, deniedErr.Error(), span.StatusMessage())
assert.Len(t, span.Events(), 2)
assert.Equal(t, map[kv.Key]kv.Value{
kv.Key("message.type"): kv.StringValue("SENT"),
kv.Key("message.id"): kv.IntValue(1),
kv.Key("message.uncompressed_size"): kv.IntValue(26),
}, span.Events()[1].Attributes)
}

func TestParseFullMethod(t *testing.T) {
Expand Down
Loading