Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] [exporterhelper] Remove duplicated code between exporter helpers #9603

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
if o.marshaler == nil || o.unmarshaler == nil {
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
Expand Down Expand Up @@ -114,6 +114,9 @@ func WithQueue(config QueueSettings) Option {
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) {
if o.marshaler != nil || o.unmarshaler != nil {
panic("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
}
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
Expand All @@ -135,15 +138,30 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// withMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
return func(o *baseExporter) {
o.marshaler = marshaler
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option {
return func(o *baseExporter) {
o.unmarshaler = unmarshaler
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.StartFunc
component.ShutdownFunc

requestExporter bool
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType

set exporter.CreateSettings
obsrep *ObsReport
Expand All @@ -162,20 +180,14 @@ type baseExporter struct {
consumerOptions []consumer.Option
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}

be := &baseExporter{
requestExporter: requestExporter,
marshaler: marshaler,
unmarshaler: unmarshaler,
signal: signal,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
Expand Down
27 changes: 15 additions & 12 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
)

Expand All @@ -36,11 +37,7 @@ func newNoopObsrepSender(*ObsReport) requestSender {
}

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, defaultType, true, nil, nil, newNoopObsrepSender)
be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -49,7 +46,7 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender,
defaultSettings, defaultType, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
Expand All @@ -68,15 +65,22 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
}
}

func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
func TestQueueOptionsWithRequestExporter(t *testing.T) {
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.Nil(t, err)
require.True(t, bs.requestExporter)
require.Nil(t, bs.marshaler)
require.Nil(t, bs.unmarshaler)
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings()))
})
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
})
}

func TestBaseExporterLogging(t *testing.T) {
Expand All @@ -85,9 +89,8 @@ func TestBaseExporterLogging(t *testing.T) {
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := newBaseExporter(set, defaultType, true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg))
require.Nil(t, err)
require.True(t, bs.requestExporter)
sendErr := bs.send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

Expand Down
38 changes: 11 additions & 27 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type logsExporter struct {

// NewLogsExporter creates an exporter.Logs that records observability metrics and wraps every request with a Span.
func NewLogsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeLogsFunc,
Expand All @@ -79,41 +79,25 @@ func NewLogsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushLogsData
}

be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler,
newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &logsExporter{
baseExporter: be,
Logs: lc,
}, err
logsOpts := []Option{withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher))}
return NewLogsRequestExporter(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...)
}

// RequestFromLogsFunc converts plog.Logs data into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromLogsFunc func(context.Context, plog.Logs) (Request, error)

// requestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request.
func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc {
return func(_ context.Context, ld plog.Logs) (Request, error) {
return newLogsRequest(ld, pusher), nil
}
}

// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +115,7 @@ func NewLogsRequestExporter(
return nil, errNilLogsConverter
}

be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeLogs, newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
38 changes: 11 additions & 27 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type metricsExporter struct {

// NewMetricsExporter creates an exporter.Metrics that records observability metrics and wraps every request with a Span.
func NewMetricsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeMetricsFunc,
Expand All @@ -79,41 +79,25 @@ func NewMetricsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushMetricsData
}

be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler,
newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &metricsExporter{
baseExporter: be,
Metrics: mc,
}, err
metricsOpts := []Option{withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher))}
return NewMetricsRequestExporter(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...)
}

// RequestFromMetricsFunc converts pdata.Metrics into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromMetricsFunc func(context.Context, pmetric.Metrics) (Request, error)

// requestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request.
func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFunc {
return func(_ context.Context, md pmetric.Metrics) (Request, error) {
return newMetricsRequest(md, pusher), nil
}
}

// NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +115,7 @@ func NewMetricsRequestExporter(
return nil, errNilMetricsConverter
}

be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeMetrics, newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading