From 56fb2aa122c26117271485a3dabb465ebece412c Mon Sep 17 00:00:00 2001 From: JeffreyHe Date: Mon, 27 Mar 2023 20:26:42 +0800 Subject: [PATCH] [receiver/skywalking]: 1. Restructuring the directory/structure 2. Adding an HTTP trace Reception unit test --- cmd/otelcontribcol/config.yaml | 31 ++++ receiver/skywalkingreceiver/factory.go | 26 +++- receiver/skywalkingreceiver/factory_test.go | 31 ++-- receiver/skywalkingreceiver/go.mod | 5 +- receiver/skywalkingreceiver/go.sum | 5 +- .../trace}/skywalkingproto_to_traces.go | 2 +- .../trace}/skywalkingproto_to_traces_test.go | 91 ++++++++++- .../internal/trace/tracing_report_service.go | 143 ++++++++++++++++++ ...ace_receiver.go => skywalking_receiver.go} | 101 +++---------- ...er_test.go => skywalking_receiver_test.go} | 69 +++++++-- .../tracing_report_service.go | 72 --------- 11 files changed, 392 insertions(+), 184 deletions(-) create mode 100644 cmd/otelcontribcol/config.yaml rename receiver/skywalkingreceiver/{ => internal/trace}/skywalkingproto_to_traces.go (98%) rename receiver/skywalkingreceiver/{ => internal/trace}/skywalkingproto_to_traces_test.go (78%) create mode 100644 receiver/skywalkingreceiver/internal/trace/tracing_report_service.go rename receiver/skywalkingreceiver/{trace_receiver.go => skywalking_receiver.go} (71%) rename receiver/skywalkingreceiver/{trace_receiver_test.go => skywalking_receiver_test.go} (72%) delete mode 100644 receiver/skywalkingreceiver/tracing_report_service.go diff --git a/cmd/otelcontribcol/config.yaml b/cmd/otelcontribcol/config.yaml new file mode 100644 index 000000000000..e966e3bac98a --- /dev/null +++ b/cmd/otelcontribcol/config.yaml @@ -0,0 +1,31 @@ +extensions: +receivers: +# otlp: +# protocols: +# grpc: +# http: + skywalking: + protocols: + grpc: + endpoint: 0.0.0.0:11800 + http: + endpoint: 0.0.0.0:12800 +exporters: + logging: + loglevel: -1 +service: + + pipelines: + + traces: + receivers: [skywalking] + processors: + exporters: [logging] +# metrics: +# receivers: [skywalking] +# processors: +# exporters: [logging] + + + + extensions: [] \ No newline at end of file diff --git a/receiver/skywalkingreceiver/factory.go b/receiver/skywalkingreceiver/factory.go index 333e4b8104bd..4e32e6ef47ec 100644 --- a/receiver/skywalkingreceiver/factory.go +++ b/receiver/skywalkingreceiver/factory.go @@ -22,6 +22,8 @@ import ( "net" "strconv" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" @@ -80,6 +82,24 @@ func createTracesReceiver( // that Skywalking receiver understands. rCfg := cfg.(*Config) + c, err := createConfiguration(rCfg) + if err != nil { + return nil, err + } + + r := receivers.GetOrAdd(cfg, func() component.Component { + return newSkywalkingReceiver(c, set) + }) + + if err = r.Unwrap().(*swReceiver).registerTraceConsumer(nextConsumer); err != nil { + return nil, err + } + + return r, nil +} + +// create the config that Skywalking receiver will use. +func createConfiguration(rCfg *Config) (*configuration, error) { var err error var c configuration // Set ports @@ -96,9 +116,7 @@ func createTracesReceiver( return nil, fmt.Errorf("unable to extract port for the HTTP endpoint: %w", err) } } - - // Create the receiver. - return newSkywalkingReceiver(&c, nextConsumer, set) + return &c, nil } // extract the port number from string in "address:port" format. If the @@ -117,3 +135,5 @@ func extractPortFromEndpoint(endpoint string) (int, error) { } return int(port), nil } + +var receivers = sharedcomponent.NewSharedComponents() diff --git a/receiver/skywalkingreceiver/factory_test.go b/receiver/skywalkingreceiver/factory_test.go index 776c0e14d971..173dc448939d 100644 --- a/receiver/skywalkingreceiver/factory_test.go +++ b/receiver/skywalkingreceiver/factory_test.go @@ -19,6 +19,9 @@ import ( "path/filepath" "testing" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" + "go.opentelemetry.io/collector/consumer/consumertest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -54,8 +57,9 @@ func TestCreateReceiver(t *testing.T) { Transport: "tcp", }, } + traceSink := new(consumertest.TracesSink) set := receivertest.NewNopCreateSettings() - tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") @@ -75,7 +79,8 @@ func TestCreateReceiverGeneralConfig(t *testing.T) { require.NoError(t, component.UnmarshalConfig(sub, cfg)) set := receivertest.NewNopCreateSettings() - tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + tReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "receiver creation failed") assert.NotNil(t, tReceiver, "receiver creation failed") @@ -94,11 +99,12 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) { Transport: "tcp", }, } + traceSink := new(consumertest.TracesSink) set := receivertest.NewNopCreateSettings() - r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) - + r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, 11800, r.(*swReceiver).config.CollectorGRPCPort, "grpc port should be default") + assert.Equal(t, 11800, r.(*sharedcomponent.SharedComponent). + Unwrap().(*swReceiver).config.CollectorGRPCPort, "grpc port should be default") } func TestCreateTLSGPRCEndpoint(t *testing.T) { @@ -118,8 +124,8 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) { }, } set := receivertest.NewNopCreateSettings() - - _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "tls-enabled receiver creation failed") } @@ -138,8 +144,8 @@ func TestCreateTLSHTTPEndpoint(t *testing.T) { } set := receivertest.NewNopCreateSettings() - - _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) + traceSink := new(consumertest.TracesSink) + _, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "tls-enabled receiver creation failed") } @@ -151,8 +157,9 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) { Endpoint: defaultHTTPBindEndpoint, } set := receivertest.NewNopCreateSettings() - r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil) - + traceSink := new(consumertest.TracesSink) + r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, traceSink) assert.NoError(t, err, "unexpected error creating receiver") - assert.Equal(t, 12800, r.(*swReceiver).config.CollectorHTTPPort, "http port should be default") + assert.Equal(t, 12800, r.(*sharedcomponent.SharedComponent). + Unwrap().(*swReceiver).config.CollectorHTTPPort, "http port should be default") } diff --git a/receiver/skywalkingreceiver/go.mod b/receiver/skywalkingreceiver/go.mod index dafdd2eaf22f..ad6b5033dbb6 100644 --- a/receiver/skywalkingreceiver/go.mod +++ b/receiver/skywalkingreceiver/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.2 go.opentelemetry.io/collector v0.74.0 go.opentelemetry.io/collector/component v0.74.0 @@ -22,7 +23,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -37,7 +37,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.1.17 // indirect - github.com/pelletier/go-toml v1.9.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.8.3 // indirect go.opencensus.io v0.24.0 // indirect @@ -57,4 +56,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent + retract v0.65.0 diff --git a/receiver/skywalkingreceiver/go.sum b/receiver/skywalkingreceiver/go.sum index f274af9bac68..1c069b627ec9 100644 --- a/receiver/skywalkingreceiver/go.sum +++ b/receiver/skywalkingreceiver/go.sum @@ -68,7 +68,6 @@ github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= -github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -244,7 +243,6 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= -github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -435,7 +433,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -508,8 +505,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUyUwEgHQXw849cJrilpS5NeIjOWESAw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go similarity index 98% rename from receiver/skywalkingreceiver/skywalkingproto_to_traces.go rename to receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go index efdc7f881fde..ab1c7a7fda4c 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" +package trace import ( "bytes" diff --git a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go similarity index 78% rename from receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go rename to receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go index 5262a32be43e..bc60b63da064 100644 --- a/receiver/skywalkingreceiver/skywalkingproto_to_traces_test.go +++ b/receiver/skywalkingreceiver/internal/trace/skywalkingproto_to_traces_test.go @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -package skywalkingreceiver +package trace import ( "strconv" "testing" + "time" + + common "skywalking.apache.org/repo/goapi/collect/common/v3" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" agentV3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" ) @@ -316,3 +320,88 @@ func generateTracesOneEmptyResourceSpans() ptrace.Span { il.Spans().AppendEmpty() return il.Spans().At(0) } + +func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { + seq := strconv.Itoa(sequence) + return &agent.SegmentObject{ + TraceId: "trace" + seq, + TraceSegmentId: "trace-segment" + seq, + Service: "demo-segmentReportService" + seq, + ServiceInstance: "demo-instance" + seq, + IsSizeLimited: false, + Spans: []*agent.SpanObject{ + { + SpanId: 1, + ParentSpanId: 0, + StartTime: time.Now().Unix(), + EndTime: time.Now().Unix() + 10, + OperationName: "operation" + seq, + Peer: "127.0.0.1:6666", + SpanType: agent.SpanType_Entry, + SpanLayer: agent.SpanLayer_Http, + ComponentId: 1, + IsError: false, + SkipAnalysis: false, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Logs: []*agent.Log{ + { + Time: time.Now().Unix(), + Data: []*common.KeyStringValuePair{ + { + Key: "log-key" + seq, + Value: "log-value" + seq, + }, + }, + }, + }, + Refs: []*agent.SegmentReference{ + { + RefType: agent.RefType_CrossThread, + TraceId: "trace" + seq, + ParentTraceSegmentId: "parent-trace-segment" + seq, + ParentSpanId: 0, + ParentService: "parent" + seq, + ParentServiceInstance: "parent" + seq, + ParentEndpoint: "parent" + seq, + NetworkAddressUsedAtPeer: "127.0.0.1:6666", + }, + }, + }, + { + SpanId: 2, + ParentSpanId: 1, + StartTime: time.Now().Unix(), + EndTime: time.Now().Unix() + 20, + OperationName: "operation" + seq, + Peer: "127.0.0.1:6666", + SpanType: agent.SpanType_Local, + SpanLayer: agent.SpanLayer_Http, + ComponentId: 2, + IsError: false, + SkipAnalysis: false, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Logs: []*agent.Log{ + { + Time: time.Now().Unix(), + Data: []*common.KeyStringValuePair{ + { + Key: "log-key" + seq, + Value: "log-value" + seq, + }, + }, + }, + }, + }, + }, + } +} diff --git a/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go new file mode 100644 index 000000000000..66ec2f35b703 --- /dev/null +++ b/receiver/skywalkingreceiver/internal/trace/tracing_report_service.go @@ -0,0 +1,143 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/obsreport" + "go.opentelemetry.io/collector/receiver" + "google.golang.org/protobuf/proto" + common "skywalking.apache.org/repo/goapi/collect/common/v3" + agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" + v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3" +) + +const ( + collectorHTTPTransport = "http" + grpcTransport = "grpc" + failing = "failing" +) + +type Receiver struct { + nextConsumer consumer.Traces + grpcObsrecv *obsreport.Receiver + httpObsrecv *obsreport.Receiver + agent.UnimplementedTraceSegmentReportServiceServer +} + +// New creates a new Receiver reference. +func New(nextConsumer consumer.Traces, set receiver.CreateSettings) (*Receiver, error) { + grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: grpcTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: set.ID, + Transport: collectorHTTPTransport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } + return &Receiver{ + nextConsumer: nextConsumer, + grpcObsrecv: grpcObsrecv, + httpObsrecv: httpObsrecv, + }, nil +} + +// Collect implements the service Collect traces func. +func (r *Receiver) Collect(stream agent.TraceSegmentReportService_CollectServer) error { + for { + segmentObject, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return stream.SendAndClose(&common.Commands{}) + } + return err + } + + err = consumeTraces(stream.Context(), segmentObject, r.nextConsumer) + if err != nil { + return stream.SendAndClose(&common.Commands{}) + } + } +} + +// CollectInSync implements the service CollectInSync traces func. +func (r *Receiver) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { + for _, segment := range segments.Segments { + marshaledSegment, err := proto.Marshal(segment) + if err != nil { + fmt.Printf("cannot marshal segemnt from sync, %v", err) + } + err = consumeTraces(ctx, segment, r.nextConsumer) + if err != nil { + fmt.Printf("cannot consume traces, %v", err) + } + fmt.Printf("receivec data:%s", marshaledSegment) + } + return &common.Commands{}, nil +} + +func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { + if segment == nil { + return nil + } + ptd := SkywalkingToTraces(segment) + return consumer.ConsumeTraces(ctx, ptd) +} + +func (r *Receiver) HTTPHandler(rsp http.ResponseWriter, req *http.Request) { + rsp.Header().Set("Content-Type", "application/json") + b, err := io.ReadAll(req.Body) + if err != nil { + response := &Response{Status: failing, Msg: err.Error()} + ResponseWithJSON(rsp, response, http.StatusBadRequest) + return + } + var data []*v3.SegmentObject + if err = json.Unmarshal(b, &data); err != nil { + fmt.Printf("cannot Unmarshal skywalking segment collection, %v", err) + } + + for _, segment := range data { + err = consumeTraces(req.Context(), segment, r.nextConsumer) + if err != nil { + fmt.Printf("cannot consume traces, %v", err) + } + } +} + +type Response struct { + Status string `json:"status"` + Msg string `json:"msg"` +} + +func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) { + rsp.WriteHeader(code) + _ = json.NewEncoder(rsp).Encode(response) +} diff --git a/receiver/skywalkingreceiver/trace_receiver.go b/receiver/skywalkingreceiver/skywalking_receiver.go similarity index 71% rename from receiver/skywalkingreceiver/trace_receiver.go rename to receiver/skywalkingreceiver/skywalking_receiver.go index 0acd9e35f9d3..feba434791f4 100644 --- a/receiver/skywalkingreceiver/trace_receiver.go +++ b/receiver/skywalkingreceiver/skywalking_receiver.go @@ -16,20 +16,19 @@ package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "context" - "encoding/json" "errors" "fmt" - "io" "net" "net/http" "sync" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver/internal/trace" + "github.com/gorilla/mux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/receiver" "go.uber.org/multierr" "google.golang.org/grpc" @@ -52,8 +51,6 @@ type configuration struct { // Receiver type is used to receive spans that were originally intended to be sent to Skywaking. // This receiver is basically a Skywalking collector. type swReceiver struct { - nextConsumer consumer.Traces - config *configuration grpc *grpc.Server @@ -63,49 +60,33 @@ type swReceiver struct { settings receiver.CreateSettings - grpcObsrecv *obsreport.Receiver - httpObsrecv *obsreport.Receiver - segmentReportService *traceSegmentReportService - dummyReportService *dummyReportService -} + traceReceiver *trace.Receiver -const ( - collectorHTTPTransport = "http" - grpcTransport = "grpc" - failing = "failing" -) + dummyReportService *dummyReportService +} // newSkywalkingReceiver creates a TracesReceiver that receives traffic as a Skywalking collector func newSkywalkingReceiver( config *configuration, - nextConsumer consumer.Traces, set receiver.CreateSettings, -) (*swReceiver, error) { +) *swReceiver { + return &swReceiver{ + config: config, + settings: set, + } +} - grpcObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ - ReceiverID: set.ID, - Transport: grpcTransport, - ReceiverCreateSettings: set, - }) - if err != nil { - return nil, err +// registerTraceConsumer register a TracesReceiver that receives trace +func (sr *swReceiver) registerTraceConsumer(tc consumer.Traces) error { + if tc == nil { + return component.ErrNilNextConsumer } - httpObsrecv, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ - ReceiverID: set.ID, - Transport: collectorHTTPTransport, - ReceiverCreateSettings: set, - }) + var err error + sr.traceReceiver, err = trace.New(tc, sr.settings) if err != nil { - return nil, err + return err } - - return &swReceiver{ - config: config, - nextConsumer: nextConsumer, - settings: set, - grpcObsrecv: grpcObsrecv, - httpObsrecv: httpObsrecv, - }, nil + return nil } func (sr *swReceiver) collectorGRPCAddr() string { @@ -157,7 +138,7 @@ func (sr *swReceiver) startCollector(host component.Host) error { } nr := mux.NewRouter() - nr.HandleFunc("/v3/segments", sr.httpHandler).Methods(http.MethodPost) + nr.HandleFunc("/v3/segments", sr.traceReceiver.HTTPHandler).Methods(http.MethodPost) sr.collectorServer, cerr = sr.config.CollectorHTTPSettings.ToServer(host, sr.settings.TelemetrySettings, nr) if cerr != nil { return cerr @@ -178,25 +159,24 @@ func (sr *swReceiver) startCollector(host component.Host) error { if err != nil { return fmt.Errorf("failed to build the options for the Skywalking gRPC Collector: %w", err) } - gaddr := sr.collectorGRPCAddr() gln, gerr := net.Listen("tcp", gaddr) if gerr != nil { return fmt.Errorf("failed to bind to gRPC address %q: %w", gaddr, gerr) } - - sr.segmentReportService = &traceSegmentReportService{sr: sr} - v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.segmentReportService) + if sr.traceReceiver != nil { + v3.RegisterTraceSegmentReportServiceServer(sr.grpc, sr.traceReceiver) + } sr.dummyReportService = &dummyReportService{} - management.RegisterManagementServiceServer(sr.grpc, sr.dummyReportService) cds.RegisterConfigurationDiscoveryServiceServer(sr.grpc, sr.dummyReportService) event.RegisterEventServiceServer(sr.grpc, &eventService{}) profile.RegisterProfileTaskServer(sr.grpc, sr.dummyReportService) - v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.dummyReportService) v3.RegisterMeterReportServiceServer(sr.grpc, &meterService{}) v3.RegisterCLRMetricReportServiceServer(sr.grpc, &clrService{}) v3.RegisterBrowserPerfServiceServer(sr.grpc, sr.dummyReportService) + //TODO: add jvm metrics service + v3.RegisterJVMMetricReportServiceServer(sr.grpc, sr.dummyReportService) sr.goroutines.Add(1) go func() { @@ -209,34 +189,3 @@ func (sr *swReceiver) startCollector(host component.Host) error { return nil } - -type Response struct { - Status string `json:"status"` - Msg string `json:"msg"` -} - -func (sr *swReceiver) httpHandler(rsp http.ResponseWriter, r *http.Request) { - rsp.Header().Set("Content-Type", "application/json") - b, err := io.ReadAll(r.Body) - if err != nil { - response := &Response{Status: failing, Msg: err.Error()} - ResponseWithJSON(rsp, response, http.StatusBadRequest) - return - } - var data []*v3.SegmentObject - if err = json.Unmarshal(b, &data); err != nil { - fmt.Printf("cannot Unmarshal skywalking segment collection, %v", err) - } - - for _, segment := range data { - err = consumeTraces(r.Context(), segment, sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - } -} - -func ResponseWithJSON(rsp http.ResponseWriter, response *Response, code int) { - rsp.WriteHeader(code) - _ = json.NewEncoder(rsp).Encode(response) -} diff --git a/receiver/skywalkingreceiver/trace_receiver_test.go b/receiver/skywalkingreceiver/skywalking_receiver_test.go similarity index 72% rename from receiver/skywalkingreceiver/trace_receiver_test.go rename to receiver/skywalkingreceiver/skywalking_receiver_test.go index fc338a851045..cfdd30c23a2e 100644 --- a/receiver/skywalkingreceiver/trace_receiver_test.go +++ b/receiver/skywalkingreceiver/skywalking_receiver_test.go @@ -15,17 +15,21 @@ package skywalkingreceiver import ( + "bytes" "context" + "encoding/json" "fmt" + "net/http" "strconv" "testing" "time" + "go.opentelemetry.io/collector/config/confighttp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/grpc" @@ -41,9 +45,10 @@ var ( func TestTraceSource(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - jr, err := newSkywalkingReceiver(&configuration{}, nil, set) - require.NoError(t, err) - require.NotNil(t, jr) + mockSwReceiver := newSkywalkingReceiver(&configuration{}, set) + err := mockSwReceiver.registerTraceConsumer(nil) + assert.Equal(t, err, component.ErrNilNextConsumer) + require.NotNil(t, mockSwReceiver) } func TestStartAndShutdown(t *testing.T) { @@ -58,9 +63,9 @@ func TestStartAndShutdown(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - sr, err := newSkywalkingReceiver(config, sink, set) + sr := newSkywalkingReceiver(config, set) + err := sr.registerTraceConsumer(sink) require.NoError(t, err) - require.NoError(t, sr.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, sr.Shutdown(context.Background())) }) @@ -75,20 +80,19 @@ func TestGRPCReception(t *testing.T) { set := receivertest.NewNopCreateSettings() set.ID = skywalkingReceiver - swReceiver, err := newSkywalkingReceiver(config, sink, set) + mockSwReceiver := newSkywalkingReceiver(config, set) + err := mockSwReceiver.registerTraceConsumer(sink) require.NoError(t, err) + require.NoError(t, mockSwReceiver.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, swReceiver.Start(context.Background(), componenttest.NewNopHost())) - - t.Cleanup(func() { require.NoError(t, swReceiver.Shutdown(context.Background())) }) - + t.Cleanup(func() { require.NoError(t, mockSwReceiver.Shutdown(context.Background())) }) conn, err := grpc.Dial(fmt.Sprintf("0.0.0.0:%d", config.CollectorGRPCPort), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) defer conn.Close() segmentCollection := &agent.SegmentCollection{ Segments: []*agent.SegmentObject{ - mockGrpcTraceSegment(1), + MockGrpcTraceSegment(1), }, } @@ -103,7 +107,40 @@ func TestGRPCReception(t *testing.T) { assert.NotNil(t, commands) } -func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { +func TestHttpReception(t *testing.T) { + config := &configuration{ + CollectorGRPCPort: 12800, // that's the only one used by this test + } + + sink := new(consumertest.TracesSink) + + set := receivertest.NewNopCreateSettings() + set.ID = skywalkingReceiver + mockSwReceiver := newSkywalkingReceiver(config, set) + err := mockSwReceiver.registerTraceConsumer(sink) + require.NoError(t, err) + require.NoError(t, mockSwReceiver.Start(context.Background(), componenttest.NewNopHost())) + + t.Cleanup(func() { require.NoError(t, mockSwReceiver.Shutdown(context.Background())) }) + + rb, err := mockSkywalkingHTTPTraceSegment() + require.NoError(t, err) + req, err := http.NewRequest("POST", "http://0.0.0.0:12800/v3/segments", bytes.NewBuffer(rb)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + response, err := client.Do(req) + // http client send trace data to otel/skywalkingreceiver + if err != nil { + t.Fatalf("cannot send data in sync mode: %v", err) + } + // verify + assert.NoError(t, err, "send skywalking segment successful.") + assert.NotNil(t, response) + +} + +func MockGrpcTraceSegment(sequence int) *agent.SegmentObject { seq := strconv.Itoa(sequence) return &agent.SegmentObject{ TraceId: "trace" + seq, @@ -187,3 +224,9 @@ func mockGrpcTraceSegment(sequence int) *agent.SegmentObject { }, } } + +func mockSkywalkingHTTPTraceSegment() ([]byte, error) { + segment := MockGrpcTraceSegment(1) + tb, err := json.Marshal(segment) + return tb, err +} diff --git a/receiver/skywalkingreceiver/tracing_report_service.go b/receiver/skywalkingreceiver/tracing_report_service.go deleted file mode 100644 index 05ee826bfda5..000000000000 --- a/receiver/skywalkingreceiver/tracing_report_service.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package skywalkingreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver" - -import ( - "context" - "errors" - "fmt" - "io" - - "go.opentelemetry.io/collector/consumer" - "google.golang.org/protobuf/proto" - common "skywalking.apache.org/repo/goapi/collect/common/v3" - agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3" -) - -type traceSegmentReportService struct { - sr *swReceiver - agent.UnimplementedTraceSegmentReportServiceServer -} - -func (s *traceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error { - for { - segmentObject, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return stream.SendAndClose(&common.Commands{}) - } - return err - } - - err = consumeTraces(stream.Context(), segmentObject, s.sr.nextConsumer) - if err != nil { - return stream.SendAndClose(&common.Commands{}) - } - } -} - -func (s *traceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) { - for _, segment := range segments.Segments { - marshaledSegment, err := proto.Marshal(segment) - if err != nil { - fmt.Printf("cannot marshal segemnt from sync, %v", err) - } - err = consumeTraces(ctx, segment, s.sr.nextConsumer) - if err != nil { - fmt.Printf("cannot consume traces, %v", err) - } - fmt.Printf("receivec data:%s", marshaledSegment) - } - return &common.Commands{}, nil -} - -func consumeTraces(ctx context.Context, segment *agent.SegmentObject, consumer consumer.Traces) error { - if segment == nil { - return nil - } - ptd := SkywalkingToTraces(segment) - return consumer.ConsumeTraces(ctx, ptd) -}