Report metric about current size of the exporter retry queue (open-telemetry#2858)

This commit adds observability to the queue_retry exporter helper. It adds a first metric, "queue_size" (exported as exporter/queue_size), which reports the current number of batches in the retry queue, per exporter. The metric is updated every second.

This is the first commit to address issue open-telemetry#2434.
dmitryax authored and pjanotti committed Apr 6, 2021
1 parent 9bf4fe2 commit e1cdc73
Showing 4 changed files with 112 additions and 9 deletions.
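For context before the diffs: the helper reports queue size through OpenCensus's derived-gauge API. A derived-gauge entry wraps a callback that is re-evaluated whenever the registry is read, so no periodic update loop is needed inside the sender itself. Below is a minimal, self-contained Go sketch of that pattern; the "my-exporter" label value and the slice standing in for the bounded queue are illustrative assumptions, not code from this commit.

    package main

    import (
        "fmt"

        "go.opencensus.io/metric"
        "go.opencensus.io/metric/metricdata"
        "go.opencensus.io/metric/metricproducer"
    )

    func main() {
        // A registry owns the gauge; adding it to the global producer
        // manager makes it visible to any registered metrics reader.
        r := metric.NewRegistry()
        metricproducer.GlobalManager().AddProducer(r)

        gauge, err := r.AddInt64DerivedGauge(
            "exporter/queue_size",
            metric.WithDescription("Current size of the retry queue (in batches)"),
            metric.WithLabelKeys("exporter"))
        if err != nil {
            panic(err)
        }

        queue := []string{"batch-1", "batch-2"} // stand-in for the bounded queue

        // The callback runs lazily on each read, so the gauge tracks the
        // queue without any explicit update calls.
        _ = gauge.UpsertEntry(func() int64 {
            return int64(len(queue))
        }, metricdata.NewLabelValue("my-exporter"))

        // Read back through the producer manager, as the new test helper does.
        for _, p := range metricproducer.GlobalManager().GetAll() {
            for _, m := range p.Read() {
                ts := m.TimeSeries[0]
                fmt.Println(m.Descriptor.Name, ts.Points[0].Value)
            }
        }
    }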
3 changes: 1 addition & 2 deletions exporter/exporterhelper/common.go
@@ -186,8 +186,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
     }
 
     // If no error then start the queuedRetrySender.
-    be.qrSender.start()
-    return nil
+    return be.qrSender.start()
 }
 
 // Shutdown all senders and exporter and is invoked during service shutdown.
19 changes: 14 additions & 5 deletions exporter/exporterhelper/common_test.go
@@ -19,6 +19,7 @@ import (
     "testing"
 
     "github.com/stretchr/testify/require"
+    "go.opencensus.io/tag"
     "go.opencensus.io/trace"
     "go.uber.org/zap"
 
@@ -27,12 +28,20 @@ import (
     "go.opentelemetry.io/collector/config"
 )
 
-var okStatus = trace.Status{Code: trace.StatusCodeOK}
+var (
+    okStatus = trace.Status{Code: trace.StatusCodeOK}
 
-var defaultExporterCfg = &config.ExporterSettings{
-    TypeVal: "test",
-    NameVal: "test",
-}
+    defaultExporterName = "test"
+    defaultExporterCfg  = &config.ExporterSettings{
+        TypeVal: "test",
+        NameVal: defaultExporterName,
+    }
+
+    exporterTag, _      = tag.NewKey("exporter")
+    defaultExporterTags = []tag.Tag{
+        {Key: exporterTag, Value: defaultExporterName},
+    }
+)
 
 func TestErrorToStatus(t *testing.T) {
     require.Equal(t, okStatus, errToStatus(nil))
42 changes: 40 additions & 2 deletions exporter/exporterhelper/queued_retry.go
@@ -22,6 +22,9 @@ import (
 
     "github.com/cenkalti/backoff/v4"
     "github.com/jaegertracing/jaeger/pkg/queue"
+    "go.opencensus.io/metric"
+    "go.opencensus.io/metric/metricdata"
+    "go.opencensus.io/metric/metricproducer"
     "go.opencensus.io/trace"
     "go.uber.org/zap"
     "go.uber.org/zap/zapcore"
@@ -30,6 +33,20 @@ import (
     "go.opentelemetry.io/collector/obsreport"
 )
 
+var (
+    r = metric.NewRegistry()
+
+    queueSizeGauge, _ = r.AddInt64DerivedGauge(
+        obsreport.ExporterKey+"/queue_size",
+        metric.WithDescription("Current size of the retry queue (in batches)"),
+        metric.WithLabelKeys(obsreport.ExporterKey),
+        metric.WithUnit(metricdata.UnitDimensionless))
+)
+
+func init() {
+    metricproducer.GlobalManager().AddProducer(r)
+}
+
 // QueueSettings defines configuration for queueing batches before sending to the consumerSender.
 type QueueSettings struct {
     // Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -79,6 +96,7 @@ func DefaultRetrySettings() RetrySettings {
 }
 
 type queuedRetrySender struct {
+    fullName       string
     cfg            QueueSettings
     consumerSender requestSender
     queue          *queue.BoundedQueue
@@ -111,7 +129,8 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
     sampledLogger := createSampledLogger(logger)
     traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
     return &queuedRetrySender{
-        cfg:            qCfg,
+        fullName:       fullName,
+        cfg:            qCfg,
         consumerSender: &retrySender{
             traceAttribute: traceAttr,
             cfg:            rCfg,
@@ -127,11 +146,23 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting
 }
 
 // start is invoked during service startup.
-func (qrs *queuedRetrySender) start() {
+func (qrs *queuedRetrySender) start() error {
     qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
         req := item.(request)
         _ = qrs.consumerSender.send(req)
     })
+
+    // Start reporting queue length metric
+    if qrs.cfg.Enabled {
+        err := queueSizeGauge.UpsertEntry(func() int64 {
+            return int64(qrs.queue.Size())
+        }, metricdata.NewLabelValue(qrs.fullName))
+        if err != nil {
+            return fmt.Errorf("failed to create retry queue size metric: %v", err)
+        }
+    }
+
+    return nil
 }
 
 // send implements the requestSender interface
@@ -167,6 +198,13 @@ func (qrs *queuedRetrySender) send(req request) error {
 
 // shutdown is invoked during service shutdown.
 func (qrs *queuedRetrySender) shutdown() {
+    // Cleanup queue metrics reporting
+    if qrs.cfg.Enabled {
+        _ = queueSizeGauge.UpsertEntry(func() int64 {
+            return int64(0)
+        }, metricdata.NewLabelValue(qrs.fullName))
+    }
+
     // First stop the retry goroutines, so that unblocks the queue workers.
     close(qrs.retryStopCh)
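A note on the start/shutdown hunks above: UpsertEntry creates the gauge entry on first use and replaces its callback on later calls with the same label value. shutdown relies on that to pin the reported size to a constant zero instead of leaving a stale callback reading a drained queue. Continuing from the sketch near the top of the page (same illustrative names):

    // Re-upserting with the same label value swaps the callback in place,
    // so after shutdown the gauge reports 0 rather than the old queue size.
    _ = gauge.UpsertEntry(func() int64 { return 0 },
        metricdata.NewLabelValue("my-exporter"))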
57 changes: 57 additions & 0 deletions exporter/exporterhelper/queued_retry_test.go
@@ -17,13 +17,17 @@ package exporterhelper
 import (
     "context"
     "errors"
+    "fmt"
     "sync"
     "sync/atomic"
     "testing"
     "time"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
+    "go.opencensus.io/metric/metricdata"
+    "go.opencensus.io/metric/metricproducer"
+    "go.opencensus.io/tag"
     "go.uber.org/zap"
 
     "go.opentelemetry.io/collector/component/componenttest"
@@ -315,6 +319,22 @@ func TestQueuedRetryHappyPath(t *testing.T) {
     ocs.checkDroppedItemsCount(t, 0)
 }
 
+func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
+    qCfg := DefaultQueueSettings()
+    qCfg.NumConsumers = 0 // to make every request go straight to the queue
+    rCfg := DefaultRetrySettings()
+    be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
+    require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+
+    for i := 0; i < 7; i++ {
+        be.sender.send(newErrorRequest(context.Background()))
+    }
+    checkValueForProducer(t, defaultExporterTags, int64(7), "exporter/queue_size")
+
+    assert.NoError(t, be.Shutdown(context.Background()))
+    checkValueForProducer(t, defaultExporterTags, int64(0), "exporter/queue_size")
+}
+
 func TestNoCancellationContext(t *testing.T) {
     deadline := time.Now().Add(1 * time.Second)
     ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
@@ -440,3 +460,40 @@ func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want i
 func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) {
     assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount))
 }
+
+// checkValueForProducer checks that the given metric with wantTags is reported by one of the
+// metric producers.
+func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
+    producers := metricproducer.GlobalManager().GetAll()
+    for _, producer := range producers {
+        for _, metric := range producer.Read() {
+            if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
+                lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
+                if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
+                    require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
+                    return
+                }
+
+            }
+        }
+    }
+
+    require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
+}
+
+// tagsMatchLabelKeys returns true if the provided tags match the given label keys and values.
+func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []metricdata.LabelValue) bool {
+    if len(tags) != len(keys) {
+        return false
+    }
+    for i := 0; i < len(tags); i++ {
+        var labelVal string
+        if labels[i].Present {
+            labelVal = labels[i].Value
+        }
+        if tags[i].Key.Name() != keys[i].Key || tags[i].Value != labelVal {
+            return false
+        }
+    }
+    return true
+}

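To exercise the new test in isolation (standard go tooling, run from the root of a checked-out collector tree):

    go test ./exporter/exporterhelper -run TestQueuedRetry_QueueMetricsReported -v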