Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/skywalking] add Skywalking tracing receiver impl #8549

Merged
merged 26 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4803ac1
fix and add license
JaredTan95 Dec 21, 2021
3621312
remove usage of Deprecated LogRecord.Name field.
JaredTan95 Feb 11, 2022
e262fd7
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 13, 2022
50eed02
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 16, 2022
5d9b46d
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Feb 17, 2022
a4ed1bb
Merge branch 'main' of https://github.com/open-telemetry/opentelemetr…
JaredTan95 Mar 2, 2022
047eef4
fix com
JaredTan95 Mar 4, 2022
32bb075
finish skywalking tracing receiver and convert to otel.
JaredTan95 Mar 18, 2022
43d1eb7
Merge branch 'main' into sw_receiver_impl
JaredTan95 Mar 18, 2022
409c76f
fix ci
JaredTan95 Mar 18, 2022
15d56f8
add CHANGELOG
JaredTan95 Mar 18, 2022
a899b29
Update receiver/skywalkingreceiver/skywalkingproto_to_traces.go
JaredTan95 Mar 19, 2022
57ba37e
Update receiver/skywalkingreceiver/skywalkingproto_to_traces.go
JaredTan95 Mar 19, 2022
e565ca6
Update receiver/skywalkingreceiver/skywalkingproto_to_traces.go
JaredTan95 Mar 19, 2022
fb734df
Update receiver/skywalkingreceiver/skywalkingproto_to_traces.go
JaredTan95 Mar 19, 2022
4cafd96
fix typo
JaredTan95 Mar 19, 2022
c4451c2
Merge remote-tracking branch 'origin/main' into sw_receiver_impl
JaredTan95 Mar 19, 2022
9edf9c7
fix go sum
JaredTan95 Mar 19, 2022
026442f
Merge branch 'main' into sw_receiver_impl
JaredTan95 Mar 19, 2022
b29fbad
fix
JaredTan95 Mar 19, 2022
ef2a2aa
revert mischange
JaredTan95 Mar 19, 2022
b0da8d5
revert mischange
JaredTan95 Mar 19, 2022
8d60992
add more test
JaredTan95 Mar 19, 2022
b161ea0
fix ci
JaredTan95 Mar 19, 2022
5c2e67e
fix ci lint
JaredTan95 Mar 19, 2022
085b543
add codeowners
JaredTan95 Mar 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- `coralogixexporter` Allow exporter timeout to be configured (#7957)
- `prometheusremotewriteexporter` support adding trace id and span id attached to exemplars (#8380)
- `influxdbexporter`: accept histogram metric missing infinity bucket. (#8462)
- `skywalkingreceiver`: Added implementation of Skywalking receiver. (#8549)

### 🛑 Breaking changes 🛑

Expand Down
4 changes: 3 additions & 1 deletion cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ require (
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20220121092418-9c455d0dda3f // indirect
)

// Replace references to modules that are in this repository with their relateive paths
Expand Down Expand Up @@ -751,6 +751,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signa

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver => ../../receiver/simpleprometheusreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver => ../../receiver/skywalkingreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver => ../../receiver/splunkhecreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver => ../../receiver/statsdreceiver
Expand Down
1 change: 1 addition & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver v0.47.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.47.0
Expand Down Expand Up @@ -477,7 +478,7 @@ require (
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 // indirect
skywalking.apache.org/repo/goapi v0.0.0-20220121092418-9c455d0dda3f // indirect
)

// Replace references to modules that are in this repository with their relateive paths
Expand Down
151 changes: 150 additions & 1 deletion go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions internal/components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sapmreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver"
Expand Down Expand Up @@ -207,6 +208,7 @@ func Components() (component.Factories, error) {
sapmreceiver.NewFactory(),
signalfxreceiver.NewFactory(),
simpleprometheusreceiver.NewFactory(),
skywalkingreceiver.NewFactory(),
splunkhecreceiver.NewFactory(),
statsdreceiver.NewFactory(),
wavefrontreceiver.NewFactory(),
Expand Down
3 changes: 3 additions & 0 deletions internal/components/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,9 @@ func TestDefaultReceivers(t *testing.T) {
{
receiver: "prometheus_simple",
},
{
receiver: "skywalking",
},
{
receiver: "splunk_hec",
},
Expand Down
2 changes: 1 addition & 1 deletion receiver/skywalkingreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Supported pipeline types: traces

## ⚠️ Warning

Note: This component is currently work in progress, and traces receiver is not yet fully functional.
Note: This component is experimental and is not recommended for production environments.

## Getting Started

Expand Down
250 changes: 250 additions & 0 deletions receiver/skywalkingreceiver/skywalkingproto_to_traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver

import (
"encoding/binary"
"fmt"
"reflect"
"time"
"unsafe"

"go.opentelemetry.io/collector/model/pdata"
conventions "go.opentelemetry.io/collector/model/semconv/v1.8.0"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

var OtSpanTagsMapping = map[string]string{
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
"url": conventions.AttributeHTTPURL,
"status_code": conventions.AttributeHTTPStatusCode,
"db.type": conventions.AttributeDBSystem,
"db.instance": conventions.AttributeDBName,
"mq.broker": conventions.AttributeNetPeerName,
}

func SkywalkingToOtlpTraces(segment *agentV3.SegmentObject) pdata.Traces {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
traceData := pdata.NewTraces()

swSpans := segment.Spans
if swSpans == nil && len(swSpans) == 0 {
return traceData
}

resourceSpan := traceData.ResourceSpans().AppendEmpty()
rs := resourceSpan.Resource()
for _, span := range swSpans {
swTagsToInternalResource(span, rs)
rs.Attributes().Insert(conventions.AttributeServiceName, pdata.NewAttributeValueString(segment.GetService()))
rs.Attributes().Insert(conventions.AttributeServiceInstanceID, pdata.NewAttributeValueString(segment.GetServiceInstance()))
}

il := resourceSpan.InstrumentationLibrarySpans().AppendEmpty()
swSpansToOtlpSpans(segment.GetTraceId(), swSpans, il.Spans())

return traceData
}

func swTagsToInternalResource(span *agentV3.SpanObject, dest pdata.Resource) {
if span == nil {
return
}

attrs := dest.Attributes()
attrs.Clear()

tags := span.Tags
if tags == nil {
return
}

for _, tag := range tags {
otKey, ok := OtSpanTagsMapping[tag.Key]
if ok {
attrs.UpsertString(otKey, tag.Value)
}
}
}

func swSpansToOtlpSpans(traceId string, spans []*agentV3.SpanObject, dest pdata.SpanSlice) {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
if len(spans) == 0 {
return
}

dest.EnsureCapacity(len(spans))
for _, span := range spans {
if span == nil {
continue
}
swSpanToOtelSpan(traceId, span, dest.AppendEmpty())
}
}

func swSpanToOtelSpan(traceId string, span *agentV3.SpanObject, dest pdata.Span) {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
dest.SetTraceID(stringToTraceID(traceId))
dest.SetSpanID(uInt32ToSpanID(uint32(span.GetSpanId())))

// parent spanid = -1, means(root span) no parent span in skywalking,so just make otlp's parent span id empty.
if span.ParentSpanId != -1 {
dest.SetParentSpanID(uInt32ToSpanID(uint32(span.GetParentSpanId())))
}

dest.SetName(span.OperationName)
dest.SetStartTimestamp(microsecondsToTimestamp(span.GetStartTime()))
dest.SetEndTimestamp(microsecondsToTimestamp(span.GetEndTime()))

attrs := dest.Attributes()
attrs.EnsureCapacity(len(span.Tags))
swKvPairsToInternalAttributes(span.Tags, attrs)
// drop the attributes slice if all of them were replaced during translation
if attrs.Len() == 0 {
attrs.Clear()
}

setInternalSpanStatus(span, dest.Status())

switch {
case span.SpanLayer == agentV3.SpanLayer_MQ:
if span.SpanType == agentV3.SpanType_Entry {
dest.SetKind(pdata.SpanKindConsumer)
} else if span.SpanType == agentV3.SpanType_Exit {
dest.SetKind(pdata.SpanKindProducer)
}
case span.GetSpanType() == agentV3.SpanType_Exit:
dest.SetKind(pdata.SpanKindClient)
case span.GetSpanType() == agentV3.SpanType_Entry:
dest.SetKind(pdata.SpanKindServer)
case span.GetSpanType() == agentV3.SpanType_Local:
dest.SetKind(pdata.SpanKindInternal)
default:
dest.SetKind(pdata.SpanKindUnspecified)
}

swLogsToSpanEvents(span.GetLogs(), dest.Events())
// swkyalking: In the across thread and across process, these references targeting the parent segments.
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
swReferencesToSpanLinks(span.Refs, int64(span.ParentSpanId), dest.Links())
}

func swReferencesToSpanLinks(refs []*agentV3.SegmentReference, excludeParentID int64, dest pdata.SpanLinkSlice) {
if len(refs) == 0 {
return
}

dest.EnsureCapacity(len(refs))

for _, ref := range refs {
parentTraceSegmentID := ""
for _, part := range ref.ParentTraceSegmentId {
parentTraceSegmentID += fmt.Sprintf("%d", part)
}
link := dest.AppendEmpty()
link.SetTraceID(stringToTraceID(ref.TraceId))
link.SetSpanID(stringToParentSpanId(ref.ParentTraceSegmentId))
}
}

func setInternalSpanStatus(span *agentV3.SpanObject, dest pdata.SpanStatus) {
statusCode := pdata.StatusCodeUnset
statusMessage := ""

if span.GetIsError() {
statusCode = pdata.StatusCodeError
statusMessage = "ERROR"
} else {
statusCode = pdata.StatusCodeOk
statusMessage = "SUCCESS"
}

dest.SetCode(statusCode)
dest.SetMessage(statusMessage)
}

func swLogsToSpanEvents(logs []*agentV3.Log, dest pdata.SpanEventSlice) {
if len(logs) == 0 {
return
}
dest.EnsureCapacity(len(logs))

for i, log := range logs {
var event pdata.SpanEvent
if dest.Len() > i {
event = dest.At(i)
} else {
event = dest.AppendEmpty()
}

event.SetName("logs")
event.SetTimestamp(microsecondsToTimestamp(log.GetTime()))
if len(log.GetData()) == 0 {
continue
}

attrs := event.Attributes()
attrs.Clear()
attrs.EnsureCapacity(len(log.GetData()))
swKvPairsToInternalAttributes(log.GetData(), attrs)
}
}

func swKvPairsToInternalAttributes(pairs []*common.KeyStringValuePair, dest pdata.AttributeMap) {
if pairs == nil {
return
}

for _, pair := range pairs {
dest.UpsertString(pair.Key, pair.Value)
}
}

// microsecondsToTimestamp converts epoch microseconds to pdata.Timestamp
func microsecondsToTimestamp(ms int64) pdata.Timestamp {
return pdata.NewTimestampFromTime(time.UnixMilli(ms))
}

func stringToTraceID(traceId string) pdata.TraceID {
return pdata.NewTraceID(stringToBytes(traceId))
}

func stringToParentSpanId(traceId string) pdata.SpanID {
traceID := stringTo8Bytes(traceId)
return pdata.NewSpanID(traceID)
}

// uInt32ToSpanID converts the uint64 representation of a SpanID to pdata.SpanID.
func uInt32ToSpanID(id uint32) pdata.SpanID {
spanID := [8]byte{}
binary.BigEndian.PutUint32(spanID[:], id)
return pdata.NewSpanID(spanID)
}

func stringToBytes(s string) [16]byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
bh := reflect.SliceHeader{
Data: sh.Data,
Len: sh.Len,
Cap: sh.Len,
}
return *(*[16]byte)(unsafe.Pointer(&bh))
}

func stringTo8Bytes(s string) [8]byte {
sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
bh := reflect.SliceHeader{
Data: sh.Data,
Len: sh.Len,
Cap: sh.Len,
}
return *(*[8]byte)(unsafe.Pointer(&bh))
}
30 changes: 30 additions & 0 deletions receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSwProtoToTraces(t *testing.T) {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
swSpan := mockGrpcTraceSegment(1)
td := SkywalkingToOtlpTraces(swSpan)

assert.Equal(t, 1, td.ResourceSpans().Len())
assert.Equal(t, 2, td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Len())

}
30 changes: 30 additions & 0 deletions receiver/skywalkingreceiver/sw_dummy_clr_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver

import (
"context"

common "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

type clrService struct {
JaredTan95 marked this conversation as resolved.
Show resolved Hide resolved
agent.UnimplementedCLRMetricReportServiceServer
}

func (c *clrService) Collect(ctx context.Context, req *agent.CLRMetricCollection) (*common.Commands, error) {
return &common.Commands{}, nil
}
Loading