Report metric about current size of the exporter retry queue
This commit adds observability to the queue_retry exporter helper. It adds the first metric, "queue_size", which reports the current size of the retry queue per exporter. The metric is updated every second.

This is the first commit toward addressing issue open-telemetry#2434
dmitryax committed Apr 1, 2021
1 parent a08a650 commit 95c5dcb
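For context on the mechanism: the new metric is an OpenCensus derived gauge, so nothing is recorded explicitly; a callback is evaluated each time the owning registry is read by a metrics exporter. The following is a minimal, self-contained sketch of that pattern, not code from this commit; the pending slice and the "otlp" label value are illustrative stand-ins for the bounded queue and the exporter name.

package main

import (
	"fmt"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
)

func main() {
	// Registry that owns the derived gauge; registering it with the global
	// producer manager exposes it to whatever metrics exporter is configured.
	r := metric.NewRegistry()
	metricproducer.GlobalManager().AddProducer(r)
	defer metricproducer.GlobalManager().DeleteProducer(r)

	gauge, err := r.AddInt64DerivedGauge(
		"exporter/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys("exporter"),
		metric.WithUnit(metricdata.UnitDimensionless))
	if err != nil {
		panic(err)
	}

	// Illustrative stand-in for the bounded queue. The callback runs lazily on
	// every read, so the reported value always reflects the current length.
	pending := []string{"batch-1", "batch-2", "batch-3"}
	if err := gauge.UpsertEntry(func() int64 {
		return int64(len(pending))
	}, metricdata.NewLabelValue("otlp")); err != nil {
		panic(err)
	}

	// Reading the registry is what a metrics exporter does behind the scenes;
	// one timeseries exists per exporter label value.
	for _, m := range r.Read() {
		for _, ts := range m.TimeSeries {
			fmt.Println(m.Descriptor.Name, ts.LabelValues[0].Value, ts.Points[0].Value)
		}
	}
}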
Showing 6 changed files with 188 additions and 4 deletions.
3 changes: 1 addition & 2 deletions exporter/exporterhelper/common.go
@@ -186,8 +186,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
return be.qrSender.start()
}

// Shutdown all senders and exporter and is invoked during service shutdown.
34 changes: 32 additions & 2 deletions exporter/exporterhelper/queued_retry.go
@@ -22,6 +22,9 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/jaegertracing/jaeger/pkg/queue"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -30,6 +33,16 @@ import (
"go.opentelemetry.io/collector/obsreport"
)

var (
r = metric.NewRegistry()

queueSizeGauge, _ = r.AddInt64DerivedGauge(
obsreport.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsreport.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -79,6 +92,7 @@ func DefaultRetrySettings() RetrySettings {
}

type queuedRetrySender struct {
fullName string
cfg QueueSettings
consumerSender requestSender
queue *queue.BoundedQueue
@@ -111,7 +125,8 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
sampledLogger := createSampledLogger(logger)
traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
return &queuedRetrySender{
cfg: qCfg,
fullName: fullName,
cfg: qCfg,
consumerSender: &retrySender{
traceAttribute: traceAttr,
cfg: rCfg,
@@ -127,11 +142,24 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start() {
func (qrs *queuedRetrySender) start() error {
qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
req := item.(request)
_ = qrs.consumerSender.send(req)
})

// Start reporting queue length metric
if qrs.cfg.Enabled {
metricproducer.GlobalManager().AddProducer(r)
err := queueSizeGauge.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue size metric: %v", err)
}
}

return nil
}

// send implements the requestSender interface
@@ -167,6 +195,8 @@ func (qrs *queuedRetrySender) send(req request) error {

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
metricproducer.GlobalManager().DeleteProducer(r)

// First stop the retry goroutines, so that unblocks the queue workers.
close(qrs.retryStopCh)

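On the write side, start() registers the package-level registry with the global producer manager and upserts a single labeled entry whose callback returns qrs.queue.Size(); shutdown() deletes the producer again. On the read side, consumers discover the gauge by scanning the producer manager, which is what the checkValueForProducer helper added to obsreporttest.go below does. A standalone sketch of that read path follows; the package and function names are hypothetical, not part of the collector API.

package queuemetrics

import (
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
)

// currentQueueSize returns the latest reported value of the "exporter/queue_size"
// gauge for the exporter with the given name, or false if no matching timeseries
// is currently exposed by any registered metric producer.
func currentQueueSize(exporterName string) (int64, bool) {
	for _, producer := range metricproducer.GlobalManager().GetAll() {
		for _, m := range producer.Read() {
			if m.Descriptor.Name != "exporter/queue_size" {
				continue
			}
			for _, ts := range m.TimeSeries {
				if !hasLabelValue(ts.LabelValues, exporterName) || len(ts.Points) == 0 {
					continue
				}
				// Derived int64 gauges report their value as an int64 point.
				if v, ok := ts.Points[len(ts.Points)-1].Value.(int64); ok {
					return v, true
				}
			}
		}
	}
	return 0, false
}

// hasLabelValue reports whether any of the label values matches the wanted string.
func hasLabelValue(labels []metricdata.LabelValue, want string) bool {
	for _, l := range labels {
		if l.Present && l.Value == want {
			return true
		}
	}
	return false
}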
17 changes: 17 additions & 0 deletions exporter/exporterhelper/queued_retry_test.go
@@ -315,6 +315,23 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

for i := 0; i < 7; i++ {
be.sender.send(newErrorRequest(context.Background()))
}

obsreporttest.CheckExporterQueueRetryViews(t, defaultExporterCfg.Name(), int64(7))
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
36 changes: 36 additions & 0 deletions local/config.yaml
@@ -0,0 +1,36 @@
extensions:
zpages:
endpoint: 0.0.0.0:55679

receivers:
otlp:
protocols:
grpc:
opencensus:
jaeger:
protocols:
grpc:
thrift_http:
zipkin:

processors:
batch: {}
memory_limiter:
ballast_size_mib: 2000
check_interval: 1s
limit_mib: 4000
spike_limit_mib: 800

exporters:
logging:
otlp:
endpoint: localhost:1234

service:
pipelines:
traces:
receivers: [zipkin]
processors: [memory_limiter]
exporters: [logging, otlp]

extensions: [zpages]
47 changes: 47 additions & 0 deletions obsreport/obsreporttest/obsreporttest.go
@@ -15,11 +15,14 @@
package obsreporttest

import (
"fmt"
"reflect"
"sort"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

@@ -79,6 +82,13 @@ func CheckExporterLogsViews(t *testing.T, exporter string, acceptedLogRecords, d
checkValueForView(t, exporterTags, droppedLogRecords, "exporter/send_failed_log_records")
}

// CheckExporterQueueRetryViews checks that the current exported values for the queue retry views match the given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckExporterQueueRetryViews(t *testing.T, exporter string, queueLength int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForProducer(t, exporterTags, queueLength, "exporter/queue_size")
}

// CheckProcessorTracesViews checks that for the current exported values for trace exporter views match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func CheckProcessorTracesViews(t *testing.T, processor string, acceptedSpans, refusedSpans, droppedSpans int64) {
@@ -160,6 +170,43 @@ func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName stri
require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
}

// checkValueForProducer checks that the given metric with wantTags is reported by one of the
// metric producers.
func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
producers := metricproducer.GlobalManager().GetAll()
for _, producer := range producers {
for _, metric := range producer.Read() {
if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
return
}

}
}
}

require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
}

// tagsMatchLabelKeys returns true if provided tags match keys and values
func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []metricdata.LabelValue) bool {
if len(tags) != len(keys) {
return false
}
for i := 0; i < len(tags); i++ {
var labelVal string
if labels[i].Present {
labelVal = labels[i].Value
}
if tags[i].Key.Name() != keys[i].Key || tags[i].Value != labelVal {
return false
}
}
return true
}

// tagsForReceiverView returns the tags that are needed for the receiver views.
func tagsForReceiverView(receiver, transport string) []tag.Tag {
tags := make([]tag.Tag, 0, 2)
55 changes: 55 additions & 0 deletions sample zipkin
@@ -0,0 +1,55 @@
curl -vs localhost:9411/api/v1/spans -H'Content-type: application/json' -H 'Expect:' -d '[
{
"traceId": "5e1b76cb257aa6fd",
"name": "app - root span",
"id": "168ba9a2869c3ae1",
"timestamp": 1473066067938000,
"duration": 484655,
"annotations": [
{
"timestamp": 1473066067938000,
"value": "sr",
"endpoint": {
"serviceName": "app",
"ipv4": "0.0.0.0"
}
},
{
"timestamp": 1473066068422655,
"value": "ss",
"endpoint": {
"serviceName": "app",
"ipv4": "0.0.0.0"
}
}
],
"binaryAnnotations": []
},
{
"traceId": "5e1b76cb257aa6fd",
"name": "app test - get",
"id": "fbbff4adc94c01cb",
"parentId": "168ba9a2869c3ae1",
"timestamp": 1473066067939000,
"duration": 483823,
"annotations": [],
"binaryAnnotations": [
{
"key": "error",
"value": "test",
"endpoint": {
"serviceName": "app",
"ipv4": "0.0.0.0"
}
},
{
"key": "lc",
"value": "Application",
"endpoint": {
"serviceName": "app",
"ipv4": "0.0.0.0"
}
}
]
}
]'
