Add telemetry for dropped data due to exporter sending queue overflow #3328

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@
- Add `doc.go` files to the consumer package and its subpackages (#3270)
- Automate triggering of doc-update on release (#3234)
- Enable Dependabot for Github Actions (#3312)
- Add telemetry for dropped data due to exporter sending queue overflow (#3328)

## v0.27.0 Beta

7 changes: 7 additions & 0 deletions exporter/exporterhelper/common.go
@@ -23,8 +23,10 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/obsreport"
)

// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
@@ -164,6 +166,7 @@ func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTel
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
obsrep *obsExporter
sender requestSender
qrSender *queuedRetrySender
}
@@ -173,6 +176,10 @@ func newBaseExporter(cfg config.Exporter, logger *zap.Logger, bs *baseSettings)
Component: componenthelper.New(bs.componentOptions...),
}

be.obsrep = newObsExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
})
be.qrSender = newQueuedRetrySender(cfg.ID().String(), bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender

17 changes: 9 additions & 8 deletions exporter/exporterhelper/logs.go
@@ -16,17 +16,16 @@ package exporterhelper

import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)

type logsRequest struct {
@@ -87,16 +86,18 @@ func NewLogsExporter(
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
}),
obsrep: be.obsrep,
nextSender: nextSender,
}
})

lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
return be.sender.send(newLogsRequest(ctx, ld, pusher))
req := newLogsRequest(ctx, ld, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)

return &logsExporter{
@@ -106,7 +107,7 @@ }
}

type logsExporterWithObservability struct {
obsrep *obsreport.Exporter
obsrep *obsExporter
nextSender requestSender
}

24 changes: 24 additions & 0 deletions exporter/exporterhelper/logs_test.go
@@ -121,6 +121,30 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, le, want)
}

func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

rCfg := DefaultRetrySettings()
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

md := testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent()
const numBatches = 7
for i := 0; i < numBatches; i++ {
te.ConsumeLogs(context.Background(), md)
}

// 2 batches must be in the queue, and 5 batches (15 log records) rejected due to queue overflow
checkExporterEnqueueFailedLogsStats(t, fakeLogsExporterName, int64(15))
}

func TestLogsExporter_WithSpan(t *testing.T) {
le, err := NewLogsExporter(&fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil))
require.Nil(t, err)
17 changes: 9 additions & 8 deletions exporter/exporterhelper/metrics.go
@@ -16,17 +16,16 @@ package exporterhelper

import (
"context"
"errors"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)

type metricsRequest struct {
@@ -88,10 +87,7 @@ func NewMetricsExporter(
be := newBaseExporter(cfg, logger, bs)
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: obsreport.NewExporter(obsreport.ExporterSettings{
Level: configtelemetry.GetMetricsLevelFlagValue(),
ExporterID: cfg.ID(),
}),
obsrep: be.obsrep,
nextSender: nextSender,
}
})
@@ -100,7 +96,12 @@
if bs.ResourceToTelemetrySettings.Enabled {
md = convertResourceToLabels(md)
}
return be.sender.send(newMetricsRequest(ctx, md, pusher))
req := newMetricsRequest(ctx, md, pusher)
err := be.sender.send(req)
if errors.Is(err, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.context(), req.count())
}
return err
}, bs.consumerOptions...)

return &metricsExporter{
@@ -110,7 +111,7 @@ }
}

type metricsSenderWithObservability struct {
obsrep *obsreport.Exporter
obsrep *obsExporter
nextSender requestSender
}

24 changes: 24 additions & 0 deletions exporter/exporterhelper/metrics_test.go
@@ -120,6 +120,30 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, me, want)
}

func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

rCfg := DefaultRetrySettings()
qCfg := DefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
te, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(wantErr), WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NotNil(t, te)

md := testdata.GenerateMetricsOneMetricOneDataPoint()
const numBatches = 7
for i := 0; i < numBatches; i++ {
te.ConsumeMetrics(context.Background(), md)
}

// 2 batches must be in the queue, and 5 metric points rejected due to queue overflow
checkExporterEnqueueFailedMetricsStats(t, fakeMetricsExporterName, int64(5))
}

func TestMetricsExporter_WithSpan(t *testing.T) {
me, err := NewMetricsExporter(&fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil))
require.NoError(t, err)
58 changes: 58 additions & 0 deletions exporter/exporterhelper/obsreport.go
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
"context"

"go.opencensus.io/stats"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/obsreport"
)

// TODO: Incorporate this functionality along with tests from obsreport_test.go
// into existing `obsreport` package once its functionality is not exposed
// as public API. For now this part is kept private.

// obsExporter is a helper to add observability to a component.Exporter.
type obsExporter struct {
*obsreport.Exporter
mutators []tag.Mutator
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg obsreport.ExporterSettings) *obsExporter {
return &obsExporter{
obsreport.NewExporter(cfg),
[]tag.Mutator{tag.Upsert(obsmetrics.TagKeyExporter, cfg.ExporterID.String(), tag.WithTTL(tag.TTLNoPropagation))},
}
}

// recordTracesEnqueueFailure records the number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(ctx context.Context, numSpans int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueSpans.M(int64(numSpans)))
}

// recordMetricsEnqueueFailure records the number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(ctx context.Context, numMetricPoints int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueMetricPoints.M(int64(numMetricPoints)))
}

// recordLogsEnqueueFailure records the number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(ctx context.Context, numLogRecords int) {
_ = stats.RecordWithTags(ctx, eor.mutators, obsmetrics.ExporterFailedToEnqueueLogRecords.M(int64(numLogRecords)))
}
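
A minimal usage sketch of the new helper (not part of the diff): the exporter ID, context, and item counts below are placeholders; construction mirrors common.go above and the recording calls mirror obsreport_test.go below.

package exporterhelper

import (
	"context"

	"go.opentelemetry.io/collector/config"
	"go.opentelemetry.io/collector/config/configtelemetry"
	"go.opentelemetry.io/collector/obsreport"
)

// recordDropExample shows the intended call pattern for the unexported helper.
func recordDropExample(ctx context.Context) {
	obsrep := newObsExporter(obsreport.ExporterSettings{
		Level:      configtelemetry.GetMetricsLevelFlagValue(),
		ExporterID: config.NewID("myexporter"), // placeholder exporter ID
	})

	// Each call increments the matching exporter/enqueue_failed_* measure,
	// tagged with the exporter ID.
	obsrep.recordTracesEnqueueFailure(ctx, 12)  // spans dropped
	obsrep.recordMetricsEnqueueFailure(ctx, 21) // metric points dropped
	obsrep.recordLogsEnqueueFailure(ctx, 7)     // log records dropped
}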
109 changes: 109 additions & 0 deletions exporter/exporterhelper/obsreport_test.go
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package exporterhelper

import (
"context"
"reflect"
"sort"
"testing"

"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)

func TestExportEnqueueFailure(t *testing.T) {
doneFn, err := obsreporttest.SetupRecordedMetricsTest()
require.NoError(t, err)
defer doneFn()

exporter := config.NewID("fakeExporter")

obsrep := newObsExporter(obsreport.ExporterSettings{Level: configtelemetry.LevelNormal, ExporterID: exporter})

logRecords := 7
obsrep.recordLogsEnqueueFailure(context.Background(), logRecords)
checkExporterEnqueueFailedLogsStats(t, exporter, int64(logRecords))

spans := 12
obsrep.recordTracesEnqueueFailure(context.Background(), spans)
checkExporterEnqueueFailedTracesStats(t, exporter, int64(spans))

metricPoints := 21
obsrep.recordMetricsEnqueueFailure(context.Background(), metricPoints)
checkExporterEnqueueFailedMetricsStats(t, exporter, int64(metricPoints))
}

// checkExporterEnqueueFailedTracesStats checks that reported number of spans failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedTracesStats(t *testing.T, exporter config.ComponentID, spans int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, spans, "exporter/enqueue_failed_spans")
}

// checkExporterEnqueueFailedMetricsStats checks that reported number of metric points failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedMetricsStats(t *testing.T, exporter config.ComponentID, metricPoints int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, metricPoints, "exporter/enqueue_failed_metric_points")
}

// checkExporterEnqueueFailedLogsStats checks that reported number of log records failed to enqueue match given values.
// When this function is called it is required to also call SetupRecordedMetricsTest as first thing.
func checkExporterEnqueueFailedLogsStats(t *testing.T, exporter config.ComponentID, logRecords int64) {
exporterTags := tagsForExporterView(exporter)
checkValueForView(t, exporterTags, logRecords, "exporter/enqueue_failed_log_records")
}

// checkValueForView checks that the current exported value in the view with the given name
// for the given wantTags is equal to "value".
func checkValueForView(t *testing.T, wantTags []tag.Tag, value int64, vName string) {
// Make sure the tags slice is sorted by tag keys.
sortTags(wantTags)

rows, err := view.RetrieveData(vName)
require.NoError(t, err)

for _, row := range rows {
// Make sure the tags slice is sorted by tag keys.
sortTags(row.Tags)
if reflect.DeepEqual(wantTags, row.Tags) {
sum := row.Data.(*view.SumData)
require.Equal(t, float64(value), sum.Value)
return
}
}

require.Failf(t, "could not find tags", "wantTags: %s in rows %v", wantTags, rows)
}

// tagsForExporterView returns the tags that are needed for the exporter views.
func tagsForExporterView(exporter config.ComponentID) []tag.Tag {
return []tag.Tag{
{Key: exporterTag, Value: exporter.String()},
}
}

func sortTags(tags []tag.Tag) {
sort.SliceStable(tags, func(i, j int) bool {
return tags[i].Key.Name() < tags[j].Key.Name()
})
}
4 changes: 3 additions & 1 deletion exporter/exporterhelper/queued_retry.go
@@ -41,6 +41,8 @@ var (
metric.WithDescription("Current size of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

errSendingQueueIsFull = errors.New("sending_queue is full")
)

func init() {
@@ -189,7 +191,7 @@ func (qrs *queuedRetrySender) send(req request) error {
zap.Int("dropped_items", req.count()),
)
span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.")
return errors.New("sending_queue is full")
return errSendingQueueIsFull
}

span.Annotate(qrs.traceAttributes, "Enqueued item.")
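
With the inline error replaced by the errSendingQueueIsFull sentinel, the signal-specific constructors can tell a queue-full rejection apart from other send failures; a condensed sketch of the pattern used in logs.go and metrics.go above (be and req as defined there):

	err := be.sender.send(req)
	if errors.Is(err, errSendingQueueIsFull) {
		// Only overflow rejections count toward the enqueue_failed_* measures;
		// other send errors are reported by the observability sender wrappers.
		be.obsrep.recordLogsEnqueueFailure(req.context(), req.count())
	}
	return err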