-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[exporterhelper] Refactor options and baseExporter #8369
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: bug_fix | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) | ||
component: exporterhelper | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Stop logging error messages suggesting user to enable `retry_on_failure` or `sending_queue` when they are not available. | ||
|
||
# One or more tracking issues or pull requests related to the change | ||
issues: [8369] | ||
|
||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [user] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,9 @@ import ( | |
"context" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/exporter" | ||
|
@@ -29,9 +32,34 @@ func NewDefaultTimeoutSettings() TimeoutSettings { | |
|
||
// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). | ||
type requestSender interface { | ||
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error | ||
shutdown() | ||
send(req internal.Request) error | ||
setNextSender(nextSender requestSender) | ||
} | ||
|
||
type baseRequestSender struct { | ||
nextSender requestSender | ||
} | ||
|
||
var _ requestSender = (*baseRequestSender)(nil) | ||
|
||
func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { | ||
return nil | ||
} | ||
|
||
func (b *baseRequestSender) shutdown() {} | ||
|
||
func (b *baseRequestSender) send(req internal.Request) error { | ||
return b.nextSender.send(req) | ||
} | ||
|
||
func (b *baseRequestSender) setNextSender(nextSender requestSender) { | ||
b.nextSender = nextSender | ||
} | ||
|
||
type obsrepSenderFactory func(obsrep *obsExporter) requestSender | ||
|
||
// baseRequest is a base implementation for the internal.Request. | ||
type baseRequest struct { | ||
ctx context.Context | ||
|
@@ -56,99 +84,68 @@ func (req *baseRequest) OnProcessingFinished() { | |
} | ||
} | ||
|
||
// baseSettings represents all the options that users can configure. | ||
type baseSettings struct { | ||
component.StartFunc | ||
component.ShutdownFunc | ||
consumerOptions []consumer.Option | ||
TimeoutSettings | ||
queue internal.ProducerConsumerQueue | ||
RetrySettings | ||
requestExporter bool | ||
marshaler internal.RequestMarshaler | ||
unmarshaler internal.RequestUnmarshaler | ||
} | ||
|
||
// newBaseSettings returns the baseSettings starting from the default and applying all configured options. | ||
// requestExporter indicates whether the base settings are for a new request exporter or not. | ||
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones. | ||
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler, | ||
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings { | ||
bs := &baseSettings{ | ||
requestExporter: requestExporter, | ||
TimeoutSettings: NewDefaultTimeoutSettings(), | ||
// TODO: Enable retry by default (call DefaultRetrySettings) | ||
RetrySettings: RetrySettings{Enabled: false}, | ||
marshaler: marshaler, | ||
unmarshaler: unmarshaler, | ||
} | ||
|
||
for _, op := range options { | ||
op(bs) | ||
} | ||
|
||
return bs | ||
} | ||
|
||
// Option apply changes to baseSettings. | ||
type Option func(*baseSettings) | ||
// Option apply changes to baseExporter. | ||
type Option func(*baseExporter) | ||
|
||
// WithStart overrides the default Start function for an exporter. | ||
// The default start function does nothing and always returns nil. | ||
func WithStart(start component.StartFunc) Option { | ||
return func(o *baseSettings) { | ||
return func(o *baseExporter) { | ||
o.StartFunc = start | ||
} | ||
} | ||
|
||
// WithShutdown overrides the default Shutdown function for an exporter. | ||
// The default shutdown function does nothing and always returns nil. | ||
func WithShutdown(shutdown component.ShutdownFunc) Option { | ||
return func(o *baseSettings) { | ||
return func(o *baseExporter) { | ||
o.ShutdownFunc = shutdown | ||
} | ||
} | ||
|
||
// WithTimeout overrides the default TimeoutSettings for an exporter. | ||
// The default TimeoutSettings is 5 seconds. | ||
func WithTimeout(timeoutSettings TimeoutSettings) Option { | ||
return func(o *baseSettings) { | ||
o.TimeoutSettings = timeoutSettings | ||
return func(o *baseExporter) { | ||
o.timeoutSender.cfg = timeoutSettings | ||
} | ||
} | ||
|
||
// WithRetry overrides the default RetrySettings for an exporter. | ||
// The default RetrySettings is to disable retries. | ||
func WithRetry(retrySettings RetrySettings) Option { | ||
return func(o *baseSettings) { | ||
o.RetrySettings = retrySettings | ||
return func(o *baseExporter) { | ||
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure) | ||
} | ||
} | ||
|
||
// WithQueue overrides the default QueueSettings for an exporter. | ||
// The default QueueSettings is to disable queueing. | ||
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. | ||
func WithQueue(config QueueSettings) Option { | ||
return func(o *baseSettings) { | ||
return func(o *baseExporter) { | ||
if o.requestExporter { | ||
panic("queueing is not available for the new request exporters yet") | ||
} | ||
if !config.Enabled { | ||
return | ||
var queue internal.ProducerConsumerQueue | ||
if config.Enabled { | ||
if config.StorageID == nil { | ||
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) | ||
} else { | ||
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) | ||
} | ||
} | ||
if config.StorageID == nil { | ||
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) | ||
return | ||
} | ||
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) | ||
qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) | ||
o.queueSender = qs | ||
o.setOnTemporaryFailure(qs.onTemporaryFailure) | ||
} | ||
} | ||
|
||
// WithCapabilities overrides the default Capabilities() function for a Consumer. | ||
// The default is non-mutable data. | ||
// TODO: Verify if we can change the default to be mutable as we do for processors. | ||
func WithCapabilities(capabilities consumer.Capabilities) Option { | ||
return func(o *baseSettings) { | ||
return func(o *baseExporter) { | ||
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) | ||
} | ||
} | ||
|
@@ -157,48 +154,106 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { | |
type baseExporter struct { | ||
component.StartFunc | ||
component.ShutdownFunc | ||
obsrep *obsExporter | ||
sender requestSender | ||
qrSender *queuedRetrySender | ||
|
||
requestExporter bool | ||
marshaler internal.RequestMarshaler | ||
unmarshaler internal.RequestUnmarshaler | ||
signal component.DataType | ||
|
||
set exporter.CreateSettings | ||
obsrep *obsExporter | ||
sampledLogger *zap.Logger | ||
|
||
// Chain of senders that the exporter helper applies before passing the data to the actual exporter. | ||
// The data is handled by each sender in the respective order starting from the queueSender. | ||
// Most of the senders are optional, and initialized with a no-op path-through sender. | ||
queueSender requestSender | ||
obsrepSender requestSender | ||
retrySender requestSender | ||
timeoutSender *timeoutSender // timeoutSender is always initialized. | ||
|
||
// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer. | ||
onTemporaryFailure onRequestHandlingFinishedFunc | ||
|
||
consumerOptions []consumer.Option | ||
} | ||
|
||
func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { | ||
be := &baseExporter{} | ||
// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. | ||
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, | ||
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { | ||
|
||
var err error | ||
be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) | ||
obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) | ||
be.sender = be.qrSender | ||
be.StartFunc = func(ctx context.Context, host component.Host) error { | ||
// First start the wrapped exporter. | ||
if err := bs.StartFunc.Start(ctx, host); err != nil { | ||
return err | ||
} | ||
be := &baseExporter{ | ||
requestExporter: requestExporter, | ||
marshaler: marshaler, | ||
unmarshaler: unmarshaler, | ||
signal: signal, | ||
|
||
queueSender: &baseRequestSender{}, | ||
obsrepSender: osf(obsrep), | ||
retrySender: &baseRequestSender{}, | ||
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, | ||
|
||
// If no error then start the queuedRetrySender. | ||
return be.qrSender.start(ctx, host, set) | ||
set: set, | ||
obsrep: obsrep, | ||
sampledLogger: createSampledLogger(set.Logger), | ||
} | ||
be.ShutdownFunc = func(ctx context.Context) error { | ||
// First shutdown the queued retry sender | ||
be.qrSender.shutdown() | ||
// Last shutdown the wrapped exporter itself. | ||
return bs.ShutdownFunc.Shutdown(ctx) | ||
|
||
for _, op := range options { | ||
op(be) | ||
} | ||
be.connectSenders() | ||
|
||
return be, nil | ||
} | ||
|
||
// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. | ||
// This can be used to wrap with observability (create spans, record metrics) the consumer sender. | ||
func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { | ||
be.qrSender.consumerSender = f(be.qrSender.consumerSender) | ||
// send sends the request using the first sender in the chain. | ||
func (be *baseExporter) send(req internal.Request) error { | ||
return be.queueSender.send(req) | ||
} | ||
|
||
// connectSenders connects the senders in the predefined order. | ||
func (be *baseExporter) connectSenders() { | ||
be.queueSender.setNextSender(be.obsrepSender) | ||
be.obsrepSender.setNextSender(be.retrySender) | ||
be.retrySender.setNextSender(be.timeoutSender) | ||
} | ||
|
||
func (be *baseExporter) Start(ctx context.Context, host component.Host) error { | ||
// First start the wrapped exporter. | ||
if err := be.StartFunc.Start(ctx, host); err != nil { | ||
return err | ||
} | ||
|
||
// If no error then start the queueSender. | ||
return be.queueSender.start(ctx, host, be.set) | ||
} | ||
|
||
func (be *baseExporter) Shutdown(ctx context.Context) error { | ||
// First shutdown the retry sender, so it can push any pending requests to back the queue. | ||
be.retrySender.shutdown() | ||
|
||
// Then shutdown the queue sender. | ||
be.queueSender.shutdown() | ||
|
||
// Last shutdown the wrapped exporter itself. | ||
return be.ShutdownFunc.Shutdown(ctx) | ||
} | ||
|
||
func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { | ||
be.onTemporaryFailure = onTemporaryFailure | ||
if rs, ok := be.retrySender.(*retrySender); ok { | ||
rs.onTemporaryFailure = onTemporaryFailure | ||
} | ||
} | ||
|
||
// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. | ||
type timeoutSender struct { | ||
baseRequestSender | ||
cfg TimeoutSettings | ||
} | ||
|
||
|
@@ -213,3 +268,22 @@ func (ts *timeoutSender) send(req internal.Request) error { | |
} | ||
return req.Export(ctx) | ||
} | ||
|
||
func createSampledLogger(logger *zap.Logger) *zap.Logger { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to revisit this, seems very arbitrary to have a sampler logger only for some cases. Maybe just better document as comment of the func why we have this and where should be used, but it is still random for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. We have another effort to address this #8134. I'm just moving it from one file to another as is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you @dmitryax , Please @bogdandrutu @dmitryax take a look at my PR #8134. Thanks :) |
||
if logger.Core().Enabled(zapcore.DebugLevel) { | ||
// Debugging is enabled. Don't do any sampling. | ||
return logger | ||
} | ||
|
||
// Create a logger that samples all messages to 1 per 10 seconds initially, | ||
// and 1/100 of messages after that. | ||
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { | ||
return zapcore.NewSamplerWithOptions( | ||
core, | ||
10*time.Second, | ||
1, | ||
100, | ||
) | ||
}) | ||
return logger.WithOptions(opts) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changing this signature should not be considered a breaking change since the argument is of unexported type, so users cannot implement it directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would suggest in a separate PR to change this to an interface with a private func. I think this is kind of the recommended new way of doing options.