Add telemetry for dropped data due to exporter sending queue overflow
This change adds internal metrics for dropped spans, metric points, and log records when the exporter sending queue is full (a minimal recording sketch follows the list):
- exporter/enqueue_failed_metric_points
- exporter/enqueue_failed_spans
- exporter/enqueue_failed_log_records
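
The pattern this change introduces: the queued sender returns a sentinel error when the bounded queue is full, and the consumer wrapper detects it with `errors.Is` and records the dropped item count. The sketch below is a self-contained illustration of that pattern only; the channel-based queue, `enqueue` helper, and plain counter are stand-ins, not the exporterhelper implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull stands in for errSendingQueueIsFull in queued_retry.go.
var errQueueFull = errors.New("sending_queue is full")

// enqueue stands in for the queued sender: it rejects a batch once the queue is at capacity.
func enqueue(queue chan int, logRecords int) error {
	select {
	case queue <- logRecords:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan int, 2) // queue size 2, matching the new tests
	var enqueueFailedLogRecords int64

	// 7 batches of 3 log records each, as in TestLogsExporter_WithRecordEnqueueFailedMetrics.
	for batch := 0; batch < 7; batch++ {
		if err := enqueue(queue, 3); errors.Is(err, errQueueFull) {
			// Analogous to be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count()).
			enqueueFailedLogRecords += 3
		}
	}

	// 2 batches fit in the queue, 5 are rejected: 5 * 3 = 15 dropped log records.
	fmt.Println("exporter/enqueue_failed_log_records:", enqueueFailedLogRecords)
}
```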
dmitryax committed Jun 1, 2021
1 parent 917be66 commit e6c6e57
Showing 14 changed files with 184 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
## 💡 Enhancements 💡

- Add `doc.go` files to the consumer package and its subpackages (#3270)
- Add telemetry for dropped data due to exporter sending queue overflow (#3328)

## v0.27.0 Beta

7 changes: 7 additions & 0 deletions exporter/exporterhelper/common.go
@@ -23,8 +23,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/obsreport"
)

// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -164,6 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
obsrep *obsreport.Exporter
sender requestSender
qrSender *queuedRetrySender
}
@@ -173,6 +176,10 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings)
Component: componenthelper.New(bs.componentOptions...),
}

be.obsrep = obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
})
be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender

14 changes: 8 additions & 6 deletions exporter/exporterhelper/logs.go
@@ -16,12 +16,12 @@ package exporterhelper

import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
@@ -87,16 +87,18 @@ func NewLogsExporter(
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
}),
obsrep: be.obsrep,
nextSender: nextSender,
}
})

lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
return be.sender.send(newLogsRequest(ctx, ld, pusher))
req := newLogsRequest(ctx, ld, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordLogsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)

return &logsExporter{
24 changes: 24 additions & 0 deletions exporter/exporterhelper/logs_test.go
@@ -121,6 +121,30 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, le, want)
}

func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

rCfg := DefaultRetrySettings()
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

md := testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent()
const numBatches = 7
for i := 0; i < numBatches; i++ {
te.ConsumeLogs(context.Background(), md)
}

// 2 batches must be in the queue, and 5 batches (15 log records) rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedLogs(t, fakeLogsExporterName, int64(15))
}

func TestLogsExporter_WithSpan(t *testing.T) {
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, err)
14 changes: 8 additions & 6 deletions exporter/exporterhelper/metrics.go
@@ -16,12 +16,12 @@ package exporterhelper

import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
@@ -88,10 +88,7 @@ func NewMetricsExporter(
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
}),
obsrep: be.obsrep,
nextSender: nextSender,
}
})
@@ -100,7 +97,12 @@ func NewMetricsExporter(
if bs.ResourceToTelemetrySettings.Enabled {
md = convertResourceToLabels(md)
}
return be.sender.send(newMetricsRequest(ctx, md, pusher))
req := newMetricsRequest(ctx, md, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordMetricsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)

return &metricsExporter{
24 changes: 24 additions & 0 deletions exporter/exporterhelper/metrics_test.go
@@ -120,6 +120,30 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, me, want)
}

func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

rCfg := DefaultRetrySettings()
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

md := testdata.GenerateMetricsOneMetricOneDataPoint()
const numBatches = 7
for i := 0; i < numBatches; i++ {
te.ConsumeMetrics(context.Background(), md)
}

// 2 batches must be in the queue, and 5 batches (5 metric points) rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedMetrics(t, fakeMetricsExporterName, int64(5))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.NoError(t, err)
4 changes: 3 additions & 1 deletion exporter/exporterhelper/queued_retry.go
@@ -41,6 +41,8 @@ var (
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

errSendingQueueIsFull = errors.New("sending_queue is full")
)

func init() {
@@ -189,7 +191,7 @@ func (qrs *queuedRetrySender) send(req request) error {
zap.Int("dropped_items", req.count()),
)
span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.")
return errors.New("sending_queue is full")
return errSendingQueueIsFull
}

span.Annotate(qrs.traceAttributes, "Enqueued item.")
15 changes: 8 additions & 7 deletions exporter/exporterhelper/traces.go
@@ -16,12 +16,12 @@ package exporterhelper

import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
@@ -88,17 +88,18 @@ func NewTracesExporter(
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: obsreport.NewExporter(
obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
}),
obsrep: be.obsrep,
nextSender: nextSender,
}
})

tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error {
return be.sender.send(newTracesRequest(ctx, td, pusher))
req := newTracesRequest(ctx, td, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.RecordTracesEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)

return &traceExporter{
24 changes: 24 additions & 0 deletions exporter/exporterhelper/traces_test.go
@@ -131,6 +131,30 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForTracesExporter(t, te, want)
}

func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

rCfg := DefaultRetrySettings()
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

td := testdata.GenerateTracesTwoSpansSameResource()
const numBatches = 7
for i := 0; i < numBatches; i++ {
te.ConsumeTraces(context.Background(), td)
}

// 2 batches must be in the queue, and 5 batches (10 spans) rejected due to queue overflow
obsreporttest.CheckExporterEnqueueFailedTraces(t, fakeTracesExporterName, int64(10))
}

func TestTracesExporter_WithSpan(t *testing.T) {
te, err := NewTracesExporter(&fakeTracesExporterConfig, zap.NewNop(), newTraceDataPusher(nil))
require.NoError(t, err)
18 changes: 18 additions & 0 deletions internal/obsreportconfig/obsmetrics/obs_exporter.go
@@ -27,16 +27,22 @@ const (
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
FailedToSendSpansKey = "send_failed_spans"
// FailedToEnqueueSpansKey used to track spans that failed to be added to the sending queue.
FailedToEnqueueSpansKey = "enqueue_failed_spans"

// SentMetricPointsKey used to track metric points sent by exporters.
SentMetricPointsKey = "sent_metric_points"
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
FailedToSendMetricPointsKey = "send_failed_metric_points"
// FailedToEnqueueMetricPointsKey used to track metric points that failed to be added to the sending queue.
FailedToEnqueueMetricPointsKey = "enqueue_failed_metric_points"

// SentLogRecordsKey used to track logs sent by exporters.
SentLogRecordsKey = "sent_log_records"
// FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters.
FailedToSendLogRecordsKey = "send_failed_log_records"
// FailedToEnqueueLogRecordsKey used to track log records that failed to be added to the sending queue.
FailedToEnqueueLogRecordsKey = "enqueue_failed_log_records"
)

var (
@@ -60,6 +66,10 @@
ExporterPrefix+FailedToSendSpansKey,
"Number of spans in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueSpans = stats.Int64(
ExporterPrefix+FailedToEnqueueSpansKey,
"Number of spans failed to be added to the sending queue.",
stats.UnitDimensionless)
ExporterSentMetricPoints = stats.Int64(
ExporterPrefix+SentMetricPointsKey,
"Number of metric points successfully sent to destination.",
Expand All @@ -68,6 +78,10 @@ var (
ExporterPrefix+FailedToSendMetricPointsKey,
"Number of metric points in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueMetricPoints = stats.Int64(
ExporterPrefix+FailedToEnqueueMetricPointsKey,
"Number of metric points failed to be added to the sending queue.",
stats.UnitDimensionless)
ExporterSentLogRecords = stats.Int64(
ExporterPrefix+SentLogRecordsKey,
"Number of log record successfully sent to destination.",
Expand All @@ -76,4 +90,8 @@ var (
ExporterPrefix+FailedToSendLogRecordsKey,
"Number of log records in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueLogRecords = stats.Int64(
ExporterPrefix+FailedToEnqueueLogRecordsKey,
"Number of log records failed to be added to the sending queue.",
stats.UnitDimensionless)
)
3 changes: 3 additions & 0 deletions internal/obsreportconfig/obsreportconfig.go
@@ -77,10 +77,13 @@ func allViews() *ObsMetrics {
measures = []*stats.Int64Measure{
obsmetrics.ExporterSentSpans,
obsmetrics.ExporterFailedToSendSpans,
obsmetrics.ExporterFailedToEnqueueSpans,
obsmetrics.ExporterSentMetricPoints,
obsmetrics.ExporterFailedToSendMetricPoints,
obsmetrics.ExporterFailedToEnqueueMetricPoints,
obsmetrics.ExporterSentLogRecords,
obsmetrics.ExporterFailedToSendLogRecords,
obsmetrics.ExporterFailedToEnqueueLogRecords,
}
tagKeys = []tag.Key{obsmetrics.TagKeyExporter}
views = append(views, genViews(measures, tagKeys, view.Sum())...)
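
For context, genViews in obsreportconfig.go registers Sum views over these measures so they surface as the exporter/enqueue_failed_* counters listed in the commit message, tagged by exporter. Below is a standalone OpenCensus sketch of an equivalent view registration and recording; the exporter name "otlp" and the span count are illustrative, it bypasses the obsreport helpers, and the real genViews may derive view names differently.

```go
package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

func main() {
	// Measure analogous to obsmetrics.ExporterFailedToEnqueueSpans.
	failedToEnqueueSpans := stats.Int64(
		"exporter/enqueue_failed_spans",
		"Number of spans that failed to be added to the sending queue.",
		stats.UnitDimensionless)

	exporterKey := tag.MustNewKey("exporter")

	// A Sum view keyed by exporter, as produced by genViews.
	v := &view.View{
		Name:        failedToEnqueueSpans.Name(),
		Description: failedToEnqueueSpans.Description(),
		Measure:     failedToEnqueueSpans,
		TagKeys:     []tag.Key{exporterKey},
		Aggregation: view.Sum(),
	}
	if err := view.Register(v); err != nil {
		log.Fatal(err)
	}

	// Record a dropped batch of 10 spans for exporter "otlp".
	_ = stats.RecordWithTags(context.Background(),
		[]tag.Mutator{tag.Upsert(exporterKey, "otlp")},
		failedToEnqueueSpans.M(10))
}
```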
15 changes: 15 additions & 0 deletions obsreport/obsreport_exporter.go
@@ -63,6 +63,11 @@ func (eor *Exporter) EndTracesOp(ctx context.Context, numSpans int, err error) {
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentSpansKey, obsmetrics.FailedToSendSpansKey)
}

// RecordTracesEnqueueFailure records the number of spans that failed to be added to the sending queue.
func (eor *Exporter) RecordTracesEnqueueFailure(ctx context.Context, numSpans int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
}

// StartMetricsOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
@@ -78,6 +83,11 @@ func (eor *Exporter) EndMetricsOp(ctx context.Context, numMetricPoints int, err
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentMetricPointsKey, obsmetrics.FailedToSendMetricPointsKey)
}

// RecordMetricsEnqueueFailure records the number of metric points that failed to be added to the sending queue.
func (eor *Exporter) RecordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
}

// StartLogsOp is called at the start of an Export operation.
// The returned context should be used in other calls to the Exporter functions
// dealing with the same export operation.
@@ -92,6 +102,11 @@ func (eor *Exporter) EndLogsOp(ctx context.Context, numLogRecords int, err error
endSpan(ctx, err, numSent, numFailedToSend, obsmetrics.SentLogRecordsKey, obsmetrics.FailedToSendLogRecordsKey)
}

// RecordLogsEnqueueFailure records the number of log records that failed to be added to the sending queue.
func (eor *Exporter) RecordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
}

// startSpan creates the span used to trace the operation. Returning
// the updated context and the created span.
func (eor *Exporter) startSpan(ctx context.Context, operationSuffix string) context.Context {
20 changes: 20 additions & 0 deletions obsreport/obsreport_test.go
@@ -436,6 +436,26 @@ func TestExportLogsOp(t *testing.T) {
obsreporttest.CheckExporterLogs(t, exporter, int64(sentLogRecords), int64(failedToSendLogRecords))
}

func TestExportEnqueueFailure(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

obsrep := NewExporter(ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})

logRecords := 7
obsrep.RecordLogsEnqueueFailure(context.Background(), logRecords)
obsreporttest.CheckExporterEnqueueFailedLogs(t, exporter, int64(logRecords))

spans := 12
obsrep.RecordTracesEnqueueFailure(context.Background(), spans)
obsreporttest.CheckExporterEnqueueFailedTraces(t, exporter, int64(spans))

metricPoints := 21
obsrep.RecordMetricsEnqueueFailure(context.Background(), metricPoints)
obsreporttest.CheckExporterEnqueueFailedMetrics(t, exporter, int64(metricPoints))
}

func TestReceiveWithLongLivedCtx(t *testing.T) {
ss := &spanStore{}
trace.RegisterExporter(ss)
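
The obsreporttest.CheckExporterEnqueueFailed* assertions used in the new tests read the recorded sums back from the registered views. A rough sketch of that lookup follows; the package and helper name are hypothetical, and the real helpers also take the exporter name, so they presumably filter rows by the exporter tag as well.

```go
package obsreportsketch

import "go.opencensus.io/stats/view"

// sumForView adds up the Sum aggregation across all tag rows of a registered view,
// e.g. "exporter/enqueue_failed_log_records".
func sumForView(viewName string) (int64, error) {
	rows, err := view.RetrieveData(viewName)
	if err != nil {
		return 0, err
	}
	var total int64
	for _, row := range rows {
		if sum, ok := row.Data.(*view.SumData); ok {
			total += int64(sum.Value)
		}
	}
	return total, nil
}
```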