Skip to content

Commit

Permalink
Use default configs for testbed receivers
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 26, 2020
1 parent 2fa73ea commit d3caa77
Showing 1 changed file with 38 additions and 35 deletions.
73 changes: 38 additions & 35 deletions testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
Expand All @@ -36,7 +37,7 @@ import (
// an exporter.
type DataReceiver interface {
Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error
Stop()
Stop() error

// Generate a config string to place in exporter part of collector config
// so that it can send data to this receiver.
Expand All @@ -59,7 +60,7 @@ func (mb *DataReceiverBase) ReportFatalError(err error) {
}

// GetFactory of the specified kind. Returns the factory for a component type.
func (mb *DataReceiverBase) GetFactory(kind component.Kind, componentType configmodels.Type) component.Factory {
func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ configmodels.Type) component.Factory {
return nil
}

Expand All @@ -75,7 +76,7 @@ func (mb *DataReceiverBase) GetExporters() map[configmodels.DataType]map[configm
// OCDataReceiver implements OpenCensus format receiver.
type OCDataReceiver struct {
DataReceiverBase
receiver *opencensusreceiver.Receiver
receiver component.Receiver
}

// Ensure OCDataReceiver implements DataReceiver.
Expand All @@ -89,19 +90,22 @@ func NewOCDataReceiver(port int) *OCDataReceiver {
return &OCDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (or *OCDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
addr := fmt.Sprintf("localhost:%d", or.Port)
func (or *OCDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := opencensusreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*opencensusreceiver.Config)
cfg.SetName(or.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port)
var err error
or.receiver, err = opencensusreceiver.New("opencensus", "tcp", addr, tc, mc)
or.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc)
if err != nil {
return err
}

return or.receiver.Start(context.Background(), or)
}

func (or *OCDataReceiver) Stop() {
or.receiver.Shutdown(context.Background())
func (or *OCDataReceiver) Stop() error {
return or.receiver.Shutdown(context.Background())
}

func (or *OCDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -128,26 +132,25 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver {
return &JaegerDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
jaegerCfg := jaegerreceiver.Configuration{
CollectorGRPCPort: jr.Port,
func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := jaegerreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config)
cfg.SetName(jr.ProtocolName())
cfg.Protocols["grpc"] = &configprotocol.ProtocolServerSettings{
Endpoint: fmt.Sprintf("localhost:%d", jr.Port),
}
var err error
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, params)
jr.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc)
if err != nil {
return err
}

return jr.receiver.Start(context.Background(), jr)
}

func (jr *JaegerDataReceiver) Stop() {
if jr.receiver != nil {
if err := jr.receiver.Shutdown(context.Background()); err != nil {
log.Printf("Cannot stop Jaeger receiver: %s", err.Error())
}
}
func (jr *JaegerDataReceiver) Stop() error {
return jr.receiver.Shutdown(context.Background())
}

func (jr *JaegerDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -165,7 +168,7 @@ func (jr *JaegerDataReceiver) ProtocolName() string {
// OTLPDataReceiver implements OTLP format receiver.
type OTLPDataReceiver struct {
DataReceiverBase
receiver *otlpreceiver.Receiver
receiver component.Receiver
}

// Ensure OTLPDataReceiver implements DataReceiver.
Expand All @@ -179,19 +182,23 @@ func NewOTLPDataReceiver(port int) *OTLPDataReceiver {
return &OTLPDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
addr := fmt.Sprintf("localhost:%d", or.Port)
func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := otlpreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config)
cfg.SetName(or.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port)
var err error
or.receiver, err = otlpreceiver.New("otlp", "tcp", addr, tc, mc)
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
or.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc)
if err != nil {
return err
}

return or.receiver.Start(context.Background(), or)
}

func (or *OTLPDataReceiver) Stop() {
or.receiver.Shutdown(context.Background())
func (or *OTLPDataReceiver) Stop() error {
return or.receiver.Shutdown(context.Background())
}

func (or *OTLPDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -209,7 +216,7 @@ func (or *OTLPDataReceiver) ProtocolName() string {
// ZipkinDataReceiver implements Zipkin format receiver.
type ZipkinDataReceiver struct {
DataReceiverBase
receiver *zipkinreceiver.ZipkinReceiver
receiver component.TraceReceiver
}

const DefaultZipkinAddressPort = 9411
Expand All @@ -218,13 +225,13 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver {
return &ZipkinDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
var err error
func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := zipkinreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*zipkinreceiver.Config)
cfg.NameVal = "zipkin"
cfg.SetName(zr.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", zr.Port)
zr.receiver, err = zipkinreceiver.New(cfg, tc)
var err error
zr.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc)

if err != nil {
return err
Expand All @@ -233,12 +240,8 @@ func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume
return zr.receiver.Start(context.Background(), zr)
}

func (zr *ZipkinDataReceiver) Stop() {
if zr.receiver != nil {
if err := zr.receiver.Shutdown(context.Background()); err != nil {
log.Printf("Cannot stop Zipkin receiver: %s", err.Error())
}
}
func (zr *ZipkinDataReceiver) Stop() error {
return zr.receiver.Shutdown(context.Background())
}

func (zr *ZipkinDataReceiver) GenConfigYAMLStr() string {
Expand Down

0 comments on commit d3caa77

Please sign in to comment.