Report metric about current size of the exporter retry queue
This commit adds observability to the queued_retry exporter helper. It adds a first metric, "queue_length", which indicates the current size of the queue per exporter. The metric is updated every second.

This is the first commit to address issue open-telemetry#2434.
dmitryax committed Mar 30, 2021
1 parent 2c17034 commit d7e434a
Showing 10 changed files with 176 additions and 56 deletions.
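
In short: each typed exporter helper (traces, metrics, logs) now creates a single obsreport.Exporter and passes it through newBaseExporter to the queued-retry sender, whose start() launches a ticker goroutine that records the bounded queue's current size under a LastValue view. Below is a minimal, self-contained sketch of that reporting pattern; the function and callback names are illustrative, not the collector's API.

package main

import (
	"fmt"
	"time"
)

// reportQueueLength records the queue's current size every interval until stop is closed.
// This mirrors the loop added to queuedRetrySender.start() in the diff below.
func reportQueueLength(size func() int, record func(length int), interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				record(size())
			}
		}
	}()
}

func main() {
	stop := make(chan struct{})
	reportQueueLength(func() int { return 9 }, func(n int) { fmt.Println("queue_length:", n) }, time.Second, stop)
	time.Sleep(3 * time.Second)
	close(stop)
}
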
5 changes: 3 additions & 2 deletions exporter/exporterhelper/common.go
@@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/obsreport"
)

// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -158,15 +159,15 @@ type baseExporter struct {
convertResourceToTelemetry bool
}

func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, obsrep *obsreport.Exporter, options ...Option) *baseExporter {
bs := fromOptions(options)
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
cfg: cfg,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
}

be.qrSender = newQueuedRetrySender(cfg.Name(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.qrSender = newQueuedRetrySender(cfg.Name(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, obsrep, logger)
be.sender = be.qrSender

return be
12 changes: 11 additions & 1 deletion exporter/exporterhelper/common_test.go
@@ -25,6 +25,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
)

var okStatus = trace.Status{Code: trace.StatusCodeOK}
@@ -40,7 +42,7 @@ func TestErrorToStatus(t *testing.T) {
}

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(defaultExporterCfg, zap.NewNop())
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter())
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}
@@ -50,6 +52,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
be := newBaseExporter(
defaultExporterCfg,
zap.NewNop(),
newTestObsExporter(),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
@@ -65,3 +68,10 @@ func errToStatus(err error) trace.Status {
}
return okStatus
}

func newTestObsExporter() *obsreport.Exporter {
return obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.LevelNormal,
ExporterName: defaultExporterCfg.Name(),
})
}
11 changes: 6 additions & 5 deletions exporter/exporterhelper/logshelper.go
@@ -89,13 +89,14 @@ func NewLogsExporter(
return nil, errNilPushLogsData
}

be := newBaseExporter(cfg, logger, options...)
obsrep := obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
})
be := newBaseExporter(cfg, logger, obsrep, options...)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
}),
obsrep: obsrep,
nextSender: nextSender,
}
})
11 changes: 6 additions & 5 deletions exporter/exporterhelper/metricshelper.go
@@ -93,13 +93,14 @@ func NewMetricsExporter(
return nil, errNilPushMetricsData
}

be := newBaseExporter(cfg, logger, options...)
obsrep := obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
})
be := newBaseExporter(cfg, logger, obsrep, options...)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
}),
obsrep: obsrep,
nextSender: nextSender,
}
})
31 changes: 30 additions & 1 deletion exporter/exporterhelper/queued_retry.go
@@ -30,6 +30,11 @@ import (
"go.opentelemetry.io/collector/obsreport"
)

var (
// Time interval to report queue length metric
queueLengthReportInterval = 1 * time.Second
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -85,6 +90,7 @@ type queuedRetrySender struct {
retryStopCh chan struct{}
traceAttributes []trace.Attribute
logger *zap.Logger
obsrep *obsreport.Exporter
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
@@ -106,7 +112,14 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger {
return logger.WithOptions(opts)
}

func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
func newQueuedRetrySender(
fullName string,
qCfg QueueSettings,
rCfg RetrySettings,
nextSender requestSender,
obsrep *obsreport.Exporter,
logger *zap.Logger,
) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
@@ -122,6 +135,7 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
queue: queue.NewBoundedQueue(qCfg.QueueSize, func(item interface{}) {}),
retryStopCh: retryStopCh,
traceAttributes: []trace.Attribute{traceAttr},
obsrep: obsrep,
logger: sampledLogger,
}
}
@@ -132,6 +146,21 @@ func (qrs *queuedRetrySender) start() {
req := item.(request)
_ = qrs.consumerSender.send(req)
})

// Start a timer to report the queue length every second.
ticker := time.NewTicker(queueLengthReportInterval)
go func() {
defer ticker.Stop()
for {
select {
case <-qrs.retryStopCh:
return
case <-ticker.C:
qrs.obsrep.ReportQueueLength(context.Background(), qrs.queue.Size())
}
}
}()

}

// send implements the requestSender interface
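
The reporting goroutine reuses retryStopCh, which the retry logic already owns. The sender's shutdown path is not part of this diff, but it presumably closes that channel, which both stops pending retries and ends the queue-length loop above. Assumed shape, for orientation only:

// Assumed shape of the existing shutdown path (not shown in this diff).
func (qrs *queuedRetrySender) shutdown() {
	// Closing retryStopCh stops pending retry waits and the queue-length reporter above.
	close(qrs.retryStopCh)
	// Then drain and stop the bounded queue.
	qrs.queue.Stop()
}
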
46 changes: 36 additions & 10 deletions exporter/exporterhelper/queued_retry_test.go
@@ -35,7 +35,7 @@ import (
func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -59,7 +59,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
rCfg.Enabled = false
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -84,7 +84,7 @@ func TestQueuedRetry_OnError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -110,7 +110,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -143,7 +143,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -172,7 +172,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 100 * time.Millisecond
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -210,7 +210,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 10 * time.Millisecond
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -241,7 +241,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg.QueueSize = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -267,7 +267,7 @@ func TestQueuedRetry_DropOnFull(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.QueueSize = 0
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -285,7 +285,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {

qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -315,6 +315,32 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

queueLengthReportInterval = 10 * time.Millisecond

qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), newTestObsExporter(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

for i := 0; i < 10; i++ {
be.sender.send(newErrorRequest(context.Background()))
}

<-time.After(20 * time.Millisecond)

// 1 request is in-flight, 9 requests are in the queue
obsreporttest.CheckExporterQueueRetryViews(t, defaultExporterCfg.Name(), int64(9))
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
13 changes: 7 additions & 6 deletions exporter/exporterhelper/tracehelper.go
@@ -90,14 +90,15 @@ func NewTraceExporter(
return nil, errNilPushTraceData
}

be := newBaseExporter(cfg, logger, options...)
obsrep := obsreport.NewExporter(
obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
})
be := newBaseExporter(cfg, logger, obsrep, options...)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: obsreport.NewExporter(
obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterName: cfg.Name(),
}),
obsrep: obsrep,
nextSender: nextSender,
}
})
7 changes: 7 additions & 0 deletions obsreport/obsreport.go
@@ -119,6 +119,13 @@ func AllViews() (views []*view.View) {
tagKeys = []tag.Key{tagKeyExporter}
views = append(views, genViews(measures, tagKeys, view.Sum())...)

// Queued retry views.
measures = []*stats.Int64Measure{
mQueueLength,
}
tagKeys = []tag.Key{tagKeyExporter}
views = append(views, genViews(measures, tagKeys, view.LastValue())...)

// Processor views.
measures = []*stats.Int64Measure{
mProcessorAcceptedSpans,
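
Unlike the sent/failed counters above, which aggregate with view.Sum(), the new queue-length view uses view.LastValue(), so each exporter tag keeps only the most recently recorded value (gauge semantics). A self-contained OpenCensus sketch of that behavior follows; the measure name and tag value are illustrative.

package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

func main() {
	exporterKey := tag.MustNewKey("exporter")
	length := stats.Int64("exporter/queue_length", "Current length of the queue (in batches)", stats.UnitDimensionless)

	v := &view.View{
		Name:        length.Name(),
		Description: length.Description(),
		Measure:     length,
		TagKeys:     []tag.Key{exporterKey},
		Aggregation: view.LastValue(), // gauge: only the latest recorded value is kept per tag set
	}
	if err := view.Register(v); err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	mut := []tag.Mutator{tag.Upsert(exporterKey, "otlp")}
	_ = stats.RecordWithTags(ctx, mut, length.M(3))
	_ = stats.RecordWithTags(ctx, mut, length.M(9)) // the later record replaces 3

	rows, _ := view.RetrieveData(v.Name)
	for _, r := range rows {
		log.Printf("%v => %v", r.Tags, r.Data.(*view.LastValueData).Value) // prints 9
	}
}
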
18 changes: 18 additions & 0 deletions obsreport/obsreport_exporter.go
@@ -42,6 +42,9 @@ const (
SentLogRecordsKey = "sent_log_records"
// Key used to track logs that failed to be sent by exporters.
FailedToSendLogRecordsKey = "send_failed_log_records"

// Queue retry metrics
QueueLengthKey = "queue_length"
)

var (
@@ -81,6 +84,12 @@ var (
exporterPrefix+FailedToSendLogRecordsKey,
"Number of log records in failed attempts to send to destination.",
stats.UnitDimensionless)

// Queued retry related metrics
mQueueLength = stats.Int64(
exporterPrefix+QueueLengthKey,
"Current length of the queue (in batches)",
stats.UnitDimensionless)
)

type Exporter struct {
@@ -146,6 +155,15 @@ func (eor *Exporter) EndLogsExportOp(ctx context.Context, numLogRecords int, err
endSpan(ctx, err, numSent, numFailedToSend, SentLogRecordsKey, FailedToSendLogRecordsKey)
}

// ReportQueueLength reports the current queue length metric
func (eor *Exporter) ReportQueueLength(ctx context.Context, queueLength int) {
if gLevel == configtelemetry.LevelNone {
return
}
// Ignore the error for now. This should not happen.
_ = stats.RecordWithTags(ctx, eor.mutators, mQueueLength.M(int64(queueLength)))
}

// startSpan creates the span used to trace the operation. Returning
// the updated context and the created span.
func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context {
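
ReportQueueLength is a no-op at telemetry level none; otherwise it records against the exporter's tag mutators, so the value is keyed per exporter name. The view only becomes visible once the collector's views are registered with the stats library, which the collector's internal telemetry setup handles; a rough, hedged equivalent:

// Register every collector view, including the new queue_length view,
// so whichever stats exporter is configured (e.g. Prometheus) can export it.
if err := view.Register(obsreport.AllViews()...); err != nil {
	log.Fatalf("failed to register views: %v", err)
}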