Skip to content

Commit

Permalink
ExporterHelper: Add ability to configure start function and remove du…
Browse files Browse the repository at this point in the history
…plicate code (#1337)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jul 14, 2020
1 parent 08e53d4 commit 68c4e05
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 127 deletions.
60 changes: 47 additions & 13 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,70 @@ import (
"context"

"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
)

var (
okStatus = trace.Status{Code: trace.StatusCodeOK}
)

// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

// ExporterOptions contains options concerning how an Exporter is configured.
type ExporterOptions struct {
shutdown Shutdown
}

// ExporterOption apply changes to ExporterOptions.
type ExporterOption func(*ExporterOptions)
// ExporterOption apply changes to internalOptions.
type ExporterOption func(*baseExporter)

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) ExporterOption {
return func(o *ExporterOptions) {
return func(o *baseExporter) {
o.shutdown = shutdown
}
}

// Construct the ExporterOptions from multiple ExporterOption.
func newExporterOptions(options ...ExporterOption) ExporterOptions {
var opts ExporterOptions
// WithStart overrides the default Start function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) ExporterOption {
return func(o *baseExporter) {
o.start = start
}
}

// internalOptions contains internalOptions concerning how an Exporter is configured.
type baseExporter struct {
exporterFullName string
start Start
shutdown Shutdown
}

// Construct the internalOptions from multiple ExporterOption.
func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter {
be := baseExporter{
exporterFullName: exporterFullName,
}

for _, op := range options {
op(&opts)
op(&be)
}

return be
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
if be.start != nil {
return be.start(ctx, host)
}
return nil
}

// Shutdown stops the exporter and is invoked during shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
if be.shutdown != nil {
return be.shutdown(ctx)
}
return opts
return nil
}
19 changes: 19 additions & 0 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,37 @@
package exporterhelper

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)

func TestErrorToStatus(t *testing.T) {
require.Equal(t, okStatus, errToStatus(nil))
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
}

func TestBaseExporter(t *testing.T) {
be := newBaseExporter("test")
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}

func TestBaseExporterWithOptions(t *testing.T) {
be := newBaseExporter(
"test",
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }))
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
}

func errToStatus(err error) trace.Status {
if err != nil {
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
Expand Down
26 changes: 4 additions & 22 deletions exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@ import (
type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error)

type logsExporter struct {
exporterFullName string
pushLogsData PushLogsData
shutdown Shutdown
}

func (me *logsExporter) Start(ctx context.Context, host component.Host) error {
return nil
baseExporter
pushLogsData PushLogsData
}

func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error {
Expand All @@ -43,11 +38,6 @@ func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error {
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
func (me *logsExporter) Shutdown(ctx context.Context) error {
return me.shutdown(ctx)
}

// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
// TODO: Add support for retries.
func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
Expand All @@ -59,19 +49,11 @@ func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, op
return nil, errNilPushLogsData
}

opts := newExporterOptions(options...)

pushLogsData = pushLogsWithObservability(pushLogsData, config.Name())

// The default shutdown method always returns nil.
if opts.shutdown == nil {
opts.shutdown = func(context.Context) error { return nil }
}

return &logsExporter{
exporterFullName: config.Name(),
pushLogsData: pushLogsData,
shutdown: opts.shutdown,
baseExporter: newBaseExporter(config.Name(), options...),
pushLogsData: pushLogsData,
}, nil
}

Expand Down
56 changes: 10 additions & 46 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ import (
type PushMetricsDataOld func(ctx context.Context, td consumerdata.MetricsData) (droppedTimeSeries int, err error)

type metricsExporterOld struct {
exporterFullName string
pushMetricsData PushMetricsDataOld
shutdown Shutdown
}

func (me *metricsExporterOld) Start(ctx context.Context, host component.Host) error {
return nil
baseExporter
pushMetricsData PushMetricsDataOld
}

func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
Expand All @@ -45,13 +40,8 @@ func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consume
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
func (me *metricsExporterOld) Shutdown(ctx context.Context) error {
return me.shutdown(ctx)
}

// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
// If no options are passed it just adds the exporter format as a tag in the Context.
// If no internalOptions are passed it just adds the exporter format as a tag in the Context.
// TODO: Add support for retries.
func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
if config == nil {
Expand All @@ -62,19 +52,11 @@ func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMet
return nil, errNilPushMetricsData
}

opts := newExporterOptions(options...)

pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, config.Name())

// The default shutdown method always returns nil.
if opts.shutdown == nil {
opts.shutdown = func(context.Context) error { return nil }
}

return &metricsExporterOld{
exporterFullName: config.Name(),
pushMetricsData: pushMetricsData,
shutdown: opts.shutdown,
baseExporter: newBaseExporter(config.Name(), options...),
pushMetricsData: pushMetricsData,
}, nil
}

Expand Down Expand Up @@ -107,13 +89,8 @@ func NumTimeSeries(md consumerdata.MetricsData) int {
type PushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)

type metricsExporter struct {
exporterFullName string
pushMetricsData PushMetricsData
shutdown Shutdown
}

func (me *metricsExporter) Start(ctx context.Context, host component.Host) error {
return nil
baseExporter
pushMetricsData PushMetricsData
}

func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
Expand All @@ -122,13 +99,8 @@ func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics)
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
func (me *metricsExporter) Shutdown(ctx context.Context) error {
return me.shutdown(ctx)
}

// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
// If no options are passed it just adds the exporter format as a tag in the Context.
// If no internalOptions are passed it just adds the exporter format as a tag in the Context.
// TODO: Add support for retries.
func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
if config == nil {
Expand All @@ -139,19 +111,11 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric
return nil, errNilPushMetricsData
}

opts := newExporterOptions(options...)

pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name())

// The default shutdown method always returns nil.
if opts.shutdown == nil {
opts.shutdown = func(context.Context) error { return nil }
}

return &metricsExporter{
exporterFullName: config.Name(),
pushMetricsData: pushMetricsData,
shutdown: opts.shutdown,
baseExporter: newBaseExporter(config.Name(), options...),
pushMetricsData: pushMetricsData,
}, nil
}

Expand Down
56 changes: 10 additions & 46 deletions exporter/exporterhelper/tracehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,10 @@ type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (dr
// returns the number of dropped spans.
type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error)

// traceExporterOld implements the exporter with additional helper options.
// traceExporterOld implements the exporter with additional helper internalOptions.
type traceExporterOld struct {
exporterFullName string
dataPusher traceDataPusherOld
shutdown Shutdown
}

func (te *traceExporterOld) Start(_ context.Context, _ component.Host) error {
return nil
baseExporter
dataPusher traceDataPusherOld
}

func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
Expand All @@ -49,13 +44,8 @@ func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdat
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
func (te *traceExporterOld) Shutdown(ctx context.Context) error {
return te.shutdown(ctx)
}

// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every
// request with a Span. If no options are passed it just adds the exporter format as a
// request with a Span. If no internalOptions are passed it just adds the exporter format as a
// tag in the Context.
func NewTraceExporterOld(
config configmodels.Exporter,
Expand All @@ -71,19 +61,11 @@ func NewTraceExporterOld(
return nil, errNilPushTraceData
}

opts := newExporterOptions(options...)

dataPusher = dataPusher.withObservability(config.Name())

// The default shutdown function does nothing.
if opts.shutdown == nil {
opts.shutdown = func(context.Context) error { return nil }
}

return &traceExporterOld{
exporterFullName: config.Name(),
dataPusher: dataPusher,
shutdown: opts.shutdown,
baseExporter: newBaseExporter(config.Name(), options...),
dataPusher: dataPusher,
}, nil
}

Expand All @@ -105,13 +87,8 @@ func (p traceDataPusherOld) withObservability(exporterName string) traceDataPush
}

type traceExporter struct {
exporterFullName string
dataPusher traceDataPusher
shutdown Shutdown
}

func (te *traceExporter) Start(_ context.Context, _ component.Host) error {
return nil
baseExporter
dataPusher traceDataPusher
}

func (te *traceExporter) ConsumeTraces(
Expand All @@ -123,11 +100,6 @@ func (te *traceExporter) ConsumeTraces(
return err
}

// Shutdown stops the exporter and is invoked during shutdown.
func (te *traceExporter) Shutdown(ctx context.Context) error {
return te.shutdown(ctx)
}

// NewTraceExporter creates a TraceExporter that can record metrics and can wrap
// every request with a Span.
func NewTraceExporter(
Expand All @@ -144,19 +116,11 @@ func NewTraceExporter(
return nil, errNilPushTraceData
}

opts := newExporterOptions(options...)

dataPusher = dataPusher.withObservability(config.Name())

// The default shutdown function does nothing.
if opts.shutdown == nil {
opts.shutdown = func(context.Context) error { return nil }
}

return &traceExporter{
exporterFullName: config.Name(),
dataPusher: dataPusher,
shutdown: opts.shutdown,
baseExporter: newBaseExporter(config.Name(), options...),
dataPusher: dataPusher,
}, nil
}

Expand Down

0 comments on commit 68c4e05

Please sign in to comment.