Report metric about current size of the exporter retry queue
This commit adds observability to the queued_retry exporter helper. It adds the first metric, "queue_size", which reports the current size of the retry queue per exporter. The metric is updated every second.

This is the first commit to address issue open-telemetry#2434
dmitryax committed Apr 1, 2021
1 parent a08a650 commit 6100fdc
Showing 11 changed files with 247 additions and 27 deletions.
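
The core of the change is in exporter/exporterhelper/queued_retry.go: the queue size is exposed through an OpenCensus derived gauge whose value is computed from queue.Size() whenever metrics are collected, and the gauge's registry is added to the global metric producer manager on start and removed on shutdown. Below is a minimal, self-contained sketch of that pattern; the fakeQueue type, the "otlp" label value, and the final registry.Read() loop are illustrative stand-ins rather than collector code.

// Sketch of the derived-gauge pattern used by queued_retry.go.
// fakeQueue and the "otlp" label value are illustrative stand-ins.
package main

import (
    "fmt"

    "go.opencensus.io/metric"
    "go.opencensus.io/metric/metricdata"
    "go.opencensus.io/metric/metricproducer"
)

// fakeQueue stands in for the bounded queue held by the exporter helper.
type fakeQueue struct{ items []interface{} }

func (q *fakeQueue) Size() int { return len(q.items) }

func main() {
    registry := metric.NewRegistry()

    // The gauge is "derived": its value comes from a callback that runs at
    // collection time, so nothing has to push updates into it.
    gauge, err := registry.AddInt64DerivedGauge(
        "exporter/queue_size",
        metric.WithDescription("Current size of the retry queue (in batches)"),
        metric.WithLabelKeys("exporter"),
        metric.WithUnit(metricdata.UnitDimensionless))
    if err != nil {
        panic(err)
    }

    q := &fakeQueue{}

    // One entry per exporter name; the closure captures the queue.
    if err := gauge.UpsertEntry(func() int64 {
        return int64(q.Size())
    }, metricdata.NewLabelValue("otlp")); err != nil {
        panic(err)
    }

    // Registering the registry as a producer makes the gauge visible to
    // whatever metrics exporter the process runs; shutdown removes it again.
    metricproducer.GlobalManager().AddProducer(registry)
    defer metricproducer.GlobalManager().DeleteProducer(registry)

    q.items = append(q.items, "batch-1", "batch-2")

    // Read the registry directly to show the derived value (2 here).
    for _, m := range registry.Read() {
        for _, ts := range m.TimeSeries {
            fmt.Printf("%s%v = %v\n", m.Descriptor.Name, ts.LabelValues, ts.Points[0].Value)
        }
    }
}

Because the gauge is derived, no background goroutine has to push updates; deleting the producer on shutdown (as queuedRetrySender.shutdown does) removes the metric cleanly when the exporter stops.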
2 changes: 1 addition & 1 deletion Makefile
@@ -157,7 +157,7 @@ install-tools:

.PHONY: otelcol
otelcol:
go generate ./...
# go generate ./...
$(MAKE) build-binary-internal

.PHONY: run
15 changes: 9 additions & 6 deletions exporter/exporterhelper/common.go
@@ -158,18 +158,22 @@ type baseExporter struct {
convertResourceToTelemetry bool
}

func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
func newBaseExporter(cfg config.Exporter, logger *zap.Logger, options ...Option) (*baseExporter, error) {
bs := fromOptions(options)
be := &baseExporter{
Component: componenthelper.New(bs.componentOptions...),
cfg: cfg,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
}

be.qrSender = newQueuedRetrySender(cfg.Name(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender
qrSender, err := newQueuedRetrySender(cfg.Name(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
if err != nil {
return nil, err
}
be.sender = qrSender
be.qrSender = qrSender

return be
return be, nil
}

// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
@@ -186,8 +190,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
return be.qrSender.start()
}

// Shutdown all senders and exporter and is invoked during service shutdown.
6 changes: 4 additions & 2 deletions exporter/exporterhelper/common_test.go
@@ -40,21 +40,23 @@ func TestErrorToStatus(t *testing.T) {
}

func TestBaseExporter(t *testing.T) {
be := newBaseExporter(defaultExporterCfg, zap.NewNop())
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop())
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
}

func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
be, err := newBaseExporter(
defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings()),
WithTimeout(DefaultTimeoutSettings()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}
5 changes: 4 additions & 1 deletion exporter/exporterhelper/logshelper.go
@@ -89,7 +89,10 @@ func NewLogsExporter(
return nil, errNilPushLogsData
}

be := newBaseExporter(cfg, logger, options...)
be, err := newBaseExporter(cfg, logger, options...)
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
5 changes: 4 additions & 1 deletion exporter/exporterhelper/metricshelper.go
@@ -93,7 +93,10 @@ func NewMetricsExporter(
return nil, errNilPushMetricsData
}

be := newBaseExporter(cfg, logger, options...)
be, err := newBaseExporter(cfg, logger, options...)
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
44 changes: 40 additions & 4 deletions exporter/exporterhelper/queued_retry.go
@@ -22,6 +22,9 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/jaegertracing/jaeger/pkg/queue"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -30,6 +33,16 @@ import (
"go.opentelemetry.io/collector/obsreport"
)

var (
r = metric.NewRegistry()

queueSizeGauge, _ = r.AddInt64DerivedGauge(
obsreport.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsreport.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -79,6 +92,7 @@ func DefaultRetrySettings() RetrySettings {
}

type queuedRetrySender struct {
fullName string
cfg QueueSettings
consumerSender requestSender
queue *queue.BoundedQueue
@@ -106,12 +120,19 @@ func createSampledLogger(logger *zap.Logger) *zap.Logger {
return logger.WithOptions(opts)
}

func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
func newQueuedRetrySender(
fullName string,
qCfg QueueSettings,
rCfg RetrySettings,
nextSender requestSender,
logger *zap.Logger,
) (*queuedRetrySender, error) {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
return &queuedRetrySender{
cfg: qCfg,
fullName: fullName,
cfg: qCfg,
consumerSender: &retrySender{
traceAttribute: traceAttr,
cfg: rCfg,
@@ -123,15 +144,28 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
retryStopCh: retryStopCh,
traceAttributes: []trace.Attribute{traceAttr},
logger: sampledLogger,
}
}, nil
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start() {
func (qrs *queuedRetrySender) start() error {
qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
req := item.(request)
_ = qrs.consumerSender.send(req)
})

// Start reporting queue length metric
if qrs.cfg.Enabled {
metricproducer.GlobalManager().AddProducer(r)
err := queueSizeGauge.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue size metric: %v", err)
}
}

return nil
}

// send implements the requestSender interface
@@ -167,6 +201,8 @@ func (qrs *queuedRetrySender) send(req request) error {

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
metricproducer.GlobalManager().DeleteProducer(r)

// First stop the retry goroutines, so that unblocks the queue workers.
close(qrs.retryStopCh)

54 changes: 43 additions & 11 deletions exporter/exporterhelper/queued_retry_test.go
@@ -35,7 +35,8 @@ import (
func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -59,7 +60,8 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
rCfg.Enabled = false
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -84,7 +86,8 @@ func TestQueuedRetry_OnError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -110,7 +113,8 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -143,7 +147,8 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -172,7 +177,8 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = time.Millisecond
rCfg.MaxElapsedTime = 100 * time.Millisecond
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -210,7 +216,8 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg.NumConsumers = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 10 * time.Millisecond
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -241,7 +248,8 @@ func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg.QueueSize = 1
rCfg := DefaultRetrySettings()
rCfg.InitialInterval = 0
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -267,14 +275,15 @@ func TestQueuedRetry_DropOnFull(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.QueueSize = 0
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
err := be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))
err = be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))
require.Error(t, err)
}

@@ -285,7 +294,8 @@ func TestQueuedRetryHappyPath(t *testing.T) {

qCfg := DefaultQueueSettings()
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
@@ -315,6 +325,28 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := DefaultRetrySettings()
be, err := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

for i := 0; i < 7; i++ {
be.sender.send(newErrorRequest(context.Background()))
}

obsreporttest.CheckExporterQueueRetryViews(t, defaultExporterCfg.Name(), int64(7))
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
5 changes: 4 additions & 1 deletion exporter/exporterhelper/tracehelper.go
@@ -90,7 +90,10 @@ func NewTraceExporter(
return nil, errNilPushTraceData
}

be := newBaseExporter(cfg, logger, options...)
be, err := newBaseExporter(cfg, logger, options...)
if err != nil {
return nil, err
}
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: obsreport.NewExporter(
36 changes: 36 additions & 0 deletions local/config.yaml
@@ -0,0 +1,36 @@
extensions:
  zpages:
    endpoint: 0.0.0.0:55679

receivers:
  otlp:
    protocols:
      grpc:
  opencensus:
  jaeger:
    protocols:
      grpc:
      thrift_http:
  zipkin:

processors:
  batch: {}
  memory_limiter:
    ballast_size_mib: 2000
    check_interval: 1s
    limit_mib: 4000
    spike_limit_mib: 800

exporters:
  logging:
  otlp:
    endpoint: localhost:1234

service:
  pipelines:
    traces:
      receivers: [zipkin]
      processors: [memory_limiter]
      exporters: [logging, otlp]

  extensions: [zpages]
