Skip to content

Commit

Permalink
Refactored OpenCensus exporter to move connection into start for upco…
Browse files Browse the repository at this point in the history
…ming Auth changes. (#3339)

Updated OpenCensus for upcoming client auth extension changes

Link to tracking Issue:
#3287 #3282
  • Loading branch information
pavankrish123 authored May 31, 2021
1 parent 5ae3213 commit d1016eb
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 44 deletions.
2 changes: 2 additions & 0 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func createTracesExporter(ctx context.Context, params component.ExporterCreatePa
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}

Expand All @@ -81,5 +82,6 @@ func createMetricsExporter(ctx context.Context, params component.ExporterCreateP
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown))
}
44 changes: 27 additions & 17 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -40,9 +41,10 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateTracesExporter(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)
tests := []struct {
name string
config Config
mustFail bool
name string
config Config
mustFailOnCreate bool
mustFailOnStart bool
}{
{
name: "NoEndpoint",
Expand All @@ -53,7 +55,7 @@ func TestCreateTracesExporter(t *testing.T) {
},
NumWorkers: 3,
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "ZeroNumWorkers",
Expand All @@ -67,7 +69,7 @@ func TestCreateTracesExporter(t *testing.T) {
},
NumWorkers: 0,
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "UseSecure",
Expand Down Expand Up @@ -132,7 +134,8 @@ func TestCreateTracesExporter(t *testing.T) {
},
NumWorkers: 3,
},
mustFail: true,
mustFailOnCreate: false,
mustFailOnStart: true,
},
{
name: "CaCert",
Expand Down Expand Up @@ -163,28 +166,35 @@ func TestCreateTracesExporter(t *testing.T) {
},
NumWorkers: 3,
},
mustFail: true,
mustFailOnCreate: false,
mustFailOnStart: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params := component.ExporterCreateParams{Logger: zap.NewNop()}
tReceiver, tErr := createTracesExporter(context.Background(), params, &tt.config)
checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail)
mReceiver, mErr := createMetricsExporter(context.Background(), params, &tt.config)
checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail)
tExporter, tErr := createTracesExporter(context.Background(), params, &tt.config)
checkErrorsAndStartAndShutdown(t, tExporter, tErr, tt.mustFailOnCreate, tt.mustFailOnStart)
mExporter, mErr := createMetricsExporter(context.Background(), params, &tt.config)
checkErrorsAndStartAndShutdown(t, mExporter, mErr, tt.mustFailOnCreate, tt.mustFailOnStart)
})
}
}

func checkErrorsAndShutdown(t *testing.T, receiver component.Receiver, err error, mustFail bool) {
if mustFail {
func checkErrorsAndStartAndShutdown(t *testing.T, exporter component.Exporter, err error, mustFailOnCreate, mustFailOnStart bool) {
if mustFailOnCreate {
assert.NotNil(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, receiver)
return
}
assert.NoError(t, err)
assert.NotNil(t, exporter)

require.NoError(t, receiver.Shutdown(context.Background()))
sErr := exporter.Start(context.Background(), componenttest.NewNopHost())
if mustFailOnStart {
require.Error(t, sErr)
return
}
require.NoError(t, sErr)
require.NoError(t, exporter.Shutdown(context.Background()))
}
65 changes: 38 additions & 27 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/internaldata"
)
Expand Down Expand Up @@ -57,7 +58,7 @@ type ocExporter struct {
metadata metadata.MD
}

func newOcExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
func newOcExporter(_ context.Context, cfg *Config) (*ocExporter, error) {
if cfg.Endpoint == "" {
return nil, errors.New("OpenCensus exporter cfg requires an Endpoint")
}
Expand All @@ -66,22 +67,46 @@ func newOcExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
return nil, errors.New("OpenCensus exporter cfg requires at least one worker")
}

dialOpts, err := cfg.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
oce := &ocExporter{
cfg: cfg,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
}
return oce, nil
}

// start creates the gRPC client Connection
func (oce *ocExporter) start(ctx context.Context, _ component.Host) error {
dialOpts, err := oce.cfg.GRPCClientSettings.ToDialOptions()
if err != nil {
return err
}
var clientConn *grpc.ClientConn
if clientConn, err = grpc.DialContext(ctx, cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil {
return nil, err
if clientConn, err = grpc.DialContext(ctx, oce.cfg.GRPCClientSettings.Endpoint, dialOpts...); err != nil {
return err
}

oce := &ocExporter{
cfg: cfg,
grpcClientConn: clientConn,
metadata: metadata.New(cfg.GRPCClientSettings.Headers),
oce.grpcClientConn = clientConn

if oce.tracesClients != nil {
oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn)
// Try to create rpc clients now.
for i := 0; i < oce.cfg.NumWorkers; i++ {
// Populate the channel with NumWorkers nil RPCs to keep the number of workers
// constant in the channel.
oce.tracesClients <- nil
}
}
return oce, nil

if oce.metricsClients != nil {
oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn)
// Try to create rpc clients now.
for i := 0; i < oce.cfg.NumWorkers; i++ {
// Populate the channel with NumWorkers nil RPCs to keep the number of workers
// constant in the channel.
oce.metricsClients <- nil
}
}
return nil
}

func (oce *ocExporter) shutdown(context.Context) error {
Expand Down Expand Up @@ -109,14 +134,7 @@ func newTracesExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
if err != nil {
return nil, err
}
oce.traceSvcClient = agenttracepb.NewTraceServiceClient(oce.grpcClientConn)
oce.tracesClients = make(chan *tracesClientWithCancel, cfg.NumWorkers)
// Try to create rpc clients now.
for i := 0; i < cfg.NumWorkers; i++ {
// Populate the channel with NumWorkers nil RPCs to keep the number of workers
// constant in the channel.
oce.tracesClients <- nil
}
oce.tracesClients = make(chan *tracesClientWithCancel, oce.cfg.NumWorkers)
return oce, nil
}

Expand All @@ -125,14 +143,7 @@ func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
if err != nil {
return nil, err
}
oce.metricsSvcClient = agentmetricspb.NewMetricsServiceClient(oce.grpcClientConn)
oce.metricsClients = make(chan *metricsClientWithCancel, cfg.NumWorkers)
// Try to create rpc clients now.
for i := 0; i < cfg.NumWorkers; i++ {
// Populate the channel with NumWorkers nil RPCs to keep the number of workers
// constant in the channel.
oce.metricsClients <- nil
}
oce.metricsClients = make(chan *metricsClientWithCancel, oce.cfg.NumWorkers)
return oce, nil
}

Expand Down

0 comments on commit d1016eb

Please sign in to comment.