Skip to content

Commit

Permalink
Refactor otlphttp exporter for upcoming API chnages (open-telemetry#3289
Browse files Browse the repository at this point in the history
)

* Refactor otlphttp exporter for upcoming API chnages

* addressed review comments
  • Loading branch information
pavankrish123 authored and dashpole committed Jun 14, 2021
1 parent ba0592f commit 0a731c2
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 30 deletions.
3 changes: 3 additions & 0 deletions exporter/otlphttpexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func createTracesExporter(
cfg,
params.Logger,
oce.pushTraceData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down Expand Up @@ -119,6 +120,7 @@ func createMetricsExporter(
cfg,
params.Logger,
oce.pushMetricsData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down Expand Up @@ -146,6 +148,7 @@ func createLogsExporter(
cfg,
params.Logger,
oce.pushLogData,
exporterhelper.WithStart(oce.start),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
Expand Down
37 changes: 22 additions & 15 deletions exporter/otlphttpexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/confighttp"
Expand Down Expand Up @@ -62,9 +63,10 @@ func TestCreateTracesExporter(t *testing.T) {
endpoint := "http://" + testutil.GetAvailableLocalAddress(t)

tests := []struct {
name string
config Config
mustFail bool
name string
config Config
mustFailOnCreate bool
mustFailOnStart bool
}{
{
name: "NoEndpoint",
Expand All @@ -74,7 +76,7 @@ func TestCreateTracesExporter(t *testing.T) {
Endpoint: "",
},
},
mustFail: true,
mustFailOnCreate: true,
},
{
name: "UseSecure",
Expand Down Expand Up @@ -128,7 +130,8 @@ func TestCreateTracesExporter(t *testing.T) {
},
},
},
mustFail: true,
mustFailOnCreate: false,
mustFailOnStart: true,
},
}

Expand All @@ -138,18 +141,22 @@ func TestCreateTracesExporter(t *testing.T) {
creationParams := component.ExporterCreateParams{Logger: zap.NewNop()}
consumer, err := factory.CreateTracesExporter(context.Background(), creationParams, &tt.config)

if tt.mustFail {
if tt.mustFailOnCreate {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, consumer)
return
}
assert.NoError(t, err)
assert.NotNil(t, consumer)
err = consumer.Start(context.Background(), componenttest.NewNopHost())
if tt.mustFailOnStart {
assert.Error(t, err)
}

err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
err = consumer.Shutdown(context.Background())
if err != nil {
// Since the endpoint of OTLP exporter doesn't actually exist,
// exporter may already stop because it cannot connect.
assert.Equal(t, err.Error(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}
})
}
Expand Down
29 changes: 18 additions & 11 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand Down Expand Up @@ -65,24 +66,30 @@ func newExporter(cfg config.Exporter, logger *zap.Logger) (*exporter, error) {
}
}

client, err := oCfg.HTTPClientSettings.ToClient()
// client construction is deferred to start
return &exporter{
config: oCfg,
logger: logger,
}, nil
}

// start actually creates the HTTP client. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *exporter) start(_ context.Context, _ component.Host) error {
client, err := e.config.HTTPClientSettings.ToClient()
if err != nil {
return nil, err
return err
}

if oCfg.Compression != "" {
if strings.ToLower(oCfg.Compression) == configgrpc.CompressionGzip {
if e.config.Compression != "" {
if strings.ToLower(e.config.Compression) == configgrpc.CompressionGzip {
client.Transport = middleware.NewCompressRoundTripper(client.Transport)
} else {
return nil, fmt.Errorf("unsupported compression type %q", oCfg.Compression)
return fmt.Errorf("unsupported compression type %q", e.config.Compression)
}
}

return &exporter{
config: oCfg,
client: client,
logger: logger,
}, nil
e.client = client
return nil
}

func (e *exporter) pushTraceData(ctx context.Context, traces pdata.Traces) error {
Expand Down
18 changes: 14 additions & 4 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ func TestCompressionOptions(t *testing.T) {
factory := NewFactory()
cfg := createExporterConfig(test.baseURL, factory.CreateDefaultConfig())
cfg.Compression = test.compression
exp, err := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
exp, _ := factory.CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
err := exp.Start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})
if test.err {
assert.Error(t, err)
require.Error(t, err)
return
}
require.NoError(t, err)
startAndCleanup(t, exp)

td := testdata.GenerateTracesOneSpan()
assert.NoError(t, exp.ConsumeTraces(context.Background(), td))
Expand Down Expand Up @@ -451,6 +453,14 @@ func TestErrorResponses(t *testing.T) {
exp, err := createTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)

// start the exporter
err = exp.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, exp.Shutdown(context.Background()))
})

// generate traces
traces := pdata.NewTraces()
err = exp.ConsumeTraces(context.Background(), traces)
assert.Error(t, err)
Expand Down

0 comments on commit 0a731c2

Please sign in to comment.