Skip to content

Commit

Permalink
[exporterhelper] make enqueue failures available
Browse files Browse the repository at this point in the history
These metrics were only exporter either via OC or via the prometheus
exporter. Fixes open-telemetry#8673

Signed-off-by: Alex Boten <[email protected]>
  • Loading branch information
Alex Boten committed Oct 11, 2023
1 parent d7b49df commit 59142cc
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 95 deletions.
10 changes: 5 additions & 5 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender
type obsrepSenderFactory func(obsrep *ObsReport) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
Expand Down Expand Up @@ -143,7 +143,7 @@ type baseExporter struct {
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
obsrep *ObsReport

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
Expand All @@ -163,7 +163,7 @@ type baseExporter struct {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsrep, err := newObsExporter(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}
Expand All @@ -175,12 +175,12 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
obsrepSender: osf(obsReport),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsrep,
obsrep: obsReport,
}

for _, op := range options {
Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewLogsExporter(
req := newLogsRequest(ctx, ld, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeLogs, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewLogsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeLogs, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewLogsRequestExporter(

type logsExporterWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newLogsExporterWithObservability(obsrep *obsExporter) requestSender {
func newLogsExporterWithObservability(obsrep *ObsReport) requestSender {
return &logsExporterWithObservability{obsrep: obsrep}
}

Expand Down
8 changes: 4 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewMetricsExporter(
req := newMetricsRequest(ctx, md, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeMetrics, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewMetricsRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeMetrics, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewMetricsRequestExporter(

type metricsSenderWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender {
func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender {
return &metricsSenderWithObservability{obsrep: obsrep}
}

Expand Down
77 changes: 69 additions & 8 deletions exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ type ObsReport struct {
tracer trace.Tracer
logger *zap.Logger

useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
useOtelForMetrics bool
otelAttrs []attribute.KeyValue
sentSpans metric.Int64Counter
failedToSendSpans metric.Int64Counter
failedToEnqueueSpans metric.Int64Counter
sentMetricPoints metric.Int64Counter
failedToSendMetricPoints metric.Int64Counter
failedToEnqueueMetricPoints metric.Int64Counter
sentLogRecords metric.Int64Counter
failedToSendLogRecords metric.Int64Counter
failedToEnqueueLogRecords metric.Int64Counter
}

// ObsReportSettings are settings for creating an ObsReport.
Expand Down Expand Up @@ -96,6 +99,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueSpans, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueSpansKey,
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentMetricPointsKey,
metric.WithDescription("Number of metric points successfully sent to destination."),
Expand All @@ -108,6 +117,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueMetricPoints, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueMetricPointsKey,
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.sentLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.SentLogRecordsKey,
metric.WithDescription("Number of log record successfully sent to destination."),
Expand All @@ -120,6 +135,12 @@ func (or *ObsReport) createOtelMetrics(cfg ObsReportSettings) error {
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

or.failedToEnqueueLogRecords, err = meter.Int64Counter(
obsmetrics.ExporterPrefix+obsmetrics.FailedToEnqueueLogRecordsKey,
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithUnit("1"))
errors = multierr.Append(errors, err)

return errors
}

Expand Down Expand Up @@ -252,3 +273,43 @@ func toNumItems(numExportedItems int, err error) (int64, int64) {
}
return int64(numExportedItems), 0
}

func (or *ObsReport) recordEnqueueFailure(ctx context.Context, dataType component.DataType, failed int64) {
if or.useOtelForMetrics {
or.recordEnqueueFailureWithOtel(ctx, dataType, failed)
} else {
or.recordEnqueueFailureWithOC(ctx, dataType, failed)
}
}

func (or *ObsReport) recordEnqueueFailureWithOC(ctx context.Context, dataType component.DataType, failed int64) {
var failedMeasure *stats.Int64Measure
switch dataType {
case component.DataTypeTraces:
failedMeasure = obsmetrics.ExporterFailedToSendSpans
case component.DataTypeMetrics:
failedMeasure = obsmetrics.ExporterFailedToSendMetricPoints
case component.DataTypeLogs:
failedMeasure = obsmetrics.ExporterFailedToSendLogRecords
}
if failed > 0 {
_ = stats.RecordWithTags(
ctx,
or.mutators,
failedMeasure.M(failed))
}
}

func (or *ObsReport) recordEnqueueFailureWithOtel(ctx context.Context, dataType component.DataType, failed int64) {
var enqueueFailedMeasure metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
enqueueFailedMeasure = or.failedToEnqueueSpans
case component.DataTypeMetrics:
enqueueFailedMeasure = or.failedToEnqueueMetricPoints
case component.DataTypeLogs:
enqueueFailedMeasure = or.failedToEnqueueLogRecords
}

enqueueFailedMeasure.Add(ctx, failed, metric.WithAttributes(or.otelAttrs...))
}
73 changes: 3 additions & 70 deletions exporter/exporterhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"

"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/metric/metricproducer"
Expand All @@ -26,12 +24,9 @@ func init() {
}

type instruments struct {
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
failedToEnqueueTraceSpans *metric.Int64Cumulative
failedToEnqueueMetricPoints *metric.Int64Cumulative
failedToEnqueueLogRecords *metric.Int64Cumulative
registry *metric.Registry
queueSize *metric.Int64DerivedGauge
queueCapacity *metric.Int64DerivedGauge
}

func newInstruments(registry *metric.Registry) *instruments {
Expand All @@ -49,67 +44,5 @@ func newInstruments(registry *metric.Registry) *instruments {
metric.WithDescription("Fixed capacity of the retry queue (in batches)"),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueTraceSpans, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_spans",
metric.WithDescription("Number of spans failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueMetricPoints, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_metric_points",
metric.WithDescription("Number of metric points failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

insts.failedToEnqueueLogRecords, _ = registry.AddInt64Cumulative(
obsmetrics.ExporterKey+"/enqueue_failed_log_records",
metric.WithDescription("Number of log records failed to be added to the sending queue."),
metric.WithLabelKeys(obsmetrics.ExporterKey),
metric.WithUnit(metricdata.UnitDimensionless))

return insts
}

// obsExporter is a helper to add observability to an exporter.
type obsExporter struct {
*ObsReport
failedToEnqueueTraceSpansEntry *metric.Int64CumulativeEntry
failedToEnqueueMetricPointsEntry *metric.Int64CumulativeEntry
failedToEnqueueLogRecordsEntry *metric.Int64CumulativeEntry
}

// newObsExporter creates a new observability exporter.
func newObsExporter(cfg ObsReportSettings, insts *instruments) (*obsExporter, error) {
labelValue := metricdata.NewLabelValue(cfg.ExporterID.String())
failedToEnqueueTraceSpansEntry, _ := insts.failedToEnqueueTraceSpans.GetEntry(labelValue)
failedToEnqueueMetricPointsEntry, _ := insts.failedToEnqueueMetricPoints.GetEntry(labelValue)
failedToEnqueueLogRecordsEntry, _ := insts.failedToEnqueueLogRecords.GetEntry(labelValue)

exp, err := NewObsReport(cfg)
if err != nil {
return nil, err
}

return &obsExporter{
ObsReport: exp,
failedToEnqueueTraceSpansEntry: failedToEnqueueTraceSpansEntry,
failedToEnqueueMetricPointsEntry: failedToEnqueueMetricPointsEntry,
failedToEnqueueLogRecordsEntry: failedToEnqueueLogRecordsEntry,
}, nil
}

// recordTracesEnqueueFailure records number of spans that failed to be added to the sending queue.
func (eor *obsExporter) recordTracesEnqueueFailure(_ context.Context, numSpans int64) {
eor.failedToEnqueueTraceSpansEntry.Inc(numSpans)
}

// recordMetricsEnqueueFailure records number of metric points that failed to be added to the sending queue.
func (eor *obsExporter) recordMetricsEnqueueFailure(_ context.Context, numMetricPoints int64) {
eor.failedToEnqueueMetricPointsEntry.Inc(numMetricPoints)
}

// recordLogsEnqueueFailure records number of log records that failed to be added to the sending queue.
func (eor *obsExporter) recordLogsEnqueueFailure(_ context.Context, numLogRecords int64) {
eor.failedToEnqueueLogRecordsEntry.Inc(numLogRecords)
}
8 changes: 4 additions & 4 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewTracesExporter(
req := newTracesRequest(ctx, td, pusher)
serr := be.send(req)
if errors.Is(serr, errSendingQueueIsFull) {
be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count()))
be.obsrep.recordEnqueueFailure(req.Context(), component.DataTypeTraces, int64(req.Count()))
}
return serr
}, be.consumerOptions...)
Expand Down Expand Up @@ -151,7 +151,7 @@ func NewTracesRequestExporter(
r := newRequest(ctx, req)
sErr := be.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
be.obsrep.recordEnqueueFailure(r.Context(), component.DataTypeTraces, int64(r.Count()))
}
return sErr
}, be.consumerOptions...)
Expand All @@ -164,10 +164,10 @@ func NewTracesRequestExporter(

type tracesExporterWithObservability struct {
baseRequestSender
obsrep *obsExporter
obsrep *ObsReport
}

func newTracesExporterWithObservability(obsrep *obsExporter) requestSender {
func newTracesExporterWithObservability(obsrep *ObsReport) requestSender {
return &tracesExporterWithObservability{obsrep: obsrep}
}

Expand Down
18 changes: 18 additions & 0 deletions internal/obsreportconfig/obsmetrics/obs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@ const (
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
FailedToSendSpansKey = "send_failed_spans"
// FailedToEnqueueSpansKey used to track spans that failed to be enqueued by exporters.
FailedToEnqueueSpansKey = "enqueue_failed_spans"

// SentMetricPointsKey used to track metric points sent by exporters.
SentMetricPointsKey = "sent_metric_points"
// FailedToSendMetricPointsKey used to track metric points that failed to be sent by exporters.
FailedToSendMetricPointsKey = "send_failed_metric_points"
// FailedToEnqueueMetricPointsKey used to track metric points that failed to be enqueued by exporters.
FailedToEnqueueMetricPointsKey = "enqueue_failed_metric_points"

// SentLogRecordsKey used to track logs sent by exporters.
SentLogRecordsKey = "sent_log_records"
// FailedToSendLogRecordsKey used to track logs that failed to be sent by exporters.
FailedToSendLogRecordsKey = "send_failed_log_records"
// FailedToEnqueueLogRecordsKey used to track logs that failed to be enqueued by exporters.
FailedToEnqueueLogRecordsKey = "enqueue_failed_log_records"
)

var (
Expand All @@ -49,6 +55,10 @@ var (
ExporterPrefix+FailedToSendSpansKey,
"Number of spans in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueSpans = stats.Int64(
ExporterPrefix+FailedToEnqueueSpansKey,
"Number of spans failed to be added to the sending queue.",
stats.UnitDimensionless)
ExporterSentMetricPoints = stats.Int64(
ExporterPrefix+SentMetricPointsKey,
"Number of metric points successfully sent to destination.",
Expand All @@ -57,6 +67,10 @@ var (
ExporterPrefix+FailedToSendMetricPointsKey,
"Number of metric points in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueMetricPoints = stats.Int64(
ExporterPrefix+FailedToEnqueueMetricPointsKey,
"Number of metric points failed to be added to the sending queue.",
stats.UnitDimensionless)
ExporterSentLogRecords = stats.Int64(
ExporterPrefix+SentLogRecordsKey,
"Number of log record successfully sent to destination.",
Expand All @@ -65,4 +79,8 @@ var (
ExporterPrefix+FailedToSendLogRecordsKey,
"Number of log records in failed attempts to send to destination.",
stats.UnitDimensionless)
ExporterFailedToEnqueueLogRecords = stats.Int64(
ExporterPrefix+FailedToEnqueueLogRecordsKey,
"Number of log records failed to be added to the sending queue.",
stats.UnitDimensionless)
)

0 comments on commit 59142cc

Please sign in to comment.