Skip to content

Commit

Permalink
[exporterhelper] Refactor options and baseExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Sep 7, 2023
1 parent 744eb93 commit 7f46194
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 288 deletions.
232 changes: 157 additions & 75 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"context"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -27,11 +30,52 @@ func NewDefaultTimeoutSettings() TimeoutSettings {
}
}

type senderType string

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown()
send(req internal.Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
nextSender requestSender
startFunc func(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdownFunc func()
sendFunc func(req internal.Request) error
}

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error {
if b.startFunc == nil {
return nil
}
return b.startFunc(ctx, host, set)
}

func (b *baseRequestSender) send(req internal.Request) error {
if b.nextSender != nil {
return b.nextSender.send(req)
}
return req.Export(req.Context())
}

func (b *baseRequestSender) shutdown() {
if b.shutdownFunc == nil {
return
}
b.shutdownFunc()
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
Expand All @@ -56,99 +100,68 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queue internal.ProducerConsumerQueue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
op(bs)
}

return bs
}

// Option apply changes to baseSettings.
type Option func(*baseSettings)
type Option func(*baseExporter)

// WithStart overrides the default Start function for an exporter.
// The default start function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.StartFunc = start
}
}

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.ShutdownFunc = shutdown
}
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseSettings) {
o.TimeoutSettings = timeoutSettings
return func(o *baseExporter) {
o.timeoutSender.(*timeoutSender).cfg = timeoutSettings
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseSettings) {
o.RetrySettings = retrySettings
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure)
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
if !config.Enabled {
return
}
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
return
var queue internal.ProducerConsumerQueue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
}
}
Expand All @@ -157,48 +170,98 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
type baseExporter struct {
component.StartFunc
component.ShutdownFunc
obsrep *obsExporter
sender requestSender
qrSender *queuedRetrySender

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
sampledLogger *zap.Logger

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
queueSender requestSender
obsrepSender requestSender
retrySender requestSender
timeoutSender requestSender

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc

consumerOptions []consumer.Option
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

var err error
be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
if err != nil {
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := bs.StartFunc.Start(ctx, host); err != nil {
return err
}
be := &baseExporter{
requestExporter: requestExporter,
marshaler: marshaler,
unmarshaler: unmarshaler,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host, set)
set: set,
obsrep: obsrep,
sampledLogger: createSampledLogger(set.Logger),
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return bs.ShutdownFunc.Shutdown(ctx)

for _, op := range options {
op(be)
}
be.connectSenders()

return be, nil
}

// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
// This can be used to wrap with observability (create spans, record metrics) the consumer sender.
func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) {
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
// First shutdown the retry sender
be.retrySender.shutdown()

// Then shutdown the queue sender
be.queueSender.shutdown()

// Last shutdown the wrapped exporter itself.
return be.ShutdownFunc.Shutdown(ctx)
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}

// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
type timeoutSender struct {
*baseRequestSender
cfg TimeoutSettings
}

Expand All @@ -213,3 +276,22 @@ func (ts *timeoutSender) send(req internal.Request) error {
}
return req.Export(ctx)
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
if logger.Core().Enabled(zapcore.DebugLevel) {
// Debugging is enabled. Don't do any sampling.
return logger
}

// Create a logger that samples all messages to 1 per 10 seconds initially,
// and 1/100 of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
10*time.Second,
1,
100,
)
})
return logger.WithOptions(opts)
}
20 changes: 11 additions & 9 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper

import (
Expand Down Expand Up @@ -31,12 +32,16 @@ var (
}
)

func newNoopObsrepSender(_ *obsExporter) requestSender {
return &baseRequestSender{}
}

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
be, err = newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -45,13 +50,10 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings,
newBaseSettings(
false, nil, nil,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
defaultSettings, "", false, nil, nil, newNoopObsrepSender,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
Loading

0 comments on commit 7f46194

Please sign in to comment.