Skip to content

Commit

Permalink
[receiver/skywalking]:
Browse files Browse the repository at this point in the history
1. Restructuring the directory/structure
2. Adding an HTTP trace Reception unit test
  • Loading branch information
aheling11 authored and jeffreyhlhe committed Mar 27, 2023
1 parent e157551 commit 56fb2aa
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 184 deletions.
31 changes: 31 additions & 0 deletions cmd/otelcontribcol/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
extensions:
receivers:
# otlp:
# protocols:
# grpc:
# http:
skywalking:
protocols:
grpc:
endpoint: 0.0.0.0:11800
http:
endpoint: 0.0.0.0:12800
exporters:
logging:
loglevel: -1
service:

pipelines:

traces:
receivers: [skywalking]
processors:
exporters: [logging]
# metrics:
# receivers: [skywalking]
# processors:
# exporters: [logging]



extensions: []
26 changes: 23 additions & 3 deletions receiver/skywalkingreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net"
"strconv"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -80,6 +82,24 @@ func createTracesReceiver(
// that Skywalking receiver understands.
rCfg := cfg.(*Config)

c, err := createConfiguration(rCfg)
if err != nil {
return nil, err
}

r := receivers.GetOrAdd(cfg, func() component.Component {
return newSkywalkingReceiver(c, set)
})

if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil {
return nil, err
}

return r, nil
}

// create the config that Skywalking receiver will use.
func createConfiguration(rCfg *Config) (*configuration, error) {
var err error
var c configuration
// Set ports
Expand All @@ -96,9 +116,7 @@ func createTracesReceiver(
return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err)
}
}

// Create the receiver.
return newSkywalkingReceiver(&c, nextConsumer, set)
return &c, nil
}

// extract the port number from string in "address:port" format. If the
Expand All @@ -117,3 +135,5 @@ func extractPortFromEndpoint(endpoint string) (int, error) {
}
return int(port), nil
}

var receivers = sharedcomponent.NewSharedComponents()
31 changes: 19 additions & 12 deletions receiver/skywalkingreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"path/filepath"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"go.opentelemetry.io/collector/consumer/consumertest"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -54,8 +57,9 @@ func TestCreateReceiver(t *testing.T) {
Transport: "tcp",
},
}
traceSink := new(consumertest.TracesSink)
set := receivertest.NewNopCreateSettings()
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

Expand All @@ -75,7 +79,8 @@ func TestCreateReceiverGeneralConfig(t *testing.T) {
require.NoError(t, component.UnmarshalConfig(sub, cfg))

set := receivertest.NewNopCreateSettings()
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "receiver creation failed")
assert.NotNil(t, tReceiver, "receiver creation failed")

Expand All @@ -94,11 +99,12 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) {
Transport: "tcp",
},
}
traceSink := new(consumertest.TracesSink)
set := receivertest.NewNopCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, 11800, r.(*swReceiver).config.CollectorGRPCPort, "grpc port should be default")
assert.Equal(t, 11800, r.(*sharedcomponent.SharedComponent).
Unwrap().(*swReceiver).config.CollectorGRPCPort, "grpc port should be default")
}

func TestCreateTLSGPRCEndpoint(t *testing.T) {
Expand All @@ -118,8 +124,8 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) {
},
}
set := receivertest.NewNopCreateSettings()

_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "tls-enabled receiver creation failed")
}

Expand All @@ -138,8 +144,8 @@ func TestCreateTLSHTTPEndpoint(t *testing.T) {
}

set := receivertest.NewNopCreateSettings()

_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)
traceSink := new(consumertest.TracesSink)
_, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "tls-enabled receiver creation failed")
}

Expand All @@ -151,8 +157,9 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) {
Endpoint: defaultHTTPBindEndpoint,
}
set := receivertest.NewNopCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

traceSink := new(consumertest.TracesSink)
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink)
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, 12800, r.(*swReceiver).config.CollectorHTTPPort, "http port should be default")
assert.Equal(t, 12800, r.(*sharedcomponent.SharedComponent).
Unwrap().(*swReceiver).config.CollectorHTTPPort, "http port should be default")
}
5 changes: 3 additions & 2 deletions receiver/skywalkingreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.2
go.opentelemetry.io/collector v0.74.0
go.opentelemetry.io/collector/component v0.74.0
Expand All @@ -22,7 +23,6 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -37,7 +37,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.1.17 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rs/cors v1.8.3 // indirect
go.opencensus.io v0.24.0 // indirect
Expand All @@ -57,4 +56,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

retract v0.65.0
5 changes: 1 addition & 4 deletions receiver/skywalkingreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
package trace

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package skywalkingreceiver
package trace

import (
"strconv"
"testing"
"time"

common "skywalking.apache.org/repo/goapi/collect/common/v3"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

Expand Down Expand Up @@ -316,3 +320,88 @@ func generateTracesOneEmptyResourceSpans() ptrace.Span {
il.Spans().AppendEmpty()
return il.Spans().At(0)
}

func mockGrpcTraceSegment(sequence int) *agent.SegmentObject {
seq := strconv.Itoa(sequence)
return &agent.SegmentObject{
TraceId: "trace" + seq,
TraceSegmentId: "trace-segment" + seq,
Service: "demo-segmentReportService" + seq,
ServiceInstance: "demo-instance" + seq,
IsSizeLimited: false,
Spans: []*agent.SpanObject{
{
SpanId: 1,
ParentSpanId: 0,
StartTime: time.Now().Unix(),
EndTime: time.Now().Unix() + 10,
OperationName: "operation" + seq,
Peer: "127.0.0.1:6666",
SpanType: agent.SpanType_Entry,
SpanLayer: agent.SpanLayer_Http,
ComponentId: 1,
IsError: false,
SkipAnalysis: false,
Tags: []*common.KeyStringValuePair{
{
Key: "mock-key" + seq,
Value: "mock-value" + seq,
},
},
Logs: []*agent.Log{
{
Time: time.Now().Unix(),
Data: []*common.KeyStringValuePair{
{
Key: "log-key" + seq,
Value: "log-value" + seq,
},
},
},
},
Refs: []*agent.SegmentReference{
{
RefType: agent.RefType_CrossThread,
TraceId: "trace" + seq,
ParentTraceSegmentId: "parent-trace-segment" + seq,
ParentSpanId: 0,
ParentService: "parent" + seq,
ParentServiceInstance: "parent" + seq,
ParentEndpoint: "parent" + seq,
NetworkAddressUsedAtPeer: "127.0.0.1:6666",
},
},
},
{
SpanId: 2,
ParentSpanId: 1,
StartTime: time.Now().Unix(),
EndTime: time.Now().Unix() + 20,
OperationName: "operation" + seq,
Peer: "127.0.0.1:6666",
SpanType: agent.SpanType_Local,
SpanLayer: agent.SpanLayer_Http,
ComponentId: 2,
IsError: false,
SkipAnalysis: false,
Tags: []*common.KeyStringValuePair{
{
Key: "mock-key" + seq,
Value: "mock-value" + seq,
},
},
Logs: []*agent.Log{
{
Time: time.Now().Unix(),
Data: []*common.KeyStringValuePair{
{
Key: "log-key" + seq,
Value: "log-value" + seq,
},
},
},
},
},
},
}
}
Loading

0 comments on commit 56fb2aa

Please sign in to comment.