Skip to content

Commit

Permalink
First round of comments addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu committed Jul 17, 2020
1 parent eb11553 commit 0828ede
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 191 deletions.
151 changes: 78 additions & 73 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,36 @@ var (
okStatus = trace.Status{Code: trace.StatusCodeOK}
)

// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutSettings struct {
// Timeout is the timeout for each operation.
// Timeout is the timeout for every attempt to send data to the backend.
Timeout time.Duration `mapstructure:"timeout"`
}

// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings.
func CreateDefaultTimeoutSettings() TimeoutSettings {
return TimeoutSettings{
Timeout: 5 * time.Second,
}
}

type settings struct {
configmodels.Exporter
TimeoutSettings
QueuedSettings
RetrySettings
}

// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs).
type request interface {
context() context.Context
setContext(context.Context)
export(ctx context.Context) (int, error)
// Returns a new queue request that contains the items left to be exported.
// Returns a new request that contains the items left to be sent.
onPartialError(consumererror.PartialError) request
// Returns the cnt of spans/metric points or log records.
// Returns the count of spans/metric points or log records.
count() int
}

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
send(req request) (int, error)
}

// baseRequest is a base implementation for the request.
type baseRequest struct {
ctx context.Context
}
Expand All @@ -81,139 +79,146 @@ type Start func(context.Context, component.Host) error
// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error

// internalOptions represents all the options that users can configured.
type internalOptions struct {
TimeoutSettings
QueueSettings
RetrySettings
Start
Shutdown
}

// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
// Start from the default options:
opts := &internalOptions{
TimeoutSettings: CreateDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
QueueSettings: QueueSettings{Disabled: true},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Disabled: true},
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}

for _, op := range options {
op(opts)
}

return opts
}

// ExporterOption apply changes to internalOptions.
type ExporterOption func(*baseExporter)
type ExporterOption func(*internalOptions)

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) ExporterOption {
return func(o *baseExporter) {
o.shutdown = shutdown
return func(o *internalOptions) {
o.Shutdown = shutdown
}
}

// WithStart overrides the default Start function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) ExporterOption {
return func(o *baseExporter) {
o.start = start
return func(o *internalOptions) {
o.Start = start
}
}

// WithShutdown overrides the default TimeoutSettings for an exporter.
// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeout TimeoutSettings) ExporterOption {
return func(o *baseExporter) {
o.cfg.TimeoutSettings = timeout
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
return func(o *internalOptions) {
o.TimeoutSettings = timeoutSettings
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retry RetrySettings) ExporterOption {
return func(o *baseExporter) {
o.cfg.RetrySettings = retry
func WithRetry(retrySettings RetrySettings) ExporterOption {
return func(o *internalOptions) {
o.RetrySettings = retrySettings
}
}

// WithQueued overrides the default QueuedSettings for an exporter.
// The default QueuedSettings is to disable queueing.
func WithQueued(queued QueuedSettings) ExporterOption {
return func(o *baseExporter) {
o.cfg.QueuedSettings = queued
// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) ExporterOption {
return func(o *internalOptions) {
o.QueueSettings = queueSettings
}
}

// internalOptions contains internalOptions concerning how an Exporter is configured.
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
cfg *settings
cfg configmodels.Exporter
sender requestSender
rSender *retrySender
qSender *queuedSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
}

// Construct the internalOptions from multiple ExporterOption.
func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
opts := fromConfiguredOptions(options...)
be := &baseExporter{
cfg: &settings{
Exporter: cfg,
TimeoutSettings: CreateDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call CreateDefaultQueuedSettings
QueuedSettings: QueuedSettings{Disabled: true},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Disabled: true},
},
}

for _, op := range options {
op(be)
}

if be.start == nil {
be.start = func(ctx context.Context, host component.Host) error { return nil }
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
}

if be.shutdown == nil {
be.shutdown = func(ctx context.Context) error { return nil }
}

be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings}

be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender)
be.sender = be.rSender

be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender)
be.sender = be.qSender
be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings})
be.sender = be.qrSender

return be
}

// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
err := componenterror.ErrAlreadyStarted
be.startOnce.Do(func() {
// First start the nextSender
// First start the wrapped exporter.
err = be.start(ctx, host)
if err != nil {
// TODO: Log errors, or check if it is recorded by the caller.
return
}

// If no error then start the queuedSender
be.qSender.start()
// If no error then start the queuedRetrySender.
be.qrSender.start()
})
return err
}

// Shutdown stops the nextSender and is invoked during shutdown.
// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
err := componenterror.ErrAlreadyStopped
be.shutdownOnce.Do(func() {
// First stop the retry goroutines
be.rSender.shutdown()

// All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
be.qSender.shutdown()

// Last shutdown the nextSender itself.
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
err = be.shutdown(ctx)
})
return err
}

// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
type timeoutSender struct {
cfg *TimeoutSettings
cfg TimeoutSettings
}

func (te *timeoutSender) send(req request) (int, error) {
// send implements the requestSender interface
func (ts *timeoutSender) send(req request) (int, error) {
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
// updated because this deadline most likely is before the next one.
ctx := req.context()
if te.cfg.Timeout > 0 {
if ts.cfg.Timeout > 0 {
var cancelFunc func()
ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout)
ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout)
defer cancelFunc()
}
return req.export(ctx)
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/logshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
return err
}

// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand All @@ -79,9 +79,9 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
be := newBaseExporter(cfg, options...)

// Record metrics on the consumer.
be.qSender.nextSender = &logsExporterWithObservability{
be.qrSender.consumerSender = &logsExporterWithObservability{
exporterName: cfg.Name(),
sender: be.qSender.nextSender,
sender: be.qrSender.consumerSender,
}

return &logsExporter{
Expand Down
14 changes: 6 additions & 8 deletions exporter/exporterhelper/metricshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu
return err
}

// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span.
// TODO: Add support for retries.
func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
if cfg == nil {
Expand Down Expand Up @@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric
return err
}

// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
if cfg == nil {
return nil, errNilConfig
Expand All @@ -143,9 +141,9 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
be := newBaseExporter(cfg, options...)

// Record metrics on the consumer.
be.qSender.nextSender = &metricsSenderWithObservability{
be.qrSender.consumerSender = &metricsSenderWithObservability{
exporterName: cfg.Name(),
sender: be.qSender.nextSender,
sender: be.qrSender.consumerSender,
}

return &metricsExporter{
Expand All @@ -163,8 +161,8 @@ func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName))
numDroppedMetrics, err := mewo.sender.send(req)

// TODO: this is not ideal: req should come from the next function itself.
// temporarily loading req from internal format. Once full switch is done
// TODO: this is not ideal: it should come from the next function itself.
// temporarily loading it from internal format. Once full switch is done
// to new metrics will remove this.
mReq := req.(*metricsRequest)
numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md)
Expand Down
Loading

0 comments on commit 0828ede

Please sign in to comment.