From 95c5dcba5b1419e9dbdaa4faaec93de0ea7b022f Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 30 Mar 2021 14:25:05 -0700 Subject: [PATCH] Report metric about current size of the exporter retry queue This commit adds observability to queue_retry exporter helper. It adds the first metric "queue_length" that indicates current size of the queue per exporter. The metrics is updated every second. This is the first commit to address the issue https://github.com/open-telemetry/opentelemetry-collector/issues/2434 --- exporter/exporterhelper/common.go | 3 +- exporter/exporterhelper/queued_retry.go | 34 +++++++++++- exporter/exporterhelper/queued_retry_test.go | 17 ++++++ local/config.yaml | 36 +++++++++++++ obsreport/obsreporttest/obsreporttest.go | 47 +++++++++++++++++ sample zipkin | 55 ++++++++++++++++++++ 6 files changed, 188 insertions(+), 4 deletions(-) create mode 100644 local/config.yaml create mode 100644 sample zipkin diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 61413852f6f..1a29846db69 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -186,8 +186,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error { } // If no error then start the queuedRetrySender. - be.qrSender.start() - return nil + return be.qrSender.start() } // Shutdown all senders and exporter and is invoked during service shutdown. diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 106122182e2..3916a89186c 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -22,6 +22,9 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/jaegertracing/jaeger/pkg/queue" + "go.opencensus.io/metric" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" "go.opencensus.io/trace" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -30,6 +33,16 @@ import ( "go.opentelemetry.io/collector/obsreport" ) +var ( + r = metric.NewRegistry() + + queueSizeGauge, _ = r.AddInt64DerivedGauge( + obsreport.ExporterKey+"/queue_size", + metric.WithDescription("Current size of the retry queue (in batches)"), + metric.WithLabelKeys(obsreport.ExporterKey), + metric.WithUnit(metricdata.UnitDimensionless)) +) + // QueueSettings defines configuration for queueing batches before sending to the consumerSender. type QueueSettings struct { // Enabled indicates whether to not enqueue batches before sending to the consumerSender. @@ -79,6 +92,7 @@ func DefaultRetrySettings() RetrySettings { } type queuedRetrySender struct { + fullName string cfg QueueSettings consumerSender requestSender queue *queue.BoundedQueue @@ -111,7 +125,8 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting sampledLogger := createSampledLogger(logger) traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName) return &queuedRetrySender{ - cfg: qCfg, + fullName: fullName, + cfg: qCfg, consumerSender: &retrySender{ traceAttribute: traceAttr, cfg: rCfg, @@ -127,11 +142,24 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting } // start is invoked during service startup. -func (qrs *queuedRetrySender) start() { +func (qrs *queuedRetrySender) start() error { qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { req := item.(request) _ = qrs.consumerSender.send(req) }) + + // Start reporting queue length metric + if qrs.cfg.Enabled { + metricproducer.GlobalManager().AddProducer(r) + err := queueSizeGauge.UpsertEntry(func() int64 { + return int64(qrs.queue.Size()) + }, metricdata.NewLabelValue(qrs.fullName)) + if err != nil { + return fmt.Errorf("failed to create retry queue size metric: %v", err) + } + } + + return nil } // send implements the requestSender interface @@ -167,6 +195,8 @@ func (qrs *queuedRetrySender) send(req request) error { // shutdown is invoked during service shutdown. func (qrs *queuedRetrySender) shutdown() { + metricproducer.GlobalManager().DeleteProducer(r) + // First stop the retry goroutines, so that unblocks the queue workers. close(qrs.retryStopCh) diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index ff128672ac9..271baeac061 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -315,6 +315,23 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.checkDroppedItemsCount(t, 0) } +func TestQueuedRetry_QueueMetricsReported(t *testing.T) { + qCfg := DefaultQueueSettings() + qCfg.NumConsumers = 0 // to make every request go straight to the queue + rCfg := DefaultRetrySettings() + be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + for i := 0; i < 7; i++ { + be.sender.send(newErrorRequest(context.Background())) + } + + obsreporttest.CheckExporterQueueRetryViews(t, defaultExporterCfg.Name(), int64(7)) +} + func TestNoCancellationContext(t *testing.T) { deadline := time.Now().Add(1 * time.Second) ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) diff --git a/local/config.yaml b/local/config.yaml new file mode 100644 index 00000000000..05209845d1e --- /dev/null +++ b/local/config.yaml @@ -0,0 +1,36 @@ +extensions: + zpages: + endpoint: 0.0.0.0:55679 + +receivers: + otlp: + protocols: + grpc: + opencensus: + jaeger: + protocols: + grpc: + thrift_http: + zipkin: + +processors: + batch: {} + memory_limiter: + ballast_size_mib: 2000 + check_interval: 1s + limit_mib: 4000 + spike_limit_mib: 800 + +exporters: + logging: + otlp: + endpoint: localhost:1234 + +service: + pipelines: + traces: + receivers: [zipkin] + processors: [memory_limiter] + exporters: [logging, otlp] + + extensions: [zpages] diff --git a/obsreport/obsreporttest/obsreporttest.go b/obsreport/obsreporttest/obsreporttest.go index e0b9cc98a86..d098b552400 100644 --- a/obsreport/obsreporttest/obsreporttest.go +++ b/obsreport/obsreporttest/obsreporttest.go @@ -15,11 +15,14 @@ package obsreporttest import ( + "fmt" "reflect" "sort" "testing" "github.com/stretchr/testify/require" + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -79,6 +82,13 @@ func CheckExporterLogsViews(t *testing.T, exporter string, acceptedLogRecords, d checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records") } +// CheckExporterQueueRetryViews checks that for the current exported values for queue retry views match given values. +// When this function is called it is required to also call SetupRecordedMetricsTest as first thing. +func CheckExporterQueueRetryViews(t *testing.T, exporter string, queueLength int64) { + exporterTags := tagsForExporterView(exporter) + checkValueForProducer(t, exporterTags, queueLength, "exporter/queue_size") +} + // CheckProcessorTracesViews checks that for the current exported values for trace exporter views match given values. // When this function is called it is required to also call SetupRecordedMetricsTest as first thing. func CheckProcessorTracesViews(t *testing.T, processor string, acceptedSpans, refusedSpans, droppedSpans int64) { @@ -160,6 +170,43 @@ func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName stri require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows) } +// checkValueForProducer checks that the given metrics with wantTags is reported by one of the +// metric producers +func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) { + producers := metricproducer.GlobalManager().GetAll() + for _, producer := range producers { + for _, metric := range producer.Read() { + if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 { + lastValue := metric.TimeSeries[len(metric.TimeSeries)-1] + if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) { + require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64)) + return + } + + } + } + } + + require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags)) +} + +// tagsMatchLabelKeys returns true if provided tags match keys and values +func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []metricdata.LabelValue) bool { + if len(tags) != len(keys) { + return false + } + for i := 0; i < len(tags); i++ { + var labelVal string + if labels[i].Present { + labelVal = labels[i].Value + } + if tags[i].Key.Name() != keys[i].Key || tags[i].Value != labelVal { + return false + } + } + return true +} + // tagsForReceiverView returns the tags that are needed for the receiver views. func tagsForReceiverView(receiver, transport string) []tag.Tag { tags := make([]tag.Tag, 0, 2) diff --git a/sample zipkin b/sample zipkin new file mode 100644 index 00000000000..19a7f675cf0 --- /dev/null +++ b/sample zipkin @@ -0,0 +1,55 @@ +curl -vs localhost:9411/api/v1/spans -H'Content-type: application/json' -H 'Expect:' -d '[ + { + "traceId": "5e1b76cb257aa6fd", + "name": "app - root span", + "id": "168ba9a2869c3ae1", + "timestamp": 1473066067938000, + "duration": 484655, + "annotations": [ + { + "timestamp": 1473066067938000, + "value": "sr", + "endpoint": { + "serviceName": "app", + "ipv4": "0.0.0.0" + } + }, + { + "timestamp": 1473066068422655, + "value": "ss", + "endpoint": { + "serviceName": "app", + "ipv4": "0.0.0.0" + } + } + ], + "binaryAnnotations": [] + }, + { + "traceId": "5e1b76cb257aa6fd", + "name": "app test - get", + "id": "fbbff4adc94c01cb", + "parentId": "168ba9a2869c3ae1", + "timestamp": 1473066067939000, + "duration": 483823, + "annotations": [], + "binaryAnnotations": [ + { + "key": "error", + "value": "test", + "endpoint": { + "serviceName": "app", + "ipv4": "0.0.0.0" + } + }, + { + "key": "lc", + "value": "Application", + "endpoint": { + "serviceName": "app", + "ipv4": "0.0.0.0" + } + } + ] + } +]'