From 63ed063a8c5847bd9276ef49597b8dd65fab4719 Mon Sep 17 00:00:00 2001
From: Dmitry
Date: Tue, 30 Mar 2021 14:25:05 -0700
Subject: [PATCH] Report metric about current size of the exporter retry queue

This commit adds observability to the queued_retry exporter helper. It adds
the first metric, "queue_size", which reports the current size of the retry
queue per exporter. The metric is implemented as a derived gauge, so its
value is read directly from the queue whenever metrics are collected.

This is the first commit to address the issue
https://github.com/open-telemetry/opentelemetry-collector/issues/2434
---
 exporter/exporterhelper/common.go            |  3 +-
 exporter/exporterhelper/common_test.go       | 19 +++++--
 exporter/exporterhelper/queued_retry.go      | 42 +++++++++++++-
 exporter/exporterhelper/queued_retry_test.go | 58 ++++++++++++++++++++
 4 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index 61413852f6f..1a29846db69 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -186,8 +186,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
 	}
 
 	// If no error then start the queuedRetrySender.
-	be.qrSender.start()
-	return nil
+	return be.qrSender.start()
 }
 
 // Shutdown all senders and exporter and is invoked during service shutdown.
diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go
index 6b642845925..a4d7f59f9ba 100644
--- a/exporter/exporterhelper/common_test.go
+++ b/exporter/exporterhelper/common_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 
 	"github.com/stretchr/testify/require"
+	"go.opencensus.io/tag"
 	"go.opencensus.io/trace"
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/config"
 )
 
-var okStatus = trace.Status{Code: trace.StatusCodeOK}
+var (
+	okStatus = trace.Status{Code: trace.StatusCodeOK}
 
-var defaultExporterCfg = &config.ExporterSettings{
-	TypeVal: "test",
-	NameVal: "test",
-}
+	defaultExporterName = "test"
+	defaultExporterCfg  = &config.ExporterSettings{
+		TypeVal: "test",
+		NameVal: defaultExporterName,
+	}
+
+	exporterTag, _      = tag.NewKey("exporter")
+	defaultExporterTags = []tag.Tag{
+		{Key: exporterTag, Value: defaultExporterName},
+	}
+)
 
 func TestErrorToStatus(t *testing.T) {
 	require.Equal(t, okStatus, errToStatus(nil))
diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go
index 106122182e2..ae68dafc467 100644
--- a/exporter/exporterhelper/queued_retry.go
+++ b/exporter/exporterhelper/queued_retry.go
@@ -22,6 +22,9 @@ import (
 	"github.com/cenkalti/backoff/v4"
 	"github.com/jaegertracing/jaeger/pkg/queue"
+	"go.opencensus.io/metric"
+	"go.opencensus.io/metric/metricdata"
+	"go.opencensus.io/metric/metricproducer"
 	"go.opencensus.io/trace"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 
 	"go.opentelemetry.io/collector/obsreport"
 )
 
+var (
+	r = metric.NewRegistry()
+
+	queueSizeGauge, _ = r.AddInt64DerivedGauge(
+		obsreport.ExporterKey+"/queue_size",
+		metric.WithDescription("Current size of the retry queue (in batches)"),
+		metric.WithLabelKeys(obsreport.ExporterKey),
+		metric.WithUnit(metricdata.UnitDimensionless))
+)
+
+func init() {
+	metricproducer.GlobalManager().AddProducer(r)
+}
+
 // QueueSettings defines configuration for queueing batches before sending to the consumerSender.
 type QueueSettings struct {
 	// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -79,6 +96,7 @@ func DefaultRetrySettings() RetrySettings {
 }
 
 type queuedRetrySender struct {
+	fullName       string
 	cfg            QueueSettings
 	consumerSender requestSender
 	queue          *queue.BoundedQueue
@@ -111,7 +129,8 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
 	sampledLogger := createSampledLogger(logger)
 	traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
 	return &queuedRetrySender{
-		cfg:            qCfg,
+		fullName:       fullName,
+		cfg:            qCfg,
 		consumerSender: &retrySender{
 			traceAttribute: traceAttr,
 			cfg:            rCfg,
@@ -127,11 +146,23 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
 }
 
 // start is invoked during service startup.
-func (qrs *queuedRetrySender) start() {
+func (qrs *queuedRetrySender) start() error {
 	qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
 		req := item.(request)
 		_ = qrs.consumerSender.send(req)
 	})
+
+	// Start reporting queue size metric
+	if qrs.cfg.Enabled {
+		err := queueSizeGauge.UpsertEntry(func() int64 {
+			return int64(qrs.queue.Size())
+		}, metricdata.NewLabelValue(qrs.fullName))
+		if err != nil {
+			return fmt.Errorf("failed to create retry queue size metric: %v", err)
+		}
+	}
+
+	return nil
 }
 
 // send implements the requestSender interface
@@ -167,6 +198,13 @@ func (qrs *queuedRetrySender) send(req request) error {
 
 // shutdown is invoked during service shutdown.
 func (qrs *queuedRetrySender) shutdown() {
+	// Cleanup queue metrics reporting
+	if qrs.cfg.Enabled {
+		_ = queueSizeGauge.UpsertEntry(func() int64 {
+			return int64(0)
+		}, metricdata.NewLabelValue(qrs.fullName))
+	}
+
 	// First stop the retry goroutines, so that unblocks the queue workers.
 	close(qrs.retryStopCh)
 
diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go
index ff128672ac9..eb8af0a1df5 100644
--- a/exporter/exporterhelper/queued_retry_test.go
+++ b/exporter/exporterhelper/queued_retry_test.go
@@ -17,6 +17,7 @@ package exporterhelper
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -24,6 +25,9 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opencensus.io/metric/metricdata"
+	"go.opencensus.io/metric/metricproducer"
+	"go.opencensus.io/tag"
 	"go.uber.org/zap"
 
 	"go.opentelemetry.io/collector/component/componenttest"
@@ -315,6 +319,23 @@ func TestQueuedRetryHappyPath(t *testing.T) {
 	ocs.checkDroppedItemsCount(t, 0)
 }
 
+func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
+	qCfg := DefaultQueueSettings()
+	qCfg.NumConsumers = 0 // to make every request go straight to the queue
+	rCfg := DefaultRetrySettings()
+	be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		assert.NoError(t, be.Shutdown(context.Background()))
+	})
+
+	for i := 0; i < 7; i++ {
+		require.NoError(t, be.sender.send(newErrorRequest(context.Background())))
+	}
+
+	checkValueForProducer(t, defaultExporterTags, int64(7), "exporter/queue_size")
+}
+
 func TestNoCancellationContext(t *testing.T) {
 	deadline := time.Now().Add(1 * time.Second)
 	ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
@@ -440,3 +461,40 @@ func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want i
 func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) {
 	assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount))
 }
+
+// checkValueForProducer checks that the given metric with wantTags is reported by one of the
+// metric producers
+func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
+	producers := metricproducer.GlobalManager().GetAll()
+	for _, producer := range producers {
+		for _, metric := range producer.Read() {
+			if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
+				lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
+				if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
+					require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
+					return
+				}
+
+			}
+		}
+	}
+
+	require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
+}
+
+// tagsMatchLabelKeys returns true if provided tags match keys and values
+func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []metricdata.LabelValue) bool {
+	if len(tags) != len(keys) {
+		return false
+	}
+	for i := 0; i < len(tags); i++ {
+		var labelVal string
+		if labels[i].Present {
+			labelVal = labels[i].Value
+		}
+		if tags[i].Key.Name() != keys[i].Key || tags[i].Value != labelVal {
+			return false
+		}
+	}
+	return true
+}
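
The following standalone sketch is not part of the patch; it illustrates the mechanism the change relies on. The callback registered with UpsertEntry is evaluated lazily: whenever a metrics reader walks the producers registered with metricproducer.GlobalManager() (as checkValueForProducer does above, and as a metrics exporter does in a running collector), the gauge reports the queue size at that moment, with no periodic update loop. The fakeQueue type, the main wiring, and the "test" label value are illustrative stand-ins rather than identifiers from the collector codebase; only the OpenCensus calls mirror the ones added in queued_retry.go.

// queue_size_sketch.go, assuming only the go.opencensus.io module.
package main

import (
	"fmt"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
)

// fakeQueue stands in for the bounded queue used by queuedRetrySender;
// only Size() matters for the gauge.
type fakeQueue struct{ items []interface{} }

func (q *fakeQueue) Size() int { return len(q.items) }

func main() {
	// Register a metric registry as a producer, as the patch does in init().
	r := metric.NewRegistry()
	metricproducer.GlobalManager().AddProducer(r)

	gauge, err := r.AddInt64DerivedGauge(
		"exporter/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys("exporter"),
		metric.WithUnit(metricdata.UnitDimensionless))
	if err != nil {
		panic(err)
	}

	// Bind the callback to one label value; "test" plays the exporter name.
	q := &fakeQueue{}
	if err := gauge.UpsertEntry(func() int64 {
		return int64(q.Size())
	}, metricdata.NewLabelValue("test")); err != nil {
		panic(err)
	}

	// Grow the queue; nothing has to push a new metric value anywhere.
	q.items = append(q.items, struct{}{}, struct{}{}, struct{}{})

	// A reader pulls the current value on demand by walking the producers.
	for _, producer := range metricproducer.GlobalManager().GetAll() {
		for _, m := range producer.Read() {
			for _, ts := range m.TimeSeries {
				last := ts.Points[len(ts.Points)-1]
				fmt.Println(m.Descriptor.Name, last.Value) // exporter/queue_size 3
			}
		}
	}
}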