diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index dd0136354e9..d84a4fcdfa9 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/receiver/jaegerreceiver" "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -36,7 +37,7 @@ import ( // an exporter. type DataReceiver interface { Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error - Stop() + Stop() error // Generate a config string to place in exporter part of collector config // so that it can send data to this receiver. @@ -59,7 +60,7 @@ func (mb *DataReceiverBase) ReportFatalError(err error) { } // GetFactory of the specified kind. Returns the factory for a component type. -func (mb *DataReceiverBase) GetFactory(kind component.Kind, componentType configmodels.Type) component.Factory { +func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ configmodels.Type) component.Factory { return nil } @@ -75,7 +76,7 @@ func (mb *DataReceiverBase) GetExporters() map[configmodels.DataType]map[configm // OCDataReceiver implements OpenCensus format receiver. type OCDataReceiver struct { DataReceiverBase - receiver *opencensusreceiver.Receiver + receiver component.Receiver } // Ensure OCDataReceiver implements DataReceiver. @@ -89,10 +90,13 @@ func NewOCDataReceiver(port int) *OCDataReceiver { return &OCDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (or *OCDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - addr := fmt.Sprintf("localhost:%d", or.Port) +func (or *OCDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { + factory := opencensusreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*opencensusreceiver.Config) + cfg.SetName("opencensus") + cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) var err error - or.receiver, err = opencensusreceiver.New("opencensus", "tcp", addr, tc, mc) + or.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc) if err != nil { return err } @@ -100,8 +104,8 @@ func (or *OCDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) e return or.receiver.Start(context.Background(), or) } -func (or *OCDataReceiver) Stop() { - or.receiver.Shutdown(context.Background()) +func (or *OCDataReceiver) Stop() error { + return or.receiver.Shutdown(context.Background()) } func (or *OCDataReceiver) GenConfigYAMLStr() string { @@ -128,13 +132,16 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver { return &JaegerDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - jaegerCfg := jaegerreceiver.Configuration{ - CollectorGRPCPort: jr.Port, +func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { + factory := jaegerreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config) + cfg.SetName("jaeger") + cfg.Protocols["grpc"] = &configprotocol.ProtocolServerSettings{ + Endpoint: fmt.Sprintf("localhost:%d", jr.Port), } var err error params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, params) + jr.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc) if err != nil { return err } @@ -142,12 +149,8 @@ func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume return jr.receiver.Start(context.Background(), jr) } -func (jr *JaegerDataReceiver) Stop() { - if jr.receiver != nil { - if err := jr.receiver.Shutdown(context.Background()); err != nil { - log.Printf("Cannot stop Jaeger receiver: %s", err.Error()) - } - } +func (jr *JaegerDataReceiver) Stop() error { + return jr.receiver.Shutdown(context.Background()) } func (jr *JaegerDataReceiver) GenConfigYAMLStr() string { @@ -165,7 +168,7 @@ func (jr *JaegerDataReceiver) ProtocolName() string { // OTLPDataReceiver implements OTLP format receiver. type OTLPDataReceiver struct { DataReceiverBase - receiver *otlpreceiver.Receiver + receiver component.Receiver } // Ensure OTLPDataReceiver implements DataReceiver. @@ -179,10 +182,14 @@ func NewOTLPDataReceiver(port int) *OTLPDataReceiver { return &OTLPDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - addr := fmt.Sprintf("localhost:%d", or.Port) +func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { + factory := otlpreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) + cfg.SetName("otlp") + cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) var err error - or.receiver, err = otlpreceiver.New("otlp", "tcp", addr, tc, mc) + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + or.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc) if err != nil { return err } @@ -190,8 +197,8 @@ func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) return or.receiver.Start(context.Background(), or) } -func (or *OTLPDataReceiver) Stop() { - or.receiver.Shutdown(context.Background()) +func (or *OTLPDataReceiver) Stop() error { + return or.receiver.Shutdown(context.Background()) } func (or *OTLPDataReceiver) GenConfigYAMLStr() string { @@ -209,7 +216,7 @@ func (or *OTLPDataReceiver) ProtocolName() string { // ZipkinDataReceiver implements Zipkin format receiver. type ZipkinDataReceiver struct { DataReceiverBase - receiver *zipkinreceiver.ZipkinReceiver + receiver component.TraceReceiver } const DefaultZipkinAddressPort = 9411 @@ -218,13 +225,13 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver { return &ZipkinDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - var err error +func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { factory := zipkinreceiver.Factory{} cfg := factory.CreateDefaultConfig().(*zipkinreceiver.Config) - cfg.NameVal = "zipkin" + cfg.SetName("zipkin") cfg.Endpoint = fmt.Sprintf("localhost:%d", zr.Port) - zr.receiver, err = zipkinreceiver.New(cfg, tc) + var err error + zr.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc) if err != nil { return err @@ -233,12 +240,8 @@ func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume return zr.receiver.Start(context.Background(), zr) } -func (zr *ZipkinDataReceiver) Stop() { - if zr.receiver != nil { - if err := zr.receiver.Shutdown(context.Background()); err != nil { - log.Printf("Cannot stop Zipkin receiver: %s", err.Error()) - } - } +func (zr *ZipkinDataReceiver) Stop() error { + return zr.receiver.Shutdown(context.Background()) } func (zr *ZipkinDataReceiver) GenConfigYAMLStr() string {