Report a metric for the current size of the exporter retry queue
This commit adds observability to the queued_retry exporter helper. It adds the first metric, "queue_size", which reports the current size of the queue per exporter. The metric is updated every second.

This is the first commit to address issue open-telemetry#2434.
dmitryax committed Apr 1, 2021
1 parent f16bb4c commit 63ed063
Showing 4 changed files with 113 additions and 9 deletions.
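
For context, here is a minimal, self-contained sketch of the OpenCensus derived-gauge pattern this commit applies. The channel-backed queue, the "my-exporter" label value, and the main harness are illustrative stand-ins; the metric name and the registry/gauge calls mirror the diff below.

package main

import (
	"fmt"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
	"go.opencensus.io/metric/metricproducer"
)

func main() {
	// Stand-in for the exporter's bounded retry queue.
	queue := make(chan struct{}, 10)
	queue <- struct{}{}
	queue <- struct{}{}

	// Register a registry with the global producer manager, as the
	// commit does in init().
	r := metric.NewRegistry()
	metricproducer.GlobalManager().AddProducer(r)

	gauge, err := r.AddInt64DerivedGauge(
		"exporter/queue_size",
		metric.WithDescription("Current size of the retry queue (in batches)"),
		metric.WithLabelKeys("exporter"),
		metric.WithUnit(metricdata.UnitDimensionless))
	if err != nil {
		panic(err)
	}

	// The callback runs whenever a reader collects from the registry,
	// so the reported value always reflects the live queue size.
	if err := gauge.UpsertEntry(func() int64 {
		return int64(len(queue))
	}, metricdata.NewLabelValue("my-exporter")); err != nil {
		panic(err)
	}

	// Read the gauge back through the producer manager, the same path
	// the new test helper uses.
	for _, producer := range metricproducer.GlobalManager().GetAll() {
		for _, m := range producer.Read() {
			if m.Descriptor.Name == "exporter/queue_size" {
				ts := m.TimeSeries[len(m.TimeSeries)-1]
				fmt.Println(ts.Points[len(ts.Points)-1].Value) // 2
			}
		}
	}
}

Because the gauge is derived, the value is computed from the callback at collection time rather than pushed by the exporter itself.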
3 changes: 1 addition & 2 deletions exporter/exporterhelper/common.go
@@ -186,8 +186,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
}

// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
return be.qrSender.start()
}

// Shutdown all senders and exporter and is invoked during service shutdown.
19 changes: 14 additions & 5 deletions exporter/exporterhelper/common_test.go
@@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"go.uber.org/zap"

@@ -27,12 +28,20 @@ import (
"go.opentelemetry.io/collector/config"
)

var okStatus = trace.Status{Code: trace.StatusCodeOK}
var (
okStatus = trace.Status{Code: trace.StatusCodeOK}

var defaultExporterCfg = &config.ExporterSettings{
TypeVal: "test",
NameVal: "test",
}
defaultExporterName = "test"
defaultExporterCfg = &config.ExporterSettings{
TypeVal: "test",
NameVal: defaultExporterName,
}

exporterTag, _ = tag.NewKey("exporter")
defaultExporterTags = []tag.Tag{
{Key: exporterTag, Value: defaultExporterName},
}
)

func TestErrorToStatus(t *testing.T) {
require.Equal(t, okStatus, errToStatus(nil))
42 changes: 40 additions & 2 deletions exporter/exporterhelper/queued_retry.go
@@ -22,6 +22,9 @@ import (

"github.com/cenkalti/backoff/v4"
"github.com/jaegertracing/jaeger/pkg/queue"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -30,6 +33,20 @@ import (
"go.opentelemetry.io/collector/obsreport"
)

var (
r = metric.NewRegistry()

queueSizeGauge, _ = r.AddInt64DerivedGauge(
obsreport.ExporterKey+"/queue_size",
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsreport.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))
)

func init() {
metricproducer.GlobalManager().AddProducer(r)
}

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
@@ -79,6 +96,7 @@ func DefaultRetrySettings() RetrySettings {
}

type queuedRetrySender struct {
fullName string
cfg QueueSettings
consumerSender requestSender
queue *queue.BoundedQueue
@@ -111,7 +129,8 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings
sampledLogger := createSampledLogger(logger)
traceAttr := trace.StringAttribute(obsreport.ExporterKey, fullName)
return &queuedRetrySender{
cfg: qCfg,
fullName: fullName,
cfg: qCfg,
consumerSender: &retrySender{
traceAttribute: traceAttr,
cfg: rCfg,
@@ -127,11 +146,23 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySettings
}

// start is invoked during service startup.
func (qrs *queuedRetrySender) start() {
func (qrs *queuedRetrySender) start() error {
qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) {
req := item.(request)
_ = qrs.consumerSender.send(req)
})

// Start reporting queue length metric
if qrs.cfg.Enabled {
err := queueSizeGauge.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue size metric: %v", err)
}
}

return nil
}

// send implements the requestSender interface
@@ -167,6 +198,13 @@ func (qrs *queuedRetrySender) send(req request) error {

// shutdown is invoked during service shutdown.
func (qrs *queuedRetrySender) shutdown() {
// Cleanup queue metrics reporting
if qrs.cfg.Enabled {
_ = queueSizeGauge.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qrs.fullName))
}

// First stop the retry goroutines, so that unblocks the queue workers.
close(qrs.retryStopCh)

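
As a condensed sketch of the start/shutdown lifecycle implemented above, assuming a queue exposing Size() (fakeQueue and sender below are illustrative stand-ins for the real BoundedQueue and queuedRetrySender):

package main

import (
	"fmt"

	"go.opencensus.io/metric"
	"go.opencensus.io/metric/metricdata"
)

// fakeQueue stands in for the jaeger BoundedQueue used by the real sender.
type fakeQueue struct{ n int }

func (q *fakeQueue) Size() int { return q.n }

type sender struct {
	name  string
	queue *fakeQueue
	gauge *metric.Int64DerivedGauge
}

// start mirrors queuedRetrySender.start: bind a gauge entry, keyed by the
// exporter name, to the live queue size.
func (s *sender) start() error {
	return s.gauge.UpsertEntry(func() int64 {
		return int64(s.queue.Size())
	}, metricdata.NewLabelValue(s.name))
}

// shutdown mirrors queuedRetrySender.shutdown: overwrite the entry with a
// constant zero so a stale size is not exported after the queue is drained.
func (s *sender) shutdown() {
	_ = s.gauge.UpsertEntry(func() int64 { return 0 }, metricdata.NewLabelValue(s.name))
}

func main() {
	r := metric.NewRegistry()
	g, err := r.AddInt64DerivedGauge("exporter/queue_size",
		metric.WithLabelKeys("exporter"))
	if err != nil {
		panic(err)
	}
	s := &sender{name: "test", queue: &fakeQueue{n: 7}, gauge: g}
	_ = s.start()
	fmt.Println(r.Read()[0].TimeSeries[0].Points[0].Value) // 7
	s.shutdown()
	fmt.Println(r.Read()[0].TimeSeries[0].Points[0].Value) // 0
}

Pinning the entry to zero on shutdown mirrors the diff's cleanup step: the last value exported for the exporter's label is 0 rather than a stale queue size.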
58 changes: 58 additions & 0 deletions exporter/exporterhelper/queued_retry_test.go
@@ -17,13 +17,17 @@ package exporterhelper
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
"go.opencensus.io/tag"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component/componenttest"
@@ -315,6 +319,23 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := DefaultRetrySettings()
be := newBaseExporter(defaultExporterCfg, zap.NewNop(), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

for i := 0; i < 7; i++ {
require.NoError(t, be.sender.send(newErrorRequest(context.Background())))
}

checkValueForProducer(t, defaultExporterTags, int64(7), "exporter/queue_size")
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
@@ -440,3 +461,40 @@ func (ocs *observabilityConsumerSender) checkSendItemsCount(t *testing.T, want int) {
func (ocs *observabilityConsumerSender) checkDroppedItemsCount(t *testing.T, want int) {
assert.EqualValues(t, want, atomic.LoadInt64(&ocs.droppedItemsCount))
}

// checkValueForProducer checks that a metric with the given name and tags is reported by one of
// the metric producers
func checkValueForProducer(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
producers := metricproducer.GlobalManager().GetAll()
for _, producer := range producers {
for _, metric := range producer.Read() {
if metric.Descriptor.Name == vName && len(metric.TimeSeries) > 0 {
lastValue := metric.TimeSeries[len(metric.TimeSeries)-1]
if tagsMatchLabelKeys(wantTags, metric.Descriptor.LabelKeys, lastValue.LabelValues) {
require.Equal(t, value, lastValue.Points[len(lastValue.Points)-1].Value.(int64))
return
}
}
}
}

require.Fail(t, fmt.Sprintf("could not find metric %v with tags %s reported", vName, wantTags))
}

// tagsMatchLabelKeys returns true if provided tags match keys and values
func tagsMatchLabelKeys(tags []tag.Tag, keys []metricdata.LabelKey, labels []metricdata.LabelValue) bool {
if len(tags) != len(keys) {
return false
}
for i := 0; i < len(tags); i++ {
var labelVal string
if labels[i].Present {
labelVal = labels[i].Value
}
if tags[i].Key.Name() != keys[i].Key || tags[i].Value != labelVal {
return false
}
}
return true
}
