diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index eb9d82e3da8..da6d45fe8ed 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -64,6 +64,7 @@ func createTracesExporter(ctx context.Context, params component.ExporterCreatePa exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithRetry(oCfg.RetrySettings), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown)) } @@ -81,5 +82,6 @@ func createMetricsExporter(ctx context.Context, params component.ExporterCreateP exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithRetry(oCfg.RetrySettings), exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithStart(oce.start), exporterhelper.WithShutdown(oce.shutdown)) } diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index e742ecbee06..123d67ac080 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configgrpc" @@ -40,9 +41,10 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateTracesExporter(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { - name string - config Config - mustFail bool + name string + config Config + mustFailOnCreate bool + mustFailOnStart bool }{ { name: "NoEndpoint", @@ -53,7 +55,7 @@ func TestCreateTracesExporter(t *testing.T) { }, NumWorkers: 3, }, - mustFail: true, + mustFailOnCreate: true, }, { name: "ZeroNumWorkers", @@ -67,7 +69,7 @@ func TestCreateTracesExporter(t *testing.T) { }, NumWorkers: 0, }, - mustFail: true, + mustFailOnCreate: true, }, { name: "UseSecure", @@ -132,7 +134,8 @@ func TestCreateTracesExporter(t *testing.T) { }, NumWorkers: 3, }, - mustFail: true, + mustFailOnCreate: false, + mustFailOnStart: true, }, { name: "CaCert", @@ -163,28 +166,35 @@ func TestCreateTracesExporter(t *testing.T) { }, NumWorkers: 3, }, - mustFail: true, + mustFailOnCreate: false, + mustFailOnStart: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { params := component.ExporterCreateParams{Logger: zap.NewNop()} - tReceiver, tErr := createTracesExporter(context.Background(), params, &tt.config) - checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail) - mReceiver, mErr := createMetricsExporter(context.Background(), params, &tt.config) - checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail) + tExporter, tErr := createTracesExporter(context.Background(), params, &tt.config) + checkErrorsAndStartAndShutdown(t, tExporter, tErr, tt.mustFailOnCreate, tt.mustFailOnStart) + mExporter, mErr := createMetricsExporter(context.Background(), params, &tt.config) + checkErrorsAndStartAndShutdown(t, mExporter, mErr, tt.mustFailOnCreate, tt.mustFailOnStart) }) } } -func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) { - if mustFail { +func checkErrorsAndStartAndShutdown(t *testing.T, exporter component.Exporter, err error, mustFailOnCreate, mustFailOnStart bool) { + if mustFailOnCreate { assert.NotNil(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, receiver) + return + } + assert.NoError(t, err) + assert.NotNil(t, exporter) - require.NoError(t, receiver.Shutdown(context.Background())) + sErr := exporter.Start(context.Background(), componenttest.NewNopHost()) + if mustFailOnStart { + require.Error(t, sErr) + return } + require.NoError(t, sErr) + require.NoError(t, exporter.Shutdown(context.Background())) } diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index faa92a8961e..6977d2b7d09 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/translator/internaldata" ) @@ -57,7 +58,7 @@ type ocExporter struct { metadata metadata.MD } -func newOcExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { +func newOcExporter(_ context.Context, cfg *Config) (*ocExporter, error) { if cfg.Endpoint == "" { return nil, errors.New("OpenCensus exporter cfg requires an Endpoint") } @@ -66,22 +67,46 @@ func newOcExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { return nil, errors.New("OpenCensus exporter cfg requires at least one worker") } - dialOpts, err := cfg.GRPCClientSettings.ToDialOptions() - if err != nil { - return nil, err + oce := &ocExporter{ + cfg: cfg, + metadata: metadata.New(cfg.GRPCClientSettings.Headers), } + return oce, nil +} +// start creates the gRPC client Connection +func (oce *ocExporter) start(ctx context.Context, _ component.Host) error { + dialOpts, err := oce.cfg.GRPCClientSettings.ToDialOptions() + if err != nil { + return err + } var clientConn *grpc.ClientConn - if clientConn, err = grpc.DialContext(ctx, cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil { - return nil, err + if clientConn, err = grpc.DialContext(ctx, oce.cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil { + return err } - oce := &ocExporter{ - cfg: cfg, - grpcClientConn: clientConn, - metadata: metadata.New(cfg.GRPCClientSettings.Headers), + oce.grpcClientConn = clientConn + + if oce.tracesClients != nil { + oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn) + // Try to create rpc clients now. + for i := 0; i < oce.cfg.NumWorkers; i++ { + // Populate the channel with NumWorkers nil RPCs to keep the number of workers + // constant in the channel. + oce.tracesClients <- nil + } } - return oce, nil + + if oce.metricsClients != nil { + oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn) + // Try to create rpc clients now. + for i := 0; i < oce.cfg.NumWorkers; i++ { + // Populate the channel with NumWorkers nil RPCs to keep the number of workers + // constant in the channel. + oce.metricsClients <- nil + } + } + return nil } func (oce *ocExporter) shutdown(context.Context) error { @@ -109,14 +134,7 @@ func newTracesExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { if err != nil { return nil, err } - oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn) - oce.tracesClients = make(chan *tracesClientWithCancel, cfg.NumWorkers) - // Try to create rpc clients now. - for i := 0; i < cfg.NumWorkers; i++ { - // Populate the channel with NumWorkers nil RPCs to keep the number of workers - // constant in the channel. - oce.tracesClients <- nil - } + oce.tracesClients = make(chan *tracesClientWithCancel, oce.cfg.NumWorkers) return oce, nil } @@ -125,14 +143,7 @@ func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { if err != nil { return nil, err } - oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn) - oce.metricsClients = make(chan *metricsClientWithCancel, cfg.NumWorkers) - // Try to create rpc clients now. - for i := 0; i < cfg.NumWorkers; i++ { - // Populate the channel with NumWorkers nil RPCs to keep the number of workers - // constant in the channel. - oce.metricsClients <- nil - } + oce.metricsClients = make(chan *metricsClientWithCancel, oce.cfg.NumWorkers) return oce, nil }