From 969a8d4e9aceb9808f06d61df318956d28855ccc Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 23 Jun 2020 13:30:49 -0400 Subject: [PATCH] Unify zipkin v1 and v2 annotation/tag parsing logic (#1002) Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/975 Please look at the individual commits, most of this is just moving code around. Moves zipkin v2 trace conversion code into translator/trace/zipkin, previously it was in the receiver Use the same tag parsing logic for both zipkin v1 and v2 --- receiver/zipkinreceiver/trace_receiver.go | 295 +----------------- .../zipkinreceiver/trace_receiver_test.go | 241 ++++++++++---- translator/trace/protospan_translation.go | 15 +- translator/trace/zipkin/status_code.go | 70 ++++- translator/trace/zipkin/status_code_test.go | 182 +++++++++++ .../testdata/zipkin_v1_multiple_batches.json | 9 + .../testdata/zipkin_v1_single_batch.json | 9 + .../zipkin_v1_thrift_single_batch.json | 9 + .../zipkin/testdata/zipkin_v2_single.json | 38 +++ .../trace/zipkin/zipkinv1_to_protospan.go | 42 ++- .../zipkin/zipkinv1_to_protospan_test.go | 45 ++- .../trace/zipkin/zipkinv2_to_protospan.go | 266 ++++++++++++++++ .../zipkin/zipkinv2_to_protospan_test.go | 208 ++++++++++++ 13 files changed, 1043 insertions(+), 386 deletions(-) create mode 100644 translator/trace/zipkin/testdata/zipkin_v2_single.json create mode 100644 translator/trace/zipkin/zipkinv2_to_protospan.go create mode 100644 translator/trace/zipkin/zipkinv2_to_protospan_test.go diff --git a/receiver/zipkinreceiver/trace_receiver.go b/receiver/zipkinreceiver/trace_receiver.go index a8ca378c1cf..0165b73c79a 100644 --- a/receiver/zipkinreceiver/trace_receiver.go +++ b/receiver/zipkinreceiver/trace_receiver.go @@ -24,13 +24,10 @@ import ( "io/ioutil" "net" "net/http" - "strconv" "strings" "sync" "github.com/apache/thrift/lib/go/thrift" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" zipkinmodel "github.com/openzipkin/zipkin-go/model" zipkinproto "github.com/openzipkin/zipkin-go/proto/v2" @@ -41,9 +38,7 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerdata" - "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/obsreport" - tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.opentelemetry.io/collector/translator/trace/zipkin" ) @@ -201,39 +196,7 @@ func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []c return nil, err } - // *commonpb.Node instances have unique addresses hence - // for grouping within a map, we'll use the .String() value - byNodeGrouping := make(map[string][]*tracepb.Span) - uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans)) - // Now translate them into tracepb.Span - for _, zspan := range zipkinSpans { - if zspan == nil { - continue - } - span, node := zipkinSpanToTraceSpan(zspan) - key := node.String() - if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded { - uniqueNodes = append(uniqueNodes, node) - } - byNodeGrouping[key] = append(byNodeGrouping[key], span) - } - - for _, node := range uniqueNodes { - key := node.String() - spans := byNodeGrouping[key] - if len(spans) == 0 { - // Should never happen but nonetheless be cautious - // not to send blank spans. - continue - } - reqs = append(reqs, consumerdata.TraceData{ - Node: node, - Spans: spans, - }) - delete(byNodeGrouping, key) - } - - return reqs, nil + return zipkin.V2BatchToOCProto(zipkinSpans) } func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) { @@ -369,262 +332,6 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusAccepted) } -func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node) { - traceID := tracetranslator.UInt64ToByteTraceID(zs.TraceID.High, zs.TraceID.Low) - var parentSpanID []byte - if zs.ParentID != nil { - parentSpanID = tracetranslator.UInt64ToByteSpanID(uint64(*zs.ParentID)) - } - - pbs := &tracepb.Span{ - TraceId: traceID, - SpanId: tracetranslator.UInt64ToByteSpanID(uint64(zs.ID)), - ParentSpanId: parentSpanID, - Name: &tracepb.TruncatableString{Value: zs.Name}, - StartTime: internal.TimeToTimestamp(zs.Timestamp), - EndTime: internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)), - Kind: zipkinSpanKindToProtoSpanKind(zs.Kind), - Status: extractProtoStatus(zs), - Attributes: zipkinTagsToTraceAttributes(zs.Tags, zs.Kind), - TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations), - } - - node := nodeFromZipkinEndpoints(zs, pbs) - zipkin.SetTimestampsIfUnset(pbs) - - return pbs, node -} - -func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel, pbs *tracepb.Span) *commonpb.Node { - if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil { - return nil - } - - node := new(commonpb.Node) - var endpointMap map[string]string - - // Retrieve and make use of the local endpoint - if lep := zs.LocalEndpoint; lep != nil { - node.ServiceInfo = &commonpb.ServiceInfo{ - Name: lep.ServiceName, - } - endpointMap = zipkinEndpointIntoAttributes(lep, endpointMap, isLocalEndpoint) - } - - // Retrieve and make use of the remote endpoint - if rep := zs.RemoteEndpoint; rep != nil { - endpointMap = zipkinEndpointIntoAttributes(rep, endpointMap, isRemoteEndpoint) - } - - if endpointMap != nil { - if pbs.Attributes == nil { - pbs.Attributes = &tracepb.Span_Attributes{} - } - if pbs.Attributes.AttributeMap == nil { - pbs.Attributes.AttributeMap = make( - map[string]*tracepb.AttributeValue, len(endpointMap)) - } - - // Delete the redundant serviceName key since it is already on the node. - delete(endpointMap, zipkin.LocalEndpointServiceName) - attrbMap := pbs.Attributes.AttributeMap - for key, value := range endpointMap { - attrbMap[key] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: value}, - }, - } - } - } - - return node -} - -type zipkinDirection bool - -const ( - isLocalEndpoint zipkinDirection = true - isRemoteEndpoint zipkinDirection = false -) - -var blankIP net.IP - -// zipkinEndpointIntoAttributes extracts information from s zipkin endpoint struct -// and puts it into a map with pre-defined keys. -func zipkinEndpointIntoAttributes( - ep *zipkinmodel.Endpoint, - into map[string]string, - endpointType zipkinDirection, -) map[string]string { - - if into == nil { - into = make(map[string]string) - } - - var ipv4Key, ipv6Key, portKey, serviceNameKey string - if endpointType == isLocalEndpoint { - ipv4Key, ipv6Key = zipkin.LocalEndpointIPv4, zipkin.LocalEndpointIPv6 - portKey, serviceNameKey = zipkin.LocalEndpointPort, zipkin.LocalEndpointServiceName - } else { - ipv4Key, ipv6Key = zipkin.RemoteEndpointIPv4, zipkin.RemoteEndpointIPv6 - portKey, serviceNameKey = zipkin.RemoteEndpointPort, zipkin.RemoteEndpointServiceName - } - if ep.IPv4 != nil && !ep.IPv4.Equal(blankIP) { - into[ipv4Key] = ep.IPv4.String() - } - if ep.IPv6 != nil && !ep.IPv6.Equal(blankIP) { - into[ipv6Key] = ep.IPv6.String() - } - if ep.Port > 0 { - into[portKey] = strconv.Itoa(int(ep.Port)) - } - if serviceName := ep.ServiceName; serviceName != "" { - into[serviceNameKey] = serviceName - } - return into -} - -const statusCodeUnknown = 2 - -func extractProtoStatus(zs *zipkinmodel.SpanModel) *tracepb.Status { - // The status is stored with the "error" key - // See https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L160-L165 - if zs == nil || len(zs.Tags) == 0 { - return nil - } - canonicalCodeStr := zs.Tags["error"] - message := zs.Tags["opencensus.status_description"] - if message == "" && canonicalCodeStr == "" { - return nil - } - code, set := canonicalCodesMap[canonicalCodeStr] - if !set { - // If not status code was set, then we should use UNKNOWN - code = statusCodeUnknown - } - return &tracepb.Status{ - Message: message, - Code: code, - } -} - -var canonicalCodesMap = map[string]int32{ - // https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186 - "OK": 0, - "CANCELLED": 1, - "UNKNOWN": 2, - "INVALID_ARGUMENT": 3, - "DEADLINE_EXCEEDED": 4, - "NOT_FOUND": 5, - "ALREADY_EXISTS": 6, - "PERMISSION_DENIED": 7, - "RESOURCE_EXHAUSTED": 8, - "FAILED_PRECONDITION": 9, - "ABORTED": 10, - "OUT_OF_RANGE": 11, - "UNIMPLEMENTED": 12, - "INTERNAL": 13, - "UNAVAILABLE": 14, - "DATA_LOSS": 15, - "UNAUTHENTICATED": 16, -} - -func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind { - switch strings.ToUpper(string(skind)) { - case "CLIENT": - return tracepb.Span_CLIENT - case "SERVER": - return tracepb.Span_SERVER - default: - return tracepb.Span_SPAN_KIND_UNSPECIFIED - } -} - -func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents { - if len(zas) == 0 { - return nil - } - tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas)) - for _, za := range zas { - if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil { - tevs = append(tevs, tev) - } - } - if len(tevs) == 0 { - return nil - } - return &tracepb.Span_TimeEvents{ - TimeEvent: tevs, - } -} - -var blankAnnotation zipkinmodel.Annotation - -func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent { - if zas == blankAnnotation { - return nil - } - return &tracepb.Span_TimeEvent{ - Time: internal.TimeToTimestamp(zas.Timestamp), - Value: &tracepb.Span_TimeEvent_Annotation_{ - Annotation: &tracepb.Span_TimeEvent_Annotation{ - Description: &tracepb.TruncatableString{Value: zas.Value}, - }, - }, - } -} - -func zipkinTagsToTraceAttributes(tags map[string]string, skind zipkinmodel.Kind) *tracepb.Span_Attributes { - // Produce and Consumer span kinds are not representable in OpenCensus format. - // We will represent them using TagSpanKind attribute, according to OpenTracing - // conventions. Check if it is one of those span kinds. - var spanKindTagVal tracetranslator.OpenTracingSpanKind - switch skind { - case zipkinmodel.Producer: - spanKindTagVal = tracetranslator.OpenTracingSpanKindProducer - case zipkinmodel.Consumer: - spanKindTagVal = tracetranslator.OpenTracingSpanKindConsumer - } - - if len(tags) == 0 && spanKindTagVal == "" { - // No input tags and no need to add a span kind tag. Keep attributes map empty. - return nil - } - - amap := make(map[string]*tracepb.AttributeValue, len(tags)) - for key, value := range tags { - // We did a translation from "boolean" to "string" - // in OpenCensus-Go's Zipkin exporter as per - // https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L138-L155 - switch value { - case "true", "false": - amap[key] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_BoolValue{BoolValue: value == "true"}, - } - default: - amap[key] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: value}, - }, - } - } - - } - - if spanKindTagVal != "" { - // Set the previously translated span kind attribute (see top of this function). - // We do this after the "tags" map is translated so that we will overwrite - // the attribute if it exists. - amap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{ - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{Value: string(spanKindTagVal)}, - }, - } - } - - return &tracepb.Span_Attributes{AttributeMap: amap} -} - func transportType(r *http.Request) string { v1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans") if v1 { diff --git a/receiver/zipkinreceiver/trace_receiver_test.go b/receiver/zipkinreceiver/trace_receiver_test.go index 775e37324f3..e74c66cf173 100644 --- a/receiver/zipkinreceiver/trace_receiver_test.go +++ b/receiver/zipkinreceiver/trace_receiver_test.go @@ -16,7 +16,11 @@ package zipkinreceiver import ( "bytes" + "compress/gzip" + "compress/zlib" "context" + "errors" + "fmt" "io" "io/ioutil" "net" @@ -28,7 +32,8 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - zipkinmodel "github.com/openzipkin/zipkin-go/model" + zipkin2 "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -43,31 +48,11 @@ import ( "go.opentelemetry.io/collector/exporter/zipkinexporter" "go.opentelemetry.io/collector/internal" "go.opentelemetry.io/collector/testutils" - tracetranslator "go.opentelemetry.io/collector/translator/trace" "go.opentelemetry.io/collector/translator/trace/zipkin" ) const zipkinReceiver = "zipkin_receiver_test" -func TestShortIDSpanConversion(t *testing.T) { - shortID, _ := zipkinmodel.TraceIDFromHex("0102030405060708") - assert.Equal(t, uint64(0), shortID.High, "wanted 64bit traceID, so TraceID.High must be zero") - - zc := zipkinmodel.SpanContext{ - TraceID: shortID, - ID: zipkinmodel.ID(shortID.Low), - } - zs := zipkinmodel.SpanModel{ - SpanContext: zc, - } - - ocSpan, _ := zipkinSpanToTraceSpan(&zs) - require.Len(t, ocSpan.TraceId, 16, "incorrect OC proto trace id length") - - want := []byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8} - assert.Equal(t, want, ocSpan.TraceId) -} - func TestNew(t *testing.T) { type args struct { address string @@ -392,52 +377,202 @@ func TestStartTraceReception(t *testing.T) { } } -func TestSpanKindTranslation(t *testing.T) { +func TestReceiverContentTypes(t *testing.T) { tests := []struct { - zipkinKind zipkinmodel.Kind - ocKind tracepb.Span_SpanKind - otKind tracetranslator.OpenTracingSpanKind + endpoint string + content string + encoding string + bodyFn func() ([]byte, error) }{ { - zipkinKind: zipkinmodel.Client, - ocKind: tracepb.Span_CLIENT, - otKind: "", + endpoint: "/api/v1/spans", + content: "application/json", + encoding: "gzip", + bodyFn: func() ([]byte, error) { + return ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v1_single_batch.json") + }, }, + { - zipkinKind: zipkinmodel.Server, - ocKind: tracepb.Span_SERVER, - otKind: "", + endpoint: "/api/v1/spans", + content: "application/x-thrift", + encoding: "gzip", + bodyFn: func() ([]byte, error) { + return thriftExample(), nil + }, + }, + + { + endpoint: "/api/v2/spans", + content: "application/json", + encoding: "gzip", + bodyFn: func() ([]byte, error) { + return ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v2_single.json") + }, }, + { - zipkinKind: zipkinmodel.Producer, - ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, - otKind: tracetranslator.OpenTracingSpanKindProducer, + endpoint: "/api/v2/spans", + content: "application/json", + encoding: "zlib", + bodyFn: func() ([]byte, error) { + return ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v2_single.json") + }, }, + { - zipkinKind: zipkinmodel.Consumer, - ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, - otKind: tracetranslator.OpenTracingSpanKindConsumer, + endpoint: "/api/v2/spans", + content: "application/json", + encoding: "", + bodyFn: func() ([]byte, error) { + return ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v2_single.json") + }, }, } - for _, tt := range tests { - t.Run(string(tt.zipkinKind), func(t *testing.T) { - zs := &zipkinmodel.SpanModel{ - SpanContext: zipkinmodel.SpanContext{ - TraceID: zipkinmodel.TraceID{Low: 123}, - ID: 456, - }, - Kind: tt.zipkinKind, - Timestamp: time.Now(), + for _, test := range tests { + name := fmt.Sprintf("%v %v %v", test.endpoint, test.content, test.encoding) + t.Run(name, func(t *testing.T) { + body, err := test.bodyFn() + require.NoError(t, err, "Failed to generate test body: %v", err) + + var requestBody *bytes.Buffer + switch test.encoding { + case "": + requestBody = bytes.NewBuffer(body) + case "zlib": + requestBody, err = compressZlib(body) + case "gzip": + requestBody, err = compressGzip(body) } - ocSpan, _ := zipkinSpanToTraceSpan(zs) - assert.EqualValues(t, tt.ocKind, ocSpan.Kind) - if tt.otKind != "" { - otSpanKind := ocSpan.Attributes.AttributeMap[tracetranslator.TagSpanKind] - assert.EqualValues(t, tt.otKind, otSpanKind.GetStringValue().Value) - } else { - assert.True(t, ocSpan.Attributes == nil) + require.NoError(t, err) + + r := httptest.NewRequest("POST", test.endpoint, requestBody) + r.Header.Add("content-type", test.content) + r.Header.Add("content-encoding", test.encoding) + + next := &zipkinMockTraceConsumer{ + ch: make(chan consumerdata.TraceData, 10), + } + zr, err := New(zipkinReceiver, "", next) + require.NoError(t, err) + + req := httptest.NewRecorder() + zr.ServeHTTP(req, r) + + select { + case td := <-next.ch: + require.NotNil(t, td) + require.Equal(t, 202, req.Code) + break + case <-time.After(time.Second * 2): + t.Error("next consumer did not receive the batch") + break } }) } } + +func TestReceiverInvalidContentType(t *testing.T) { + body := `{ invalid json ` + + r := httptest.NewRequest("POST", "/api/v2/spans", + bytes.NewBuffer([]byte(body))) + r.Header.Add("content-type", "application/json") + + next := &zipkinMockTraceConsumer{ + ch: make(chan consumerdata.TraceData, 10), + } + zr, err := New(zipkinReceiver, "", next) + require.NoError(t, err) + + req := httptest.NewRecorder() + zr.ServeHTTP(req, r) + + require.Equal(t, 400, req.Code) + require.Equal(t, "invalid character 'i' looking for beginning of object key string\n", req.Body.String()) +} + +func TestReceiverConsumerError(t *testing.T) { + body, err := ioutil.ReadFile("../../translator/trace/zipkin/testdata/zipkin_v2_single.json") + require.NoError(t, err) + + r := httptest.NewRequest("POST", "/api/v2/spans", + bytes.NewBuffer([]byte(body))) + r.Header.Add("content-type", "application/json") + + next := &zipkinMockTraceConsumer{ + ch: make(chan consumerdata.TraceData, 10), + err: errors.New("consumer error"), + } + zr, err := New(zipkinReceiver, "", next) + require.NoError(t, err) + + req := httptest.NewRecorder() + zr.ServeHTTP(req, r) + + require.Equal(t, 500, req.Code) + require.Equal(t, "\"Internal Server Error\"", req.Body.String()) +} + +func thriftExample() []byte { + now := time.Now().Unix() + zSpans := []*zipkincore.Span{ + { + TraceID: 1, + Name: "test", + ID: 2, + BinaryAnnotations: []*zipkincore.BinaryAnnotation{ + { + Key: "http.path", + Value: []byte("/"), + }, + }, + Timestamp: &now, + }, + } + + return zipkin2.SerializeThrift(zSpans) +} + +func compressGzip(body []byte) (*bytes.Buffer, error) { + var buf bytes.Buffer + zw := gzip.NewWriter(&buf) + + _, err := zw.Write(body) + if err != nil { + return nil, err + } + + if err := zw.Close(); err != nil { + return nil, err + } + + return &buf, nil +} + +func compressZlib(body []byte) (*bytes.Buffer, error) { + var buf bytes.Buffer + zw := zlib.NewWriter(&buf) + + _, err := zw.Write(body) + if err != nil { + return nil, err + } + + if err := zw.Close(); err != nil { + return nil, err + } + + return &buf, nil +} + +type zipkinMockTraceConsumer struct { + ch chan consumerdata.TraceData + err error +} + +func (m *zipkinMockTraceConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { + m.ch <- td + return m.err +} diff --git a/translator/trace/protospan_translation.go b/translator/trace/protospan_translation.go index 2501ecc0844..07ea690ff5f 100644 --- a/translator/trace/protospan_translation.go +++ b/translator/trace/protospan_translation.go @@ -25,13 +25,14 @@ const ( TagSpanKind = "span.kind" - TagStatusCode = "status.code" - TagStatusMsg = "status.message" - TagError = "error" - TagHTTPStatusCode = "http.status_code" - TagHTTPStatusMsg = "http.status_message" - TagZipkinCensusCode = "census.status_code" - TagZipkinCensusMsg = "census.status_description" + TagStatusCode = "status.code" + TagStatusMsg = "status.message" + TagError = "error" + TagHTTPStatusCode = "http.status_code" + TagHTTPStatusMsg = "http.status_message" + TagZipkinCensusCode = "census.status_code" + TagZipkinCensusMsg = "census.status_description" + TagZipkinOpenCensusMsg = "opencensus.status_description" ) // OpenTracingSpanKind are possible values for TagSpanKind and match the OpenTracing diff --git a/translator/trace/zipkin/status_code.go b/translator/trace/zipkin/status_code.go index c7f8f72cf2b..e48e7fec314 100644 --- a/translator/trace/zipkin/status_code.go +++ b/translator/trace/zipkin/status_code.go @@ -37,6 +37,10 @@ type statusMapper struct { fromCensus status // oc status code extracted from "http.status_code" tags fromHTTP status + // oc status code extracted from "error" tags + fromErrorTag status + // oc status code 'unknown' when the "error" tag exists but is invalid + fromErrorTagUnknown status } // ocStatus returns an OC status from the best possible extraction source. @@ -51,11 +55,20 @@ func (m *statusMapper) ocStatus() *tracepb.Status { s = m.fromCensus case m.fromStatus.codePtr != nil: s = m.fromStatus - default: + case m.fromErrorTag.codePtr != nil: + s = m.fromErrorTag + if m.fromCensus.message != "" { + s.message = m.fromCensus.message + } else if m.fromStatus.message != "" { + s.message = m.fromStatus.message + } + case m.fromHTTP.codePtr != nil: s = m.fromHTTP + default: + s = m.fromErrorTagUnknown } - if s.codePtr != nil || s.message != "" { + if s.codePtr != nil { code := int32(0) if s.codePtr != nil { code = *s.codePtr @@ -77,7 +90,7 @@ func (m *statusMapper) fromAttribute(key string, attrib *tracepb.AttributeValue) } return true - case tracetranslator.TagZipkinCensusMsg: + case tracetranslator.TagZipkinCensusMsg, tracetranslator.TagZipkinOpenCensusMsg: m.fromCensus.message = attrib.GetStringValue().GetValue() return true @@ -101,6 +114,14 @@ func (m *statusMapper) fromAttribute(key string, attrib *tracepb.AttributeValue) case tracetranslator.TagHTTPStatusMsg: m.fromHTTP.message = attrib.GetStringValue().GetValue() + + case tracetranslator.TagError: + code, ok := extractStatusFromError(attrib) + if ok { + m.fromErrorTag.codePtr = code + return true + } + m.fromErrorTagUnknown.codePtr = code } return false } @@ -131,3 +152,46 @@ func toInt32(i int) (int32, error) { } return 0, fmt.Errorf("outside of the int32 range") } + +func extractStatusFromError(attrib *tracepb.AttributeValue) (*int32, bool) { + // The status is stored with the "error" key + // See https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L160-L165 + var unknown int32 = 2 + + switch val := attrib.Value.(type) { + case *tracepb.AttributeValue_StringValue: + canonicalCodeStr := val.StringValue.GetValue() + if canonicalCodeStr == "" { + return nil, true + } + code, set := canonicalCodesMap[canonicalCodeStr] + if set { + return &code, true + } + default: + break + } + + return &unknown, false +} + +var canonicalCodesMap = map[string]int32{ + // https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186 + "OK": 0, + "CANCELLED": 1, + "UNKNOWN": 2, + "INVALID_ARGUMENT": 3, + "DEADLINE_EXCEEDED": 4, + "NOT_FOUND": 5, + "ALREADY_EXISTS": 6, + "PERMISSION_DENIED": 7, + "RESOURCE_EXHAUSTED": 8, + "FAILED_PRECONDITION": 9, + "ABORTED": 10, + "OUT_OF_RANGE": 11, + "UNIMPLEMENTED": 12, + "INTERNAL": 13, + "UNAVAILABLE": 14, + "DATA_LOSS": 15, + "UNAUTHENTICATED": 16, +} diff --git a/translator/trace/zipkin/status_code_test.go b/translator/trace/zipkin/status_code_test.go index d266d2395fd..ebc0bc53cd4 100644 --- a/translator/trace/zipkin/status_code_test.go +++ b/translator/trace/zipkin/status_code_test.go @@ -93,3 +93,185 @@ func TestAttribToStatusCode(t *testing.T) { }) } } + +func TestStatusCodeMapperCases(t *testing.T) { + tests := []struct { + name string + expected *tracepb.Status + attributes map[string]string + }{ + { + name: "no relevant attributes", + expected: nil, + attributes: map[string]string{ + "not.relevant": "data", + }, + }, + + { + name: "http: 500", + expected: &tracepb.Status{Code: 13}, + attributes: map[string]string{ + "http.status_code": "500", + }, + }, + + { + name: "http: message only, nil", + expected: nil, + attributes: map[string]string{ + "http.status_message": "something", + }, + }, + + { + name: "http: 500", + expected: &tracepb.Status{Code: 13, Message: "a message"}, + attributes: map[string]string{ + "http.status_code": "500", + "http.status_message": "a message", + }, + }, + + { + name: "http: 500, with error attribute", + expected: &tracepb.Status{Code: 13}, + attributes: map[string]string{ + "http.status_code": "500", + "error": "an error occurred", + }, + }, + + { + name: "oc: internal", + expected: &tracepb.Status{Code: 13, Message: "a description"}, + attributes: map[string]string{ + "census.status_code": "13", + "census.status_description": "a description", + }, + }, + + { + name: "oc: description and error", + expected: &tracepb.Status{Code: 13, Message: "a description"}, + attributes: map[string]string{ + "opencensus.status_description": "a description", + "error": "INTERNAL", + }, + }, + + { + name: "oc: error only", + expected: &tracepb.Status{Code: 13, Message: ""}, + attributes: map[string]string{ + "error": "INTERNAL", + }, + }, + + { + name: "oc: empty error tag", + expected: nil, + attributes: map[string]string{ + "error": "", + }, + }, + + { + name: "oc: description only, no status", + expected: nil, + attributes: map[string]string{ + "opencensus.status_description": "a description", + }, + }, + + { + name: "oc: priority over http", + expected: &tracepb.Status{Code: 4, Message: "deadline expired"}, + attributes: map[string]string{ + "census.status_description": "deadline expired", + "census.status_code": "4", + + "http.status_message": "a description", + "http.status_code": "500", + }, + }, + + { + name: "error: valid oc status priority over http", + expected: &tracepb.Status{Code: 4}, + attributes: map[string]string{ + "error": "DEADLINE_EXCEEDED", + + "http.status_message": "a description", + "http.status_code": "500", + }, + }, + + { + name: "error: invalid oc status uses http", + expected: &tracepb.Status{Code: 13, Message: "a description"}, + attributes: map[string]string{ + "error": "123", + + "http.status_message": "a description", + "http.status_code": "500", + }, + }, + + { + name: "error only: string description", + expected: &tracepb.Status{Code: 2}, + attributes: map[string]string{ + "error": "a description", + }, + }, + + { + name: "error only: true", + expected: &tracepb.Status{Code: 2}, + attributes: map[string]string{ + "error": "true", + }, + }, + + { + name: "error only: false", + expected: &tracepb.Status{Code: 2}, + attributes: map[string]string{ + "error": "false", + }, + }, + + { + name: "error only: 1", + expected: &tracepb.Status{Code: 2}, + attributes: map[string]string{ + "error": "1", + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + attributes := attributesFromMap(test.attributes) + + sMapper := &statusMapper{} + for k, v := range attributes { + sMapper.fromAttribute(k, v) + } + + got := sMapper.ocStatus() + assert.EqualValues(t, test.expected, got) + }) + } +} + +func attributesFromMap(mapValues map[string]string) map[string]*tracepb.AttributeValue { + res := map[string]*tracepb.AttributeValue{} + + for k, v := range mapValues { + pbAttrib := parseAnnotationValue(v) + res[k] = pbAttrib + } + return res +} diff --git a/translator/trace/zipkin/testdata/zipkin_v1_multiple_batches.json b/translator/trace/zipkin/testdata/zipkin_v1_multiple_batches.json index 82cce9fe7af..2e4270b66d8 100644 --- a/translator/trace/zipkin/testdata/zipkin_v1_multiple_batches.json +++ b/translator/trace/zipkin/testdata/zipkin_v1_multiple_batches.json @@ -14,6 +14,15 @@ "serviceName": "service1" } }, + { + "timestamp": 1544805927450000, + "value": "custom time event", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, { "timestamp": 1544805927460102, "value": "ss", diff --git a/translator/trace/zipkin/testdata/zipkin_v1_single_batch.json b/translator/trace/zipkin/testdata/zipkin_v1_single_batch.json index a488c3d6441..4042529272a 100644 --- a/translator/trace/zipkin/testdata/zipkin_v1_single_batch.json +++ b/translator/trace/zipkin/testdata/zipkin_v1_single_batch.json @@ -14,6 +14,15 @@ } }, { + "timestamp": 1544805927450000, + "value": "custom time event", + "endpoint": { + "ipv4": "172.31.0.4", + "port": 0, + "serviceName": "service1" + } + }, + { "timestamp": 1544805927460102, "value": "ss", "endpoint": { diff --git a/translator/trace/zipkin/testdata/zipkin_v1_thrift_single_batch.json b/translator/trace/zipkin/testdata/zipkin_v1_thrift_single_batch.json index 413c1a47c92..4faf08d6a83 100644 --- a/translator/trace/zipkin/testdata/zipkin_v1_thrift_single_batch.json +++ b/translator/trace/zipkin/testdata/zipkin_v1_thrift_single_batch.json @@ -14,6 +14,15 @@ } }, { + "timestamp": 1544805927450000, + "value": "custom time event", + "host": { + "ipv4": -1407254524, + "port": 0, + "service_name": "service1" + } + }, + { "timestamp": 1544805927460102, "value": "ss", "host": { diff --git a/translator/trace/zipkin/testdata/zipkin_v2_single.json b/translator/trace/zipkin/testdata/zipkin_v2_single.json new file mode 100644 index 00000000000..875eda1a0c5 --- /dev/null +++ b/translator/trace/zipkin/testdata/zipkin_v2_single.json @@ -0,0 +1,38 @@ +[ + { + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "http.status_code": "500", + "cache_hit": "true", + "ping_count": "25", + "timeout": "12.3", + "clnt/finagle.version": "6.45.0" + } + } +] diff --git a/translator/trace/zipkin/zipkinv1_to_protospan.go b/translator/trace/zipkin/zipkinv1_to_protospan.go index ddb43d0b8c5..b65e576984d 100644 --- a/translator/trace/zipkin/zipkinv1_to_protospan.go +++ b/translator/trace/zipkin/zipkinv1_to_protospan.go @@ -177,7 +177,7 @@ func zipkinV1ToOCSpan(zSpan *zipkinV1Span) (*tracepb.Span, *annotationParseResul } setSpanKind(ocSpan, parsedAnnotations.Kind, parsedAnnotations.ExtendedKind) - SetTimestampsIfUnset(ocSpan) + setTimestampsIfUnset(ocSpan) return ocSpan, parsedAnnotations, nil } @@ -215,15 +215,8 @@ func zipkinV1BinAnnotationsToOCAttributes(binAnnotations []*binaryAnnotation) (a if binAnnotation.Endpoint != nil && binAnnotation.Endpoint.ServiceName != "" { fallbackServiceName = binAnnotation.Endpoint.ServiceName } - pbAttrib := &tracepb.AttributeValue{} - if iValue, err := strconv.ParseInt(binAnnotation.Value, 10, 64); err == nil { - pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue} - } else if bValue, err := strconv.ParseBool(binAnnotation.Value); err == nil { - pbAttrib.Value = &tracepb.AttributeValue_BoolValue{BoolValue: bValue} - } else { - // For now all else go to string - pbAttrib.Value = &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: binAnnotation.Value}} - } + + pbAttrib := parseAnnotationValue(binAnnotation.Value) key := binAnnotation.Key @@ -257,6 +250,21 @@ func zipkinV1BinAnnotationsToOCAttributes(binAnnotations []*binaryAnnotation) (a return attributes, status, fallbackServiceName } +func parseAnnotationValue(value string) *tracepb.AttributeValue { + pbAttrib := &tracepb.AttributeValue{} + + if iValue, err := strconv.ParseInt(value, 10, 64); err == nil { + pbAttrib.Value = &tracepb.AttributeValue_IntValue{IntValue: iValue} + } else if bValue, err := strconv.ParseBool(value); err == nil { + pbAttrib.Value = &tracepb.AttributeValue_BoolValue{BoolValue: bValue} + } else { + // For now all else go to string + pbAttrib.Value = &tracepb.AttributeValue_StringValue{StringValue: &tracepb.TruncatableString{Value: value}} + } + + return pbAttrib +} + // annotationParseResult stores the results of examining the original annotations, // this way multiple passes on the annotations are not needed. type annotationParseResult struct { @@ -354,17 +362,7 @@ func parseZipkinV1Annotations(annotations []*annotation) *annotationParseResult // Using the more expensive annotation until/if something cheaper is needed. Value: &tracepb.Span_TimeEvent_Annotation_{ Annotation: &tracepb.Span_TimeEvent_Annotation{ - Attributes: &tracepb.Span_Attributes{ - AttributeMap: map[string]*tracepb.AttributeValue{ - currAnnotation.Value: { - Value: &tracepb.AttributeValue_StringValue{ - StringValue: &tracepb.TruncatableString{ - Value: endpointName, - }, - }, - }, - }, - }, + Description: &tracepb.TruncatableString{Value: currAnnotation.Value}, }, }, } @@ -485,7 +483,7 @@ func (ep *endpoint) createAttributeMap() map[string]string { return attributeMap } -func SetTimestampsIfUnset(span *tracepb.Span) { +func setTimestampsIfUnset(span *tracepb.Span) { // zipkin allows timestamp to be unset, but opentelemetry-collector expects it to have a value. // If this is unset, the conversion from open census to the internal trace format breaks // what should be an identity transformation oc -> internal -> oc diff --git a/translator/trace/zipkin/zipkinv1_to_protospan_test.go b/translator/trace/zipkin/zipkinv1_to_protospan_test.go index 8e877bf4bda..c5b5dbe435d 100644 --- a/translator/trace/zipkin/zipkinv1_to_protospan_test.go +++ b/translator/trace/zipkin/zipkinv1_to_protospan_test.go @@ -187,9 +187,7 @@ func TestSingleJSONV1BatchToOCProto(t *testing.T) { sortTraceByNodeName(want) sortTraceByNodeName(got) - if !reflect.DeepEqual(got, want) { - t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", got, want) - } + assert.EqualValues(t, got, want) } func TestMultipleJSONV1BatchesToOCProto(t *testing.T) { @@ -236,9 +234,7 @@ func TestMultipleJSONV1BatchesToOCProto(t *testing.T) { sortTraceByNodeName(want) sortTraceByNodeName(got) - if !reflect.DeepEqual(got, want) { - t.Fatalf("Unsuccessful conversion\nGot:\n\t%v\nWant:\n\t%v", got, want) - } + assert.EqualValues(t, got, want) } func sortTraceByNodeName(trace []consumerdata.TraceData) { @@ -652,7 +648,18 @@ var ocBatchesFromZipkinV1 = []consumerdata.TraceData{ Kind: tracepb.Span_SERVER, StartTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 448081000}, EndTime: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 460102000}, - TimeEvents: nil, + TimeEvents: &tracepb.Span_TimeEvents{ + TimeEvent: []*tracepb.Span_TimeEvent{ + { + Time: ×tamp.Timestamp{Seconds: 1544805927, Nanos: 450000000}, + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Description: &tracepb.TruncatableString{Value: "custom time event"}, + }, + }, + }, + }, + }, }, { TraceId: []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0e, 0xd2, 0xe6, 0x3c, 0xbe, 0x71, 0xf5, 0xa8}, @@ -793,3 +800,27 @@ func TestSpanKindTranslation(t *testing.T) { }) } } + +func TestZipkinV1ToOCSpanInvalidTraceId(t *testing.T) { + zSpan := &zipkinV1Span{ + TraceID: "abc", + ID: "0123456789123456", + Annotations: []*annotation{ + {Value: "cr"}, + }, + } + _, _, err := zipkinV1ToOCSpan(zSpan) + assert.EqualError(t, err, "zipkinV1 span traceId: hex traceId span has wrong length (expected 16 or 32)") +} + +func TestZipkinV1ToOCSpanInvalidSpanId(t *testing.T) { + zSpan := &zipkinV1Span{ + TraceID: "1234567890123456", + ID: "abc", + Annotations: []*annotation{ + {Value: "cr"}, + }, + } + _, _, err := zipkinV1ToOCSpan(zSpan) + assert.EqualError(t, err, "zipkinV1 span id: hex Id has wrong length (expected 16)") +} diff --git a/translator/trace/zipkin/zipkinv2_to_protospan.go b/translator/trace/zipkin/zipkinv2_to_protospan.go new file mode 100644 index 00000000000..1aee6f98c9a --- /dev/null +++ b/translator/trace/zipkin/zipkinv2_to_protospan.go @@ -0,0 +1,266 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "net" + "strconv" + "strings" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + zipkinmodel "github.com/openzipkin/zipkin-go/model" + + "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/internal" + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +func V2BatchToOCProto(zipkinSpans []*zipkinmodel.SpanModel) (reqs []consumerdata.TraceData, err error) { + // *commonpb.Node instances have unique addresses hence + // for grouping within a map, we'll use the .String() value + byNodeGrouping := make(map[string][]*tracepb.Span) + uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans)) + // Now translate them into tracepb.Span + for _, zspan := range zipkinSpans { + if zspan == nil { + continue + } + span, node := zipkinSpanToTraceSpan(zspan) + key := node.String() + if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded { + uniqueNodes = append(uniqueNodes, node) + } + byNodeGrouping[key] = append(byNodeGrouping[key], span) + } + + for _, node := range uniqueNodes { + key := node.String() + spans := byNodeGrouping[key] + if len(spans) == 0 { + // Should never happen but nonetheless be cautious + // not to send blank spans. + continue + } + reqs = append(reqs, consumerdata.TraceData{ + Node: node, + Spans: spans, + }) + delete(byNodeGrouping, key) + } + + return reqs, nil +} + +func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node) { + traceID := tracetranslator.UInt64ToByteTraceID(zs.TraceID.High, zs.TraceID.Low) + var parentSpanID []byte + if zs.ParentID != nil { + parentSpanID = tracetranslator.UInt64ToByteSpanID(uint64(*zs.ParentID)) + } + + attributes, ocStatus := zipkinTagsToTraceAttributes(zs.Tags, zs.Kind) + + pbs := &tracepb.Span{ + TraceId: traceID, + SpanId: tracetranslator.UInt64ToByteSpanID(uint64(zs.ID)), + ParentSpanId: parentSpanID, + Name: &tracepb.TruncatableString{Value: zs.Name}, + StartTime: internal.TimeToTimestamp(zs.Timestamp), + EndTime: internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)), + Kind: zipkinSpanKindToProtoSpanKind(zs.Kind), + Status: ocStatus, + Attributes: attributes, + TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations), + } + + node := nodeFromZipkinEndpoints(zs, pbs) + setTimestampsIfUnset(pbs) + + return pbs, node +} + +func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel, pbs *tracepb.Span) *commonpb.Node { + if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil { + return nil + } + + node := new(commonpb.Node) + var endpointMap map[string]string + + // Retrieve and make use of the local endpoint + if lep := zs.LocalEndpoint; lep != nil { + node.ServiceInfo = &commonpb.ServiceInfo{ + Name: lep.ServiceName, + } + endpointMap = zipkinEndpointIntoAttributes(lep, endpointMap, isLocalEndpoint) + } + + // Retrieve and make use of the remote endpoint + if rep := zs.RemoteEndpoint; rep != nil { + endpointMap = zipkinEndpointIntoAttributes(rep, endpointMap, isRemoteEndpoint) + } + + if endpointMap != nil { + if pbs.Attributes == nil { + pbs.Attributes = &tracepb.Span_Attributes{} + } + if pbs.Attributes.AttributeMap == nil { + pbs.Attributes.AttributeMap = make( + map[string]*tracepb.AttributeValue, len(endpointMap)) + } + + // Delete the redundant serviceName key since it is already on the node. + delete(endpointMap, LocalEndpointServiceName) + attrbMap := pbs.Attributes.AttributeMap + for key, value := range endpointMap { + attrbMap[key] = &tracepb.AttributeValue{ + Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: value}, + }, + } + } + } + + return node +} + +var blankIP net.IP + +// zipkinEndpointIntoAttributes extracts information from s zipkin endpoint struct +// and puts it into a map with pre-defined keys. +func zipkinEndpointIntoAttributes( + ep *zipkinmodel.Endpoint, + into map[string]string, + endpointType zipkinDirection, +) map[string]string { + + if into == nil { + into = make(map[string]string) + } + + var ipv4Key, ipv6Key, portKey, serviceNameKey string + if endpointType == isLocalEndpoint { + ipv4Key, ipv6Key = LocalEndpointIPv4, LocalEndpointIPv6 + portKey, serviceNameKey = LocalEndpointPort, LocalEndpointServiceName + } else { + ipv4Key, ipv6Key = RemoteEndpointIPv4, RemoteEndpointIPv6 + portKey, serviceNameKey = RemoteEndpointPort, RemoteEndpointServiceName + } + if ep.IPv4 != nil && !ep.IPv4.Equal(blankIP) { + into[ipv4Key] = ep.IPv4.String() + } + if ep.IPv6 != nil && !ep.IPv6.Equal(blankIP) { + into[ipv6Key] = ep.IPv6.String() + } + if ep.Port > 0 { + into[portKey] = strconv.Itoa(int(ep.Port)) + } + if serviceName := ep.ServiceName; serviceName != "" { + into[serviceNameKey] = serviceName + } + return into +} + +func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind { + switch strings.ToUpper(string(skind)) { + case "CLIENT": + return tracepb.Span_CLIENT + case "SERVER": + return tracepb.Span_SERVER + default: + return tracepb.Span_SPAN_KIND_UNSPECIFIED + } +} + +func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents { + if len(zas) == 0 { + return nil + } + tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas)) + for _, za := range zas { + if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil { + tevs = append(tevs, tev) + } + } + if len(tevs) == 0 { + return nil + } + return &tracepb.Span_TimeEvents{ + TimeEvent: tevs, + } +} + +var blankAnnotation zipkinmodel.Annotation + +func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent { + if zas == blankAnnotation { + return nil + } + return &tracepb.Span_TimeEvent{ + Time: internal.TimeToTimestamp(zas.Timestamp), + Value: &tracepb.Span_TimeEvent_Annotation_{ + Annotation: &tracepb.Span_TimeEvent_Annotation{ + Description: &tracepb.TruncatableString{Value: zas.Value}, + }, + }, + } +} + +func zipkinTagsToTraceAttributes(tags map[string]string, skind zipkinmodel.Kind) (*tracepb.Span_Attributes, *tracepb.Status) { + // Produce and Consumer span kinds are not representable in OpenCensus format. + // We will represent them using TagSpanKind attribute, according to OpenTracing + // conventions. Check if it is one of those span kinds. + var spanKindTagVal tracetranslator.OpenTracingSpanKind + switch skind { + case zipkinmodel.Producer: + spanKindTagVal = tracetranslator.OpenTracingSpanKindProducer + case zipkinmodel.Consumer: + spanKindTagVal = tracetranslator.OpenTracingSpanKindConsumer + } + + if len(tags) == 0 && spanKindTagVal == "" { + // No input tags and no need to add a span kind tag. Keep attributes map empty. + return nil, nil + } + + sMapper := &statusMapper{} + amap := make(map[string]*tracepb.AttributeValue, len(tags)) + for key, value := range tags { + + pbAttrib := parseAnnotationValue(value) + + if drop := sMapper.fromAttribute(key, pbAttrib); drop { + continue + } + + amap[key] = pbAttrib + } + + status := sMapper.ocStatus() + + if spanKindTagVal != "" { + // Set the previously translated span kind attribute (see top of this function). + // We do this after the "tags" map is translated so that we will overwrite + // the attribute if it exists. + amap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{ + Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: string(spanKindTagVal)}, + }, + } + } + + return &tracepb.Span_Attributes{AttributeMap: amap}, status +} diff --git a/translator/trace/zipkin/zipkinv2_to_protospan_test.go b/translator/trace/zipkin/zipkinv2_to_protospan_test.go new file mode 100644 index 00000000000..3bfb9900acc --- /dev/null +++ b/translator/trace/zipkin/zipkinv2_to_protospan_test.go @@ -0,0 +1,208 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import ( + "encoding/json" + "io/ioutil" + "net" + "testing" + "time" + + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + zipkinmodel "github.com/openzipkin/zipkin-go/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + tracetranslator "go.opentelemetry.io/collector/translator/trace" +) + +func TestShortIDSpanConversion(t *testing.T) { + shortID, _ := zipkinmodel.TraceIDFromHex("0102030405060708") + assert.Equal(t, uint64(0), shortID.High, "wanted 64bit traceID, so TraceID.High must be zero") + + zc := zipkinmodel.SpanContext{ + TraceID: shortID, + ID: zipkinmodel.ID(shortID.Low), + } + zs := zipkinmodel.SpanModel{ + SpanContext: zc, + } + + ocSpan, _ := zipkinSpanToTraceSpan(&zs) + require.Len(t, ocSpan.TraceId, 16, "incorrect OC proto trace id length") + + want := []byte{0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8} + assert.Equal(t, want, ocSpan.TraceId) +} + +func TestV2SpanKindTranslation(t *testing.T) { + tests := []struct { + zipkinKind zipkinmodel.Kind + ocKind tracepb.Span_SpanKind + otKind tracetranslator.OpenTracingSpanKind + }{ + { + zipkinKind: zipkinmodel.Client, + ocKind: tracepb.Span_CLIENT, + otKind: "", + }, + { + zipkinKind: zipkinmodel.Server, + ocKind: tracepb.Span_SERVER, + otKind: "", + }, + { + zipkinKind: zipkinmodel.Producer, + ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, + otKind: tracetranslator.OpenTracingSpanKindProducer, + }, + { + zipkinKind: zipkinmodel.Consumer, + ocKind: tracepb.Span_SPAN_KIND_UNSPECIFIED, + otKind: tracetranslator.OpenTracingSpanKindConsumer, + }, + } + + for _, tt := range tests { + t.Run(string(tt.zipkinKind), func(t *testing.T) { + zs := &zipkinmodel.SpanModel{ + SpanContext: zipkinmodel.SpanContext{ + TraceID: zipkinmodel.TraceID{Low: 123}, + ID: 456, + }, + Kind: tt.zipkinKind, + Timestamp: time.Now(), + } + ocSpan, _ := zipkinSpanToTraceSpan(zs) + assert.EqualValues(t, tt.ocKind, ocSpan.Kind) + if tt.otKind != "" { + otSpanKind := ocSpan.Attributes.AttributeMap[tracetranslator.TagSpanKind] + assert.EqualValues(t, tt.otKind, otSpanKind.GetStringValue().Value) + } else { + assert.True(t, ocSpan.Attributes == nil) + } + }) + } +} + +func TestV2ParsesTags(t *testing.T) { + jsonBlob, err := ioutil.ReadFile("./testdata/zipkin_v2_single.json") + require.NoError(t, err, "Failed to read sample JSON file: %v", err) + + var zs []*zipkinmodel.SpanModel + require.NoError(t, json.Unmarshal(jsonBlob, &zs), "Failed to unmarshal zipkin spans") + + reqs, err := V2BatchToOCProto(zs) + require.NoError(t, err, "Failed to convert Zipkin spans to Trace spans: %v", err) + require.Len(t, reqs, 1, "Expecting only one request", len(reqs)) + require.Len(t, reqs, 1, "Expecting only one span", len(reqs[0].Spans)) + + var expected = &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "http.path": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "/api"}, + }}, + "http.status_code": {Value: &tracepb.AttributeValue_IntValue{IntValue: 500}}, + "cache_hit": {Value: &tracepb.AttributeValue_BoolValue{BoolValue: true}}, + "ping_count": {Value: &tracepb.AttributeValue_IntValue{IntValue: 25}}, + "timeout": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "12.3"}, + }}, + "ipv6": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "7::80:807f"}}, + }, + "clnt/finagle.version": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "6.45.0"}}, + }, + "zipkin.remoteEndpoint.ipv4": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "192.168.99.101"}}, + }, + "zipkin.remoteEndpoint.port": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "9000"}}, + }, + "zipkin.remoteEndpoint.serviceName": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "backend"}}, + }, + }, + } + + var span = reqs[0].Spans[0] + assert.EqualValues(t, expected, span.Attributes) + + var expectedStatus = &tracepb.Status{ + Code: tracetranslator.OCInternal, + } + assert.EqualValues(t, expectedStatus, span.Status) +} + +func TestZipkinTagsToTraceAttributesDropTag(t *testing.T) { + tags := map[string]string{ + "status.code": "13", + "status.message": "a message", + "http.path": "/api", + } + + attrs, status := zipkinTagsToTraceAttributes(tags, zipkinmodel.Client) + + var expected = &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "http.path": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "/api"}, + }}, + }, + } + assert.EqualValues(t, expected, attrs) + assert.EqualValues(t, status, &tracepb.Status{Code: 13, Message: "a message"}) +} + +func TestNodeFromZipkinEndpointsSetsAttributeOnNode(t *testing.T) { + zc := zipkinmodel.SpanContext{ + TraceID: zipkinmodel.TraceID{ + High: 0x0001020304050607, + Low: 0x08090A0B0C0D0E0F}, + ID: zipkinmodel.ID(uint64(0xF1F2F3F4F5F6F7F8)), + } + zs := &zipkinmodel.SpanModel{ + SpanContext: zc, + LocalEndpoint: &zipkinmodel.Endpoint{ + ServiceName: "my-service", + IPv4: net.IPv4(1, 2, 3, 4), + }, + } + + pbs := &tracepb.Span{} + _ = nodeFromZipkinEndpoints(zs, pbs) + + var expected = &tracepb.Span_Attributes{ + AttributeMap: map[string]*tracepb.AttributeValue{ + "ipv4": {Value: &tracepb.AttributeValue_StringValue{ + StringValue: &tracepb.TruncatableString{Value: "1.2.3.4"}, + }}, + }, + } + + assert.EqualValues(t, expected, pbs.Attributes) +} + +func TestV2ParsesTagsHandleNil(t *testing.T) { + zs := []*zipkinmodel.SpanModel{ + nil, + } + + reqs, err := V2BatchToOCProto(zs) + assert.Nil(t, err) + assert.EqualValues(t, 0, len(reqs)) +}