Skip to content

Commit

Permalink
Use default configs for testbed receivers
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 26, 2020
1 parent 41dc4db commit a6ca6b8
Showing 1 changed file with 64 additions and 39 deletions.
103 changes: 64 additions & 39 deletions testbed/testbed/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
Expand All @@ -36,7 +37,7 @@ import (
// an exporter.
type DataReceiver interface {
Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error
Stop()
Stop() error

// Generate a config string to place in exporter part of collector config
// so that it can send data to this receiver.
Expand All @@ -59,7 +60,7 @@ func (mb *DataReceiverBase) ReportFatalError(err error) {
}

// GetFactory of the specified kind. Returns the factory for a component type.
func (mb *DataReceiverBase) GetFactory(kind component.Kind, componentType configmodels.Type) component.Factory {
func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ configmodels.Type) component.Factory {
return nil
}

Expand All @@ -75,7 +76,8 @@ func (mb *DataReceiverBase) GetExporters() map[configmodels.DataType]map[configm
// OCDataReceiver implements OpenCensus format receiver.
type OCDataReceiver struct {
DataReceiverBase
receiver *opencensusreceiver.Receiver
traceReceiver component.TraceReceiver
metricsReceiver component.MetricsReceiver
}

// Ensure OCDataReceiver implements DataReceiver.
Expand All @@ -90,18 +92,31 @@ func NewOCDataReceiver(port int) *OCDataReceiver {
}

func (or *OCDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
addr := fmt.Sprintf("localhost:%d", or.Port)
factory := opencensusreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*opencensusreceiver.Config)
cfg.SetName(or.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port)
var err error
or.receiver, err = opencensusreceiver.New("opencensus", "tcp", addr, tc, mc)
if err != nil {
if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc); err != nil {
return err
}

return or.receiver.Start(context.Background(), or)
if or.metricsReceiver, err = factory.CreateMetricsReceiver(zap.NewNop(), cfg, mc); err != nil {
return err
}
if err = or.traceReceiver.Start(context.Background(), or); err != nil {
return err
}
return or.metricsReceiver.Start(context.Background(), or)
}

func (or *OCDataReceiver) Stop() {
or.receiver.Shutdown(context.Background())
func (or *OCDataReceiver) Stop() error {
if err := or.traceReceiver.Shutdown(context.Background()); err != nil {
return err
}
if err := or.metricsReceiver.Shutdown(context.Background()); err != nil {
return err
}
return nil
}

func (or *OCDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -128,26 +143,25 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver {
return &JaegerDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
jaegerCfg := jaegerreceiver.Configuration{
CollectorGRPCPort: jr.Port,
func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := jaegerreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config)
cfg.SetName(jr.ProtocolName())
cfg.Protocols["grpc"] = &configprotocol.ProtocolServerSettings{
Endpoint: fmt.Sprintf("localhost:%d", jr.Port),
}
var err error
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, params)
jr.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc)
if err != nil {
return err
}

return jr.receiver.Start(context.Background(), jr)
}

func (jr *JaegerDataReceiver) Stop() {
if jr.receiver != nil {
if err := jr.receiver.Shutdown(context.Background()); err != nil {
log.Printf("Cannot stop Jaeger receiver: %s", err.Error())
}
}
func (jr *JaegerDataReceiver) Stop() error {
return jr.receiver.Shutdown(context.Background())
}

func (jr *JaegerDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -165,7 +179,8 @@ func (jr *JaegerDataReceiver) ProtocolName() string {
// OTLPDataReceiver implements OTLP format receiver.
type OTLPDataReceiver struct {
DataReceiverBase
receiver *otlpreceiver.Receiver
traceReceiver component.TraceReceiver
metricsReceiver component.MetricsReceiver
}

// Ensure OTLPDataReceiver implements DataReceiver.
Expand All @@ -180,18 +195,32 @@ func NewOTLPDataReceiver(port int) *OTLPDataReceiver {
}

func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
addr := fmt.Sprintf("localhost:%d", or.Port)
factory := otlpreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config)
cfg.SetName(or.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port)
var err error
or.receiver, err = otlpreceiver.New("otlp", "tcp", addr, tc, mc)
if err != nil {
params := component.ReceiverCreateParams{Logger: zap.NewNop()}
if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc); err != nil {
return err
}

return or.receiver.Start(context.Background(), or)
if or.metricsReceiver, err = factory.CreateMetricsReceiver(context.Background(), params, cfg, mc); err != nil {
return err
}
if err = or.traceReceiver.Start(context.Background(), or); err != nil {
return err
}
return or.metricsReceiver.Start(context.Background(), or)
}

func (or *OTLPDataReceiver) Stop() {
or.receiver.Shutdown(context.Background())
func (or *OTLPDataReceiver) Stop() error {
if err := or.traceReceiver.Shutdown(context.Background()); err != nil {
return err
}
if err := or.metricsReceiver.Shutdown(context.Background()); err != nil {
return err
}
return nil
}

func (or *OTLPDataReceiver) GenConfigYAMLStr() string {
Expand All @@ -209,7 +238,7 @@ func (or *OTLPDataReceiver) ProtocolName() string {
// ZipkinDataReceiver implements Zipkin format receiver.
type ZipkinDataReceiver struct {
DataReceiverBase
receiver *zipkinreceiver.ZipkinReceiver
receiver component.TraceReceiver
}

const DefaultZipkinAddressPort = 9411
Expand All @@ -218,13 +247,13 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver {
return &ZipkinDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}}
}

func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error {
var err error
func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error {
factory := zipkinreceiver.Factory{}
cfg := factory.CreateDefaultConfig().(*zipkinreceiver.Config)
cfg.NameVal = "zipkin"
cfg.SetName(zr.ProtocolName())
cfg.Endpoint = fmt.Sprintf("localhost:%d", zr.Port)
zr.receiver, err = zipkinreceiver.New(cfg, tc)
var err error
zr.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc)

if err != nil {
return err
Expand All @@ -233,12 +262,8 @@ func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume
return zr.receiver.Start(context.Background(), zr)
}

func (zr *ZipkinDataReceiver) Stop() {
if zr.receiver != nil {
if err := zr.receiver.Shutdown(context.Background()); err != nil {
log.Printf("Cannot stop Zipkin receiver: %s", err.Error())
}
}
func (zr *ZipkinDataReceiver) Stop() error {
return zr.receiver.Shutdown(context.Background())
}

func (zr *ZipkinDataReceiver) GenConfigYAMLStr() string {
Expand Down

0 comments on commit a6ca6b8

Please sign in to comment.