Skip to content

Commit

Permalink
[receiver/skywalking] Fix skywalking traceid and spanid convertion. (#…
Browse files Browse the repository at this point in the history
…11562)

* Fix skywalking traceid and spanid convertion.

* Update CHANGELOG.md

* use uuid.Parse instead of hextable to convert id

* [receiver/skywalking] fix skywalking-rcv tid convertion

* use uuid.Parse instead of hextable to convert id

* [receiver/skywalking] fix skywalking-rcv tid convertion

* [receiver/skywalking] Add skywalking attributes into tracedata

Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
taloric and Alex Boten authored Jul 14, 2022
1 parent 471b2fb commit f1666ca
Show file tree
Hide file tree
Showing 7 changed files with 280 additions and 40 deletions.
1 change: 1 addition & 0 deletions receiver/skywalkingreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywal
go 1.17

require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.55.1-0.20220711160057-6133c820fd50
Expand Down
2 changes: 2 additions & 0 deletions receiver/skywalkingreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

161 changes: 123 additions & 38 deletions receiver/skywalkingreceiver/skywalkingproto_to_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"

import (
"encoding/binary"
"bytes"
"encoding/hex"
"reflect"
"strconv"
"time"
"unsafe"

"github.com/google/uuid"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.8.0"
Expand All @@ -32,6 +35,10 @@ const (
AttributeParentService = "parent.service"
AttributeParentInstance = "parent.service.instance"
AttributeParentEndpoint = "parent.endpoint"
AttributeSkywalkingSpanID = "sw8.span_id"
AttributeSkywalkingTraceID = "sw8.trace_id"
AttributeSkywalkingSegmentID = "sw8.segment_id"
AttributeSkywalkingParentSpanID = "sw8.parent_span_id"
AttributeNetworkAddressUsedAtPeer = "network.AddressUsedAtPeer"
)

Expand All @@ -53,14 +60,17 @@ func SkywalkingToTraces(segment *agentV3.SegmentObject) ptrace.Traces {

resourceSpan := traceData.ResourceSpans().AppendEmpty()
rs := resourceSpan.Resource()

for _, span := range swSpans {
swTagsToInternalResource(span, rs)
rs.Attributes().Insert(conventions.AttributeServiceName, pcommon.NewValueString(segment.GetService()))
rs.Attributes().Insert(conventions.AttributeServiceInstanceID, pcommon.NewValueString(segment.GetServiceInstance()))
}

rs.Attributes().Insert(conventions.AttributeServiceName, pcommon.NewValueString(segment.GetService()))
rs.Attributes().Insert(conventions.AttributeServiceInstanceID, pcommon.NewValueString(segment.GetServiceInstance()))
rs.Attributes().Insert(AttributeSkywalkingTraceID, pcommon.NewValueString(segment.GetTraceId()))

il := resourceSpan.ScopeSpans().AppendEmpty()
swSpansToSpanSlice(segment.GetTraceId(), swSpans, il.Spans())
swSpansToSpanSlice(segment.GetTraceId(), segment.GetTraceSegmentId(), swSpans, il.Spans())

return traceData
}
Expand All @@ -86,7 +96,7 @@ func swTagsToInternalResource(span *agentV3.SpanObject, dest pcommon.Resource) {
}
}

func swSpansToSpanSlice(traceID string, spans []*agentV3.SpanObject, dest ptrace.SpanSlice) {
func swSpansToSpanSlice(traceID string, segmentID string, spans []*agentV3.SpanObject, dest ptrace.SpanSlice) {
if len(spans) == 0 {
return
}
Expand All @@ -96,17 +106,19 @@ func swSpansToSpanSlice(traceID string, spans []*agentV3.SpanObject, dest ptrace
if span == nil {
continue
}
swSpanToSpan(traceID, span, dest.AppendEmpty())
swSpanToSpan(traceID, segmentID, span, dest.AppendEmpty())
}
}

func swSpanToSpan(traceID string, span *agentV3.SpanObject, dest ptrace.Span) {
dest.SetTraceID(stringToTraceID(traceID))
dest.SetSpanID(uInt32ToSpanID(uint32(span.GetSpanId())))
func swSpanToSpan(traceID string, segmentID string, span *agentV3.SpanObject, dest ptrace.Span) {
dest.SetTraceID(swTraceIDToTraceID(traceID))
// skywalking defines segmentId + spanId as unique identifier
// so use segmentId to convert to an unique otel-span
dest.SetSpanID(segmentIDToSpanID(segmentID, uint32(span.GetSpanId())))

// parent spanid = -1, means(root span) no parent span in skywalking,so just make otlp's parent span id empty.
if span.ParentSpanId != -1 {
dest.SetParentSpanID(uInt32ToSpanID(uint32(span.GetParentSpanId())))
dest.SetParentSpanID(segmentIDToSpanID(segmentID, uint32(span.GetParentSpanId())))
}

dest.SetName(span.OperationName)
Expand All @@ -121,6 +133,8 @@ func swSpanToSpan(traceID string, span *agentV3.SpanObject, dest ptrace.Span) {
attrs.Clear()
}

attrs.InsertString(AttributeSkywalkingSegmentID, segmentID)
setSwSpanIDToAttributes(span, attrs)
setInternalSpanStatus(span, dest.Status())

switch {
Expand Down Expand Up @@ -154,8 +168,8 @@ func swReferencesToSpanLinks(refs []*agentV3.SegmentReference, dest ptrace.SpanL

for _, ref := range refs {
link := dest.AppendEmpty()
link.SetTraceID(stringToTraceID(ref.TraceId))
link.SetSpanID(stringToParentSpanID(ref.ParentTraceSegmentId))
link.SetTraceID(swTraceIDToTraceID(ref.TraceId))
link.SetSpanID(segmentIDToSpanID(ref.ParentTraceSegmentId, uint32(ref.ParentSpanId)))
link.SetTraceState("")
kvParis := []*common.KeyStringValuePair{
{
Expand Down Expand Up @@ -193,6 +207,13 @@ func setInternalSpanStatus(span *agentV3.SpanObject, dest ptrace.SpanStatus) {
}
}

func setSwSpanIDToAttributes(span *agentV3.SpanObject, dest pcommon.Map) {
dest.InsertInt(AttributeSkywalkingSpanID, int64(span.GetSpanId()))
if span.ParentSpanId != -1 {
dest.InsertInt(AttributeSkywalkingParentSpanID, int64(span.GetParentSpanId()))
}
}

func swLogsToSpanEvents(logs []*agentV3.Log, dest ptrace.SpanEventSlice) {
if len(logs) == 0 {
return
Expand Down Expand Up @@ -235,38 +256,102 @@ func microsecondsToTimestamp(ms int64) pcommon.Timestamp {
return pcommon.NewTimestampFromTime(time.UnixMilli(ms))
}

func stringToTraceID(traceID string) pcommon.TraceID {
return pcommon.NewTraceID(unsafeStringToBytes(traceID))
}
func swTraceIDToTraceID(traceID string) pcommon.TraceID {
// skywalking traceid format:
// de5980b8-fce3-4a37-aab9-b4ac3af7eedd: from browser/js-sdk/envoy/nginx-lua sdk/py-agent
// 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from java-agent

func stringToParentSpanID(traceID string) pcommon.SpanID {
return pcommon.NewSpanID(unsafeStringTo8Bytes(traceID))
if len(traceID) <= 36 { // 36: uuid length (rfc4122)
uid, err := uuid.Parse(traceID)
if err != nil {
return pcommon.InvalidTraceID()
}
return pcommon.NewTraceID(uid)
}
return pcommon.NewTraceID(swStringToUUID(traceID, 0))
}

// uInt32ToSpanID converts the uint64 representation of a SpanID to pcommon.SpanID.
func uInt32ToSpanID(id uint32) pcommon.SpanID {
spanID := [8]byte{}
binary.BigEndian.PutUint32(spanID[:], id)
return pcommon.NewSpanID(spanID)
func segmentIDToSpanID(segmentID string, spanID uint32) pcommon.SpanID {
// skywalking segmentid format:
// 56a5e1c519ae4c76a2b8b11d92cead7f.12.16563474296430001: from TraceSegmentId
// 56a5e1c519ae4c76a2b8b11d92cead7f: from ParentTraceSegmentId

if len(segmentID) < 32 {
return pcommon.InvalidSpanID()
}
return pcommon.NewSpanID(uuidTo8Bytes(swStringToUUID(segmentID, spanID)))
}

func unsafeStringToBytes(s string) [16]byte {
p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
func swStringToUUID(s string, extra uint32) (dst [16]byte) {
// there are 2 possible formats for 's':
// s format = 56a5e1c519ae4c76a2b8b11d92cead7f.0000000000.000000000000000000
// ^ start(length=32) ^ mid(u32) ^ last(u64)
// uid = UUID(start) XOR ([4]byte(extra) . [4]byte(uint32(mid)) . [8]byte(uint64(last)))

// s format = 56a5e1c519ae4c76a2b8b11d92cead7f
// ^ start(length=32)
// uid = UUID(start) XOR [4]byte(extra)

if len(s) < 32 {
return
}

t := unsafeGetBytes(s)
var uid [16]byte
_, err := hex.Decode(uid[:], t[:32])
if err != nil {
return uid
}

for i := 0; i < 4; i++ {
uid[i] ^= byte(extra)
extra >>= 8
}

var b [16]byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
hdr.Data = uintptr(p)
hdr.Cap = len(s)
hdr.Len = len(s)
return b
if len(s) == 32 {
return uid
}

index1 := bytes.IndexByte(t, '.')
index2 := bytes.LastIndexByte(t, '.')
if index1 != 32 || index2 < 0 {
return
}

mid, err := strconv.Atoi(s[index1+1 : index2])
if err != nil {
return
}

last, err := strconv.Atoi(s[index2+1:])
if err != nil {
return
}

for i := 4; i < 8; i++ {
uid[i] ^= byte(mid)
mid >>= 8
}

for i := 8; i < 16; i++ {
uid[i] ^= byte(last)
last >>= 8
}

return uid
}

func uuidTo8Bytes(uuid [16]byte) [8]byte {
// high bit XOR low bit
var dst [8]byte
for i := 0; i < 8; i++ {
dst[i] = uuid[i] ^ uuid[i+8]
}
return dst
}

func unsafeStringTo8Bytes(s string) [8]byte {
p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
var b [8]byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
hdr.Data = uintptr(p)
hdr.Cap = len(s)
hdr.Len = len(s)
return b
func unsafeGetBytes(s string) []byte {
return (*[0x7fff0000]byte)(unsafe.Pointer(
(*reflect.StringHeader)(unsafe.Pointer(&s)).Data),
)[:len(s):len(s)]
}
Loading

0 comments on commit f1666ca

Please sign in to comment.