diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index a4fe5fb3da8..ea411ea63bf 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -45,7 +45,7 @@ type request interface { context() context.Context // setContext updates the Context of the requests. setContext(context.Context) - export(ctx context.Context) (int, error) + export(ctx context.Context) error // Returns a new request that contains the items left to be sent. onPartialError(consumererror.PartialError) request // Returns the count of spans/metric points or log records. @@ -54,7 +54,7 @@ type request interface { // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { - send(req request) (int, error) + send(req request) error } // baseRequest is a base implementation for the request. @@ -205,7 +205,7 @@ type timeoutSender struct { } // send implements the requestSender interface -func (ts *timeoutSender) send(req request) (int, error) { +func (ts *timeoutSender) send(req request) error { // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be // updated because this deadline most likely is before the next one. ctx := req.context() diff --git a/exporter/exporterhelper/factory_test.go b/exporter/exporterhelper/factory_test.go index b4694a21853..e6d95ac3c75 100644 --- a/exporter/exporterhelper/factory_test.go +++ b/exporter/exporterhelper/factory_test.go @@ -36,14 +36,14 @@ var ( TypeVal: typeStr, NameVal: typeStr, } - nopTracesExporter, _ = NewTraceExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) { - return 0, nil + nopTracesExporter, _ = NewTraceExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, td pdata.Traces) error { + return nil }) - nopMetricsExporter, _ = NewMetricsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) { - return 0, nil + nopMetricsExporter, _ = NewMetricsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Metrics) error { + return nil }) - nopLogsExporter, _ = NewLogsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Logs) (droppedTimeSeries int, err error) { - return 0, nil + nopLogsExporter, _ = NewLogsExporter(defaultCfg, zap.NewNop(), func(ctx context.Context, md pdata.Logs) error { + return nil }) ) diff --git a/exporter/exporterhelper/logshelper.go b/exporter/exporterhelper/logshelper.go index 70461fa6780..62135bfe142 100644 --- a/exporter/exporterhelper/logshelper.go +++ b/exporter/exporterhelper/logshelper.go @@ -29,7 +29,7 @@ import ( // PushLogs is a helper function that is similar to ConsumeLogs but also returns // the number of dropped logs. 
-type PushLogs func(ctx context.Context, md pdata.Logs) (droppedTimeSeries int, err error) +type PushLogs func(ctx context.Context, md pdata.Logs) error type logsRequest struct { baseRequest @@ -49,7 +49,7 @@ func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) re return newLogsRequest(req.ctx, partialErr.GetLogs(), req.pusher) } -func (req *logsRequest) export(ctx context.Context) (int, error) { +func (req *logsRequest) export(ctx context.Context) error { return req.pusher(ctx, req.ld) } @@ -63,8 +63,7 @@ type logsExporter struct { } func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - _, err := lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher)) - return err + return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher)) } // NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span. @@ -105,9 +104,9 @@ type logsExporterWithObservability struct { nextSender requestSender } -func (lewo *logsExporterWithObservability) send(req request) (int, error) { +func (lewo *logsExporterWithObservability) send(req request) error { req.setContext(lewo.obsrep.StartLogsExportOp(req.context())) - numDroppedLogs, err := lewo.nextSender.send(req) + err := lewo.nextSender.send(req) lewo.obsrep.EndLogsExportOp(req.context(), req.count(), err) - return numDroppedLogs, err + return err } diff --git a/exporter/exporterhelper/logshelper_test.go b/exporter/exporterhelper/logshelper_test.go index 6169225ca6d..6a7c3a4304d 100644 --- a/exporter/exporterhelper/logshelper_test.go +++ b/exporter/exporterhelper/logshelper_test.go @@ -57,13 +57,13 @@ func TestLogsRequest(t *testing.T) { } func TestLogsExporter_InvalidName(t *testing.T) { - le, err := NewLogsExporter(nil, zap.NewNop(), newPushLogsData(0, nil)) + le, err := NewLogsExporter(nil, zap.NewNop(), newPushLogsData(nil)) require.Nil(t, le) require.Equal(t, errNilConfig, err) } func TestLogsExporter_NilLogger(t *testing.T) { - le, err := NewLogsExporter(fakeLogsExporterConfig, nil, newPushLogsData(0, nil)) + le, err := NewLogsExporter(fakeLogsExporterConfig, nil, newPushLogsData(nil)) require.Nil(t, le) require.Equal(t, errNilLogger, err) } @@ -76,7 +76,7 @@ func TestLogsExporter_NilPushLogsData(t *testing.T) { func TestLogsExporter_Default(t *testing.T) { ld := testdata.GenerateLogDataEmpty() - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil)) assert.NotNil(t, le) assert.NoError(t, err) @@ -87,22 +87,14 @@ func TestLogsExporter_Default(t *testing.T) { func TestLogsExporter_Default_ReturnError(t *testing.T) { ld := testdata.GenerateLogDataEmpty() want := errors.New("my_error") - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want)) require.Nil(t, err) require.NotNil(t, le) require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } func TestLogsExporter_WithRecordLogs(t *testing.T) { - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil)) - require.Nil(t, err) - require.NotNil(t, le) - - checkRecordedMetricsForLogsExporter(t, le, nil) -} - -func TestLogsExporter_WithRecordLogs_NonZeroDropped(t *testing.T) { - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(1, nil)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), 
newPushLogsData(nil)) require.Nil(t, err) require.NotNil(t, le) @@ -111,7 +103,7 @@ func TestLogsExporter_WithRecordLogs_NonZeroDropped(t *testing.T) { func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { want := errors.New("my_error") - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want)) require.Nil(t, err) require.NotNil(t, le) @@ -119,14 +111,7 @@ func TestLogsExporter_WithRecordLogs_ReturnError(t *testing.T) { } func TestLogsExporter_WithSpan(t *testing.T) { - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil)) - require.Nil(t, err) - require.NotNil(t, le) - checkWrapSpanForLogsExporter(t, le, nil, 1) -} - -func TestLogsExporter_WithSpan_NonZeroDropped(t *testing.T) { - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(1, nil)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil)) require.Nil(t, err) require.NotNil(t, le) checkWrapSpanForLogsExporter(t, le, nil, 1) @@ -134,7 +119,7 @@ func TestLogsExporter_WithSpan_NonZeroDropped(t *testing.T) { func TestLogsExporter_WithSpan_ReturnError(t *testing.T) { want := errors.New("my_error") - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, want)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(want)) require.Nil(t, err) require.NotNil(t, le) checkWrapSpanForLogsExporter(t, le, want, 1) @@ -144,7 +129,7 @@ func TestLogsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil), WithShutdown(shutdown)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithShutdown(shutdown)) assert.NotNil(t, le) assert.NoError(t, err) @@ -156,16 +141,16 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(0, nil), WithShutdown(shutdownErr)) + le, err := NewLogsExporter(fakeLogsExporterConfig, zap.NewNop(), newPushLogsData(nil), WithShutdown(shutdownErr)) assert.NotNil(t, le) assert.NoError(t, err) assert.Equal(t, le.Shutdown(context.Background()), want) } -func newPushLogsData(droppedTimeSeries int, retError error) PushLogs { - return func(ctx context.Context, td pdata.Logs) (int, error) { - return droppedTimeSeries, retError +func newPushLogsData(retError error) PushLogs { + return func(ctx context.Context, td pdata.Logs) error { + return retError } } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index f92ec03cf3f..bee9311e0ce 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -29,7 +29,7 @@ import ( // PushMetrics is a helper function that is similar to ConsumeMetrics but also returns // the number of dropped metrics. 
-type PushMetrics func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) +type PushMetrics func(ctx context.Context, md pdata.Metrics) error type metricsRequest struct { baseRequest @@ -49,7 +49,7 @@ func (req *metricsRequest) onPartialError(partialErr consumererror.PartialError) return newMetricsRequest(req.ctx, partialErr.GetMetrics(), req.pusher) } -func (req *metricsRequest) export(ctx context.Context) (int, error) { +func (req *metricsRequest) export(ctx context.Context) error { return req.pusher(ctx, req.md) } @@ -67,8 +67,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric if mexp.baseExporter.convertResourceToTelemetry { md = convertResourceToLabels(md) } - _, err := mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher)) - return err + return mexp.sender.send(newMetricsRequest(ctx, md, mexp.pusher)) } // NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span. @@ -109,16 +108,9 @@ type metricsSenderWithObservability struct { nextSender requestSender } -func (mewo *metricsSenderWithObservability) send(req request) (int, error) { +func (mewo *metricsSenderWithObservability) send(req request) error { req.setContext(mewo.obsrep.StartMetricsExportOp(req.context())) - _, err := mewo.nextSender.send(req) - - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. - mReq := req.(*metricsRequest) - numReceivedMetrics, numPoints := mReq.md.MetricAndDataPointCount() - - mewo.obsrep.EndMetricsExportOp(req.context(), numPoints, err) - return numReceivedMetrics, err + err := mewo.nextSender.send(req) + mewo.obsrep.EndMetricsExportOp(req.context(), req.count(), err) + return err } diff --git a/exporter/exporterhelper/metricshelper_test.go b/exporter/exporterhelper/metricshelper_test.go index deb37bfc60e..de162e4b4bb 100644 --- a/exporter/exporterhelper/metricshelper_test.go +++ b/exporter/exporterhelper/metricshelper_test.go @@ -57,13 +57,13 @@ func TestMetricsRequest(t *testing.T) { } func TestMetricsExporter_InvalidName(t *testing.T) { - me, err := NewMetricsExporter(nil, zap.NewNop(), newPushMetricsData(0, nil)) + me, err := NewMetricsExporter(nil, zap.NewNop(), newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilConfig, err) } func TestMetricsExporter_NilLogger(t *testing.T) { - me, err := NewMetricsExporter(fakeMetricsExporterConfig, nil, newPushMetricsData(0, nil)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, nil, newPushMetricsData(nil)) require.Nil(t, me) require.Equal(t, errNilLogger, err) } @@ -76,7 +76,7 @@ func TestMetricsExporter_NilPushMetricsData(t *testing.T) { func TestMetricsExporter_Default(t *testing.T) { md := testdata.GenerateMetricsEmpty() - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) assert.NotNil(t, me) assert.NoError(t, err) @@ -87,22 +87,14 @@ func TestMetricsExporter_Default(t *testing.T) { func TestMetricsExporter_Default_ReturnError(t *testing.T) { md := testdata.GenerateMetricsEmpty() want := errors.New("my_error") - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) require.Nil(t, err) 
require.NotNil(t, me) require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } func TestMetricsExporter_WithRecordMetrics(t *testing.T) { - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil)) - require.Nil(t, err) - require.NotNil(t, me) - - checkRecordedMetricsForMetricsExporter(t, me, nil) -} - -func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) { - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(1, nil)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) require.Nil(t, err) require.NotNil(t, me) @@ -111,7 +103,7 @@ func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) { func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) require.Nil(t, err) require.NotNil(t, me) @@ -119,14 +111,7 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestMetricsExporter_WithSpan(t *testing.T) { - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil)) - require.Nil(t, err) - require.NotNil(t, me) - checkWrapSpanForMetricsExporter(t, me, nil, 1) -} - -func TestMetricsExporter_WithSpan_NonZeroDropped(t *testing.T) { - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(1, nil)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil)) require.Nil(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, me, nil, 1) @@ -134,7 +119,7 @@ func TestMetricsExporter_WithSpan_NonZeroDropped(t *testing.T) { func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) { want := errors.New("my_error") - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, want)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(want)) require.Nil(t, err) require.NotNil(t, me) checkWrapSpanForMetricsExporter(t, me, want, 1) @@ -144,7 +129,7 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithShutdown(shutdown)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithShutdown(shutdown)) assert.NotNil(t, me) assert.NoError(t, err) @@ -154,7 +139,7 @@ func TestMetricsExporter_WithShutdown(t *testing.T) { func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T) { md := testdata.GenerateMetricsTwoMetrics() - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings())) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(defaultResourceToTelemetrySettings())) assert.NotNil(t, me) assert.NoError(t, err) @@ -164,7 +149,7 @@ func TestMetricsExporter_WithResourceToTelemetryConversionDisabled(t *testing.T) func TestMetricsExporter_WithResourceToTelemetryConversionEbabled(t *testing.T) { md := 
testdata.GenerateMetricsTwoMetrics() - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true})) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithResourceToTelemetryConversion(ResourceToTelemetrySettings{Enabled: true})) assert.NotNil(t, me) assert.NoError(t, err) @@ -176,16 +161,16 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(0, nil), WithShutdown(shutdownErr)) + me, err := NewMetricsExporter(fakeMetricsExporterConfig, zap.NewNop(), newPushMetricsData(nil), WithShutdown(shutdownErr)) assert.NotNil(t, me) assert.NoError(t, err) assert.Equal(t, me.Shutdown(context.Background()), want) } -func newPushMetricsData(droppedTimeSeries int, retError error) PushMetrics { - return func(ctx context.Context, td pdata.Metrics) (int, error) { - return droppedTimeSeries, retError +func newPushMetricsData(retError error) PushMetrics { + return func(ctx context.Context, td pdata.Metrics) error { + return retError } } diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 87295f72e7f..90f985fba27 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -130,21 +130,21 @@ func newQueuedRetrySender(fullName string, qCfg QueueSettings, rCfg RetrySetting func (qrs *queuedRetrySender) start() { qrs.queue.StartConsumers(qrs.cfg.NumConsumers, func(item interface{}) { req := item.(request) - _, _ = qrs.consumerSender.send(req) + _ = qrs.consumerSender.send(req) }) } // send implements the requestSender interface -func (qrs *queuedRetrySender) send(req request) (int, error) { +func (qrs *queuedRetrySender) send(req request) error { if !qrs.cfg.Enabled { - n, err := qrs.consumerSender.send(req) + err := qrs.consumerSender.send(req) if err != nil { qrs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", - zap.Int("dropped_items", n), + zap.Int("dropped_items", req.count()), ) } - return n, err + return err } // Prevent cancellation and deadline to propagate to the context stored in the queue. @@ -158,11 +158,11 @@ func (qrs *queuedRetrySender) send(req request) (int, error) { zap.Int("dropped_items", req.count()), ) span.Annotate(qrs.traceAttributes, "Dropped item, sending_queue is full.") - return req.count(), errors.New("sending_queue is full") + return errors.New("sending_queue is full") } span.Annotate(qrs.traceAttributes, "Enqueued item.") - return 0, nil + return nil } // shutdown is invoked during service shutdown. @@ -197,16 +197,16 @@ type retrySender struct { } // send implements the requestSender interface -func (rs *retrySender) send(req request) (int, error) { +func (rs *retrySender) send(req request) error { if !rs.cfg.Enabled { - n, err := rs.nextSender.send(req) + err := rs.nextSender.send(req) if err != nil { rs.logger.Error( "Exporting failed. 
Try enabling retry_on_failure config option.", zap.Error(err), ) } - return n, err + return err } // Do not use NewExponentialBackOff since it calls Reset and the code here must @@ -229,10 +229,10 @@ func (rs *retrySender) send(req request) (int, error) { rs.traceAttribute, trace.Int64Attribute("retry_num", retryNum)}, "Sending request.") - droppedItems, err := rs.nextSender.send(req) + err := rs.nextSender.send(req) if err == nil { - return droppedItems, nil + return nil } // Immediately drop data on permanent errors. @@ -240,9 +240,9 @@ func (rs *retrySender) send(req request) (int, error) { rs.logger.Error( "Exporting failed. The error is not retryable. Dropping data.", zap.Error(err), - zap.Int("dropped_items", droppedItems), + zap.Int("dropped_items", req.count()), ) - return droppedItems, err + return err } // If partial error, update data and stats with non exported data. @@ -257,9 +257,9 @@ func (rs *retrySender) send(req request) (int, error) { rs.logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), - zap.Int("dropped_items", droppedItems), + zap.Int("dropped_items", req.count()), ) - return req.count(), err + return err } if throttleErr, isThrottle := err.(*throttleRetry); isThrottle { @@ -283,9 +283,9 @@ func (rs *retrySender) send(req request) (int, error) { // back-off, but get interrupted when shutting down or request is cancelled or timed out. select { case <-req.context().Done(): - return req.count(), fmt.Errorf("request is cancelled or timed out %w", err) + return fmt.Errorf("request is cancelled or timed out %w", err) case <-rs.stopCh: - return req.count(), fmt.Errorf("interrupted due to shutdown %w", err) + return fmt.Errorf("interrupted due to shutdown %w", err) case <-time.After(backoffDelay): } } diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index 8c1be799c10..4ca91d3f568 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -46,9 +46,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, consumererror.Permanent(errors.New("bad data"))) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -72,9 +70,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -100,9 +96,7 @@ func TestQueuedRetry_PartialError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, partialErr) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -124,18 +118,14 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { firstMockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(firstMockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(firstMockR)) }) // Enqueue another request to ensure when calling shutdown we drain the queue. secondMockR := newMockRequest(context.Background(), 3, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(secondMockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(secondMockR)) }) assert.NoError(t, be.Shutdown(context.Background())) @@ -166,9 +156,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { mockR := newMockRequest(ctx, 2, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -194,18 +182,14 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { ocs.run(func() { // Add an item that will always fail. - droppedItems, err := be.sender.send(newErrorRequest(context.Background())) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) }) mockR := newMockRequest(context.Background(), 2, nil) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -238,9 +222,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -270,9 +252,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- droppedItems, err := be.sender.send(mockR) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -294,9 +274,8 @@ func TestQueuedRetry_DropOnFull(t *testing.T) { t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) - droppedItems, err := be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error"))) + err := be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error"))) require.Error(t, err) - assert.Equal(t, 2, droppedItems) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -320,9 +299,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.run(func() { req := newMockRequest(context.Background(), 2, nil) reqs = append(reqs, req) - droppedItems, err := be.sender.send(req) - require.NoError(t, err) - assert.Equal(t, 0, droppedItems) + require.NoError(t, be.sender.send(req)) }) } @@ -358,8 +335,8 @@ type mockErrorRequest struct { baseRequest } -func (mer *mockErrorRequest) export(_ context.Context) (int, error) { - return 0, errors.New("transient error") +func (mer *mockErrorRequest) export(_ context.Context) error { + return errors.New("transient error") } func (mer *mockErrorRequest) onPartialError(consumererror.PartialError) request { @@ -384,17 +361,17 @@ type mockRequest struct { requestCount *int64 } -func (m *mockRequest) export(ctx context.Context) (int, error) { +func (m *mockRequest) export(ctx context.Context) error { atomic.AddInt64(m.requestCount, 1) m.mu.Lock() defer m.mu.Unlock() err := m.consumeError m.consumeError = nil if err != nil { - return m.cnt, err + return err } // Respond like gRPC/HTTP, if context is cancelled, return error - return 0, ctx.Err() + return ctx.Err() } func (m *mockRequest) onPartialError(consumererror.PartialError) request { @@ -436,12 +413,15 @@ func newObservabilityConsumerSender(nextSender requestSender) *observabilityCons return &observabilityConsumerSender{waitGroup: new(sync.WaitGroup), nextSender: nextSender} } -func (ocs *observabilityConsumerSender) send(req request) (int, error) { - dic, err := ocs.nextSender.send(req) - atomic.AddInt64(&ocs.sentItemsCount, int64(req.count()-dic)) - atomic.AddInt64(&ocs.droppedItemsCount, int64(dic)) +func (ocs *observabilityConsumerSender) send(req request) error { + err := ocs.nextSender.send(req) + if err != nil { + atomic.AddInt64(&ocs.droppedItemsCount, int64(req.count())) + } else { + atomic.AddInt64(&ocs.sentItemsCount, int64(req.count())) + } ocs.waitGroup.Done() - return dic, err + return err } func (ocs *observabilityConsumerSender) run(fn func()) { diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 42ab0651458..4e614a0f603 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -29,7 +29,7 @@ import ( // PushTraces is a helper function that is similar to ConsumeTraces but also // returns the number of dropped spans. 
-type PushTraces func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error) +type PushTraces func(ctx context.Context, td pdata.Traces) error type tracesRequest struct { baseRequest @@ -49,7 +49,7 @@ func (req *tracesRequest) onPartialError(partialErr consumererror.PartialError) return newTracesRequest(req.ctx, partialErr.GetTraces(), req.pusher) } -func (req *tracesRequest) export(ctx context.Context) (int, error) { +func (req *tracesRequest) export(ctx context.Context) error { return req.pusher(ctx, req.td) } @@ -63,8 +63,7 @@ type traceExporter struct { } func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { - _, err := texp.sender.send(newTracesRequest(ctx, td, texp.pusher)) - return err + return texp.sender.send(newTracesRequest(ctx, td, texp.pusher)) } // NewTraceExporter creates a TracesExporter that records observability metrics and wraps every request with a Span. @@ -106,14 +105,10 @@ type tracesExporterWithObservability struct { nextSender requestSender } -func (tewo *tracesExporterWithObservability) send(req request) (int, error) { +func (tewo *tracesExporterWithObservability) send(req request) error { req.setContext(tewo.obsrep.StartTracesExportOp(req.context())) // Forward the data to the next consumer (this pusher is the next). - droppedSpans, err := tewo.nextSender.send(req) - - // TODO: this is not ideal: it should come from the next function itself. - // temporarily loading it from internal format. Once full switch is done - // to new metrics will remove this. + err := tewo.nextSender.send(req) tewo.obsrep.EndTracesExportOp(req.context(), req.count(), err) - return droppedSpans, err + return err } diff --git a/exporter/exporterhelper/tracehelper_test.go b/exporter/exporterhelper/tracehelper_test.go index a007b6b1aa9..19911b2e782 100644 --- a/exporter/exporterhelper/tracehelper_test.go +++ b/exporter/exporterhelper/tracehelper_test.go @@ -66,13 +66,13 @@ func (tote *testOCTraceExporter) ExportSpan(sd *trace.SpanData) { } func TestTraceExporter_InvalidName(t *testing.T) { - te, err := NewTraceExporter(nil, zap.NewNop(), newTraceDataPusher(0, nil)) + te, err := NewTraceExporter(nil, zap.NewNop(), newTraceDataPusher(nil)) require.Nil(t, te) require.Equal(t, errNilConfig, err) } func TestTraceExporter_NilLogger(t *testing.T) { - te, err := NewTraceExporter(fakeTraceExporterConfig, nil, newTraceDataPusher(0, nil)) + te, err := NewTraceExporter(fakeTraceExporterConfig, nil, newTraceDataPusher(nil)) require.Nil(t, te) require.Equal(t, errNilLogger, err) } @@ -85,7 +85,7 @@ func TestTraceExporter_NilPushTraceData(t *testing.T) { func TestTraceExporter_Default(t *testing.T) { td := pdata.NewTraces() - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, nil)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) assert.NotNil(t, te) assert.NoError(t, err) @@ -96,7 +96,7 @@ func TestTraceExporter_Default(t *testing.T) { func TestTraceExporter_Default_ReturnError(t *testing.T) { td := pdata.NewTraces() want := errors.New("my_error") - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, want)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(want)) require.Nil(t, err) require.NotNil(t, te) @@ -105,15 +105,7 @@ func TestTraceExporter_Default_ReturnError(t *testing.T) { } func TestTraceExporter_WithRecordMetrics(t *testing.T) { - te, err := NewTraceExporter(fakeTraceExporterConfig, 
zap.NewNop(), newTraceDataPusher(0, nil)) - require.Nil(t, err) - require.NotNil(t, te) - - checkRecordedMetricsForTraceExporter(t, te, nil) -} - -func TestTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) { - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(1, nil)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) require.Nil(t, err) require.NotNil(t, te) @@ -122,7 +114,7 @@ func TestTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) { func TestTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) { want := errors.New("my_error") - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, want)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(want)) require.Nil(t, err) require.NotNil(t, te) @@ -130,15 +122,7 @@ func TestTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) { } func TestTraceExporter_WithSpan(t *testing.T) { - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, nil)) - require.Nil(t, err) - require.NotNil(t, te) - - checkWrapSpanForTraceExporter(t, te, nil, 1) -} - -func TestTraceExporter_WithSpan_NonZeroDropped(t *testing.T) { - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(1, nil)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(nil)) require.Nil(t, err) require.NotNil(t, te) @@ -147,7 +131,7 @@ func TestTraceExporter_WithSpan_NonZeroDropped(t *testing.T) { func TestTraceExporter_WithSpan_ReturnError(t *testing.T) { want := errors.New("my_error") - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, want)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(want)) require.Nil(t, err) require.NotNil(t, te) @@ -158,7 +142,7 @@ func TestTraceExporter_WithShutdown(t *testing.T) { shutdownCalled := false shutdown := func(context.Context) error { shutdownCalled = true; return nil } - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, nil), WithShutdown(shutdown)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(nil), WithShutdown(shutdown)) assert.NotNil(t, te) assert.NoError(t, err) @@ -170,16 +154,16 @@ func TestTraceExporter_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } - te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(0, nil), WithShutdown(shutdownErr)) + te, err := NewTraceExporter(fakeTraceExporterConfig, zap.NewNop(), newTraceDataPusher(nil), WithShutdown(shutdownErr)) assert.NotNil(t, te) assert.NoError(t, err) assert.Equal(t, te.Shutdown(context.Background()), want) } -func newTraceDataPusher(droppedSpans int, retError error) PushTraces { - return func(ctx context.Context, td pdata.Traces) (int, error) { - return droppedSpans, retError +func newTraceDataPusher(retError error) PushTraces { + return func(ctx context.Context, td pdata.Traces) error { + return retError } } diff --git a/exporter/jaegerexporter/exporter.go b/exporter/jaegerexporter/exporter.go index 77d481438c0..e1e66673567 100644 --- a/exporter/jaegerexporter/exporter.go +++ b/exporter/jaegerexporter/exporter.go @@ -112,30 +112,29 @@ type stateReporter interface { func (s *protoGRPCSender) pushTraceData( ctx context.Context, td pdata.Traces, -) 
(droppedSpans int, err error) { +) error { batches, err := jaegertranslator.InternalTracesToJaegerProto(td) if err != nil { - return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)) + return consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err)) } if s.metadata.Len() > 0 { ctx = metadata.NewOutgoingContext(ctx, s.metadata) } - var sentSpans int for _, batch := range batches { _, err = s.client.PostSpans( ctx, &jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady)) + if err != nil { s.logger.Debug("failed to push trace data to Jaeger", zap.Error(err)) - return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err) + return fmt.Errorf("failed to push trace data via Jaeger exporter: %w", err) } - sentSpans += len(batch.Spans) } - return 0, nil + return nil } func (s *protoGRPCSender) shutdown(context.Context) error { diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index fff86bdb695..940aeff4fde 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -36,16 +36,16 @@ type kafkaTracesProducer struct { logger *zap.Logger } -func (e *kafkaTracesProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int, error) { +func (e *kafkaTracesProducer) traceDataPusher(_ context.Context, td pdata.Traces) error { messages, err := e.marshaller.Marshal(td) if err != nil { - return td.SpanCount(), consumererror.Permanent(err) + return consumererror.Permanent(err) } err = e.producer.SendMessages(producerMessages(messages, e.topic)) if err != nil { - return td.SpanCount(), err + return err } - return 0, nil + return nil } func (e *kafkaTracesProducer) Close(context.Context) error { @@ -60,16 +60,16 @@ type kafkaMetricsProducer struct { logger *zap.Logger } -func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pdata.Metrics) (int, error) { +func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pdata.Metrics) error { messages, err := e.marshaller.Marshal(md) if err != nil { - return md.MetricCount(), consumererror.Permanent(err) + return consumererror.Permanent(err) } err = e.producer.SendMessages(producerMessages(messages, e.topic)) if err != nil { - return md.MetricCount(), err + return err } - return 0, nil + return nil } func (e *kafkaMetricsProducer) Close(context.Context) error { diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 6630699a657..647809f38c0 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -103,9 +103,8 @@ func TestTraceDataPusher(t *testing.T) { t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) }) - droppedSpans, err := p.traceDataPusher(context.Background(), testdata.GenerateTraceDataTwoSpansSameResource()) + err := p.traceDataPusher(context.Background(), testdata.GenerateTraceDataTwoSpansSameResource()) require.NoError(t, err) - assert.Equal(t, 0, droppedSpans) } func TestTraceDataPusher_err(t *testing.T) { @@ -123,9 +122,8 @@ func TestTraceDataPusher_err(t *testing.T) { require.NoError(t, p.Close(context.Background())) }) td := testdata.GenerateTraceDataTwoSpansSameResource() - droppedSpans, err := p.traceDataPusher(context.Background(), td) + err := p.traceDataPusher(context.Background(), td) assert.EqualError(t, err, expErr.Error()) - 
assert.Equal(t, td.SpanCount(), droppedSpans) } func TestTraceDataPusher_marshall_error(t *testing.T) { @@ -135,10 +133,9 @@ func TestTraceDataPusher_marshall_error(t *testing.T) { logger: zap.NewNop(), } td := testdata.GenerateTraceDataTwoSpansSameResource() - droppedSpans, err := p.traceDataPusher(context.Background(), td) + err := p.traceDataPusher(context.Background(), td) require.Error(t, err) assert.Contains(t, err.Error(), expErr.Error()) - assert.Equal(t, td.SpanCount(), droppedSpans) } func TestMetricsDataPusher(t *testing.T) { @@ -153,9 +150,8 @@ func TestMetricsDataPusher(t *testing.T) { t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) }) - dropped, err := p.metricsDataPusher(context.Background(), testdata.GenerateMetricsTwoMetrics()) + err := p.metricsDataPusher(context.Background(), testdata.GenerateMetricsTwoMetrics()) require.NoError(t, err) - assert.Equal(t, 0, dropped) } func TestMetricsDataPusher_err(t *testing.T) { @@ -173,9 +169,8 @@ func TestMetricsDataPusher_err(t *testing.T) { require.NoError(t, p.Close(context.Background())) }) md := testdata.GenerateMetricsTwoMetrics() - dropped, err := p.metricsDataPusher(context.Background(), md) + err := p.metricsDataPusher(context.Background(), md) assert.EqualError(t, err, expErr.Error()) - assert.Equal(t, md.MetricCount(), dropped) } func TestMetricsDataPusher_marshal_error(t *testing.T) { @@ -185,10 +180,9 @@ func TestMetricsDataPusher_marshal_error(t *testing.T) { logger: zap.NewNop(), } md := testdata.GenerateMetricsTwoMetrics() - dropped, err := p.metricsDataPusher(context.Background(), md) + err := p.metricsDataPusher(context.Background(), md) require.Error(t, err) assert.Contains(t, err.Error(), expErr.Error()) - assert.Equal(t, md.MetricCount(), dropped) } type tracesErrorMarshaller struct { diff --git a/exporter/loggingexporter/logging_exporter.go b/exporter/loggingexporter/logging_exporter.go index 1cb5460a6d1..6eb13bb2092 100644 --- a/exporter/loggingexporter/logging_exporter.go +++ b/exporter/loggingexporter/logging_exporter.go @@ -322,12 +322,12 @@ type loggingExporter struct { func (s *loggingExporter) pushTraceData( _ context.Context, td pdata.Traces, -) (int, error) { +) error { s.logger.Info("TracesExporter", zap.Int("#spans", td.SpanCount())) if !s.debug { - return 0, nil + return nil } buf := logDataBuffer{} @@ -365,17 +365,17 @@ func (s *loggingExporter) pushTraceData( } s.logger.Debug(buf.str.String()) - return 0, nil + return nil } func (s *loggingExporter) pushMetricsData( _ context.Context, md pdata.Metrics, -) (int, error) { +) error { s.logger.Info("MetricsExporter", zap.Int("#metrics", md.MetricCount())) if !s.debug { - return 0, nil + return nil } buf := logDataBuffer{} @@ -401,7 +401,7 @@ func (s *loggingExporter) pushMetricsData( s.logger.Debug(buf.str.String()) - return 0, nil + return nil } // newTraceExporter creates an exporter.TracesExporter that just drops the @@ -467,11 +467,11 @@ func newLogsExporter(config configmodels.Exporter, level string, logger *zap.Log func (s *loggingExporter) pushLogData( _ context.Context, ld pdata.Logs, -) (int, error) { +) error { s.logger.Info("LogsExporter", zap.Int("#logs", ld.LogRecordCount())) if !s.debug { - return 0, nil + return nil } buf := logDataBuffer{} @@ -497,7 +497,7 @@ func (s *loggingExporter) pushLogData( s.logger.Debug(buf.str.String()) - return 0, nil + return nil } func loggerSync(logger *zap.Logger) func(context.Context) error { diff --git a/exporter/opencensusexporter/opencensus.go 
b/exporter/opencensusexporter/opencensus.go index 114bff626e7..2484d17195b 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -136,12 +136,12 @@ func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { return oce, nil } -func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, error) { +func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) error { // Get first available trace Client. tClient, ok := <-oce.tracesClients if !ok { err := errors.New("failed to push traces, OpenCensus exporter was already stopped") - return td.SpanCount(), err + return err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), @@ -154,7 +154,7 @@ func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, e if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.tracesClients <- nil - return td.SpanCount(), err + return err } } @@ -178,19 +178,19 @@ func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, e // put back nil to keep the number of workers constant. tClient.cancel() oce.tracesClients <- nil - return td.SpanCount(), err + return err } } oce.tracesClients <- tClient - return 0, nil + return nil } -func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (int, error) { +func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) error { // Get first available mClient. mClient, ok := <-oce.metricsClients if !ok { err := errors.New("failed to push metrics, OpenCensus exporter was already stopped") - return metricPointCount(md), err + return err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), @@ -203,7 +203,7 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (int if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.metricsClients <- nil - return metricPointCount(md), err + return err } } @@ -228,11 +228,11 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (int // put back nil to keep the number of workers constant. 
mClient.cancel() oce.metricsClients <- nil - return metricPointCount(md), err + return err } } oce.metricsClients <- mClient - return 0, nil + return nil } func (oce *ocExporter) createTraceServiceRPC() (*tracesClientWithCancel, error) { @@ -264,8 +264,3 @@ func (oce *ocExporter) createMetricsServiceRPC() (*metricsClientWithCancel, erro } return &metricsClientWithCancel{cancel: cancel, msec: metricsClient}, nil } - -func metricPointCount(md pdata.Metrics) int { - _, pc := md.MetricAndDataPointCount() - return pc -} diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index c7ac37a3540..0b1f72e96d8 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -65,40 +65,34 @@ func (e *exporterImp) shutdown(context.Context) error { return e.w.stop() } -func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { +func (e *exporterImp) pushTraceData(ctx context.Context, td pdata.Traces) error { request := &otlptrace.ExportTraceServiceRequest{ ResourceSpans: pdata.TracesToOtlp(td), } - err := e.w.exportTrace(ctx, request) - - if err != nil { - return td.SpanCount(), fmt.Errorf("failed to push trace data via OTLP exporter: %w", err) + if err := e.w.exportTrace(ctx, request); err != nil { + return fmt.Errorf("failed to push trace data via OTLP exporter: %w", err) } - return 0, nil + return nil } -func (e *exporterImp) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) { +func (e *exporterImp) pushMetricsData(ctx context.Context, md pdata.Metrics) error { request := &otlpmetrics.ExportMetricsServiceRequest{ ResourceMetrics: pdata.MetricsToOtlp(md), } - err := e.w.exportMetrics(ctx, request) - - if err != nil { - return md.MetricCount(), fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err) + if err := e.w.exportMetrics(ctx, request); err != nil { + return fmt.Errorf("failed to push metrics data via OTLP exporter: %w", err) } - return 0, nil + return nil } -func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) (int, error) { +func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) error { request := &otlplogs.ExportLogsServiceRequest{ ResourceLogs: internal.LogsToOtlp(logs.InternalRep()), } - err := e.w.exportLogs(ctx, request) - - if err != nil { - return logs.LogRecordCount(), fmt.Errorf("failed to push log data via OTLP exporter: %w", err) + if err := e.w.exportLogs(ctx, request); err != nil { + return fmt.Errorf("failed to push log data via OTLP exporter: %w", err) } - return 0, nil + return nil } type grpcSender struct { diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 252478acedb..f004b4a2fec 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -85,46 +85,46 @@ func newExporter(cfg configmodels.Exporter, logger *zap.Logger) (*exporterImp, e }, nil } -func (e *exporterImp) pushTraceData(ctx context.Context, traces pdata.Traces) (int, error) { +func (e *exporterImp) pushTraceData(ctx context.Context, traces pdata.Traces) error { request, err := traces.ToOtlpProtoBytes() if err != nil { - return traces.SpanCount(), consumererror.Permanent(err) + return consumererror.Permanent(err) } err = e.export(ctx, e.tracesURL, request) if err != nil { - return traces.SpanCount(), err + return err } - return 0, nil + return nil } -func (e *exporterImp) pushMetricsData(ctx context.Context, metrics pdata.Metrics) (int, error) { +func (e *exporterImp) pushMetricsData(ctx context.Context, metrics 
pdata.Metrics) error { request, err := metrics.ToOtlpProtoBytes() if err != nil { - return metrics.MetricCount(), consumererror.Permanent(err) + return consumererror.Permanent(err) } err = e.export(ctx, e.metricsURL, request) if err != nil { - return metrics.MetricCount(), err + return err } - return 0, nil + return nil } -func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) (int, error) { +func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) error { request, err := logs.ToOtlpProtoBytes() if err != nil { - return logs.LogRecordCount(), consumererror.Permanent(err) + return consumererror.Permanent(err) } err = e.export(ctx, e.logsURL, request) if err != nil { - return logs.LogRecordCount(), err + return err } - return 0, nil + return nil } func (e *exporterImp) export(ctx context.Context, url string, request []byte) error { diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 0f4d993d790..ad66be0a563 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -90,13 +90,13 @@ func (prwe *PrwExporter) Shutdown(context.Context) error { // PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of // TimeSeries, validates and handles each individual metric, adding the converted TimeSeries to the map, and finally // exports the map. -func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int, error) { +func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) error { prwe.wg.Add(1) defer prwe.wg.Done() select { case <-prwe.closeChan: - return md.MetricCount(), errors.New("shutdown has been called") + return errors.New("shutdown has been called") default: tsMap := map[string]*prompb.TimeSeries{} dropped := 0 @@ -154,10 +154,10 @@ func (prwe *PrwExporter) PushMetrics(ctx context.Context, md pdata.Metrics) (int } if dropped != 0 { - return dropped, consumererror.CombineErrors(errs) + return consumererror.CombineErrors(errs) } - return 0, nil + return nil } } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 55afb473633..fce766f18fc 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -137,8 +137,7 @@ func Test_Shutdown(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, ok := prwe.PushMetrics(context.Background(), testdata.GenerateMetricsEmpty()) - errChan <- ok + errChan <- prwe.PushMetrics(context.Background(), testdata.GenerateMetricsEmpty()) }() } wg.Wait() @@ -157,7 +156,7 @@ func Test_export(t *testing.T) { sample2 := getSample(floatVal2, msTime2) ts1 := getTimeSeries(labels, sample1, sample2) handleFunc := func(w http.ResponseWriter, r *http.Request, code int) { - // The following is a handler function that reads the sent httpRequest, unmarshals, and checks if the WriteRequest + // The following is a handler function that reads the sent httpRequest, unmarshal, and checks if the WriteRequest // preserves the TimeSeries data correctly body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -486,13 +485,12 @@ func Test_PushMetrics(t *testing.T) { } tests := []struct { - name string - md *pdata.Metrics - reqTestFunc func(t *testing.T, r *http.Request, expected int) - expectedTimeSeries int - httpResponseCode int - numDroppedTimeSeries int - returnErr bool + name 
string + md *pdata.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int) + expectedTimeSeries int + httpResponseCode int + returnErr bool }{ { "invalid_type_case", @@ -500,7 +498,6 @@ func Test_PushMetrics(t *testing.T) { nil, 0, http.StatusAccepted, - invalidTypeBatch.MetricCount(), true, }, { @@ -509,7 +506,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 2, http.StatusAccepted, - 0, false, }, { @@ -518,7 +514,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 2, http.StatusAccepted, - 0, false, }, { @@ -527,7 +522,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 2, http.StatusAccepted, - 0, false, }, { @@ -536,7 +530,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 2, http.StatusAccepted, - 0, false, }, { @@ -545,7 +538,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 12, http.StatusAccepted, - 0, false, }, { @@ -554,7 +546,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 12, http.StatusAccepted, - 0, false, }, { @@ -563,7 +554,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 10, http.StatusAccepted, - 0, false, }, { @@ -572,7 +562,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 5, http.StatusAccepted, - 0, false, }, { @@ -581,7 +570,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 5, http.StatusAccepted, - 0, false, }, { @@ -590,7 +578,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 5, http.StatusServiceUnavailable, - 1, true, }, { @@ -599,7 +586,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointDoubleGaugeBatch.MetricCount(), true, }, { @@ -608,7 +594,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointIntGaugeBatch.MetricCount(), true, }, { @@ -617,7 +602,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointDoubleSumBatch.MetricCount(), true, }, { @@ -626,7 +610,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointIntSumBatch.MetricCount(), true, }, { @@ -635,7 +618,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointDoubleHistogramBatch.MetricCount(), true, }, { @@ -644,7 +626,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointIntHistogramBatch.MetricCount(), true, }, { @@ -653,7 +634,6 @@ func Test_PushMetrics(t *testing.T) { checkFunc, 0, http.StatusAccepted, - nilDataPointDoubleSummaryBatch.MetricCount(), true, }, } @@ -691,8 +671,7 @@ func Test_PushMetrics(t *testing.T) { c := http.DefaultClient prwe, nErr := NewPrwExporter(config.Namespace, serverURL.String(), c, map[string]string{}) require.NoError(t, nErr) - numDroppedTimeSeries, err := prwe.PushMetrics(context.Background(), *tt.md) - assert.Equal(t, tt.numDroppedTimeSeries, numDroppedTimeSeries) + err := prwe.PushMetrics(context.Background(), *tt.md) if tt.returnErr { assert.Error(t, err) return diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index 8054a2244a7..dcfd864aac5 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -65,30 +65,30 @@ func createZipkinExporter(cfg *Config) (*zipkinExporter, error) { return ze, nil } -func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) { +func (ze *zipkinExporter) pushTraceData(ctx context.Context, td pdata.Traces) error { tbatch, err := zipkin.InternalTracesToZipkinSpans(td) if err != nil { - return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace 
data via Zipkin exporter: %w", err)) + return consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } body, err := ze.serializer.Serialize(tbatch) if err != nil { - return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) + return consumererror.Permanent(fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err)) } req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body)) if err != nil { - return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) + return fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } req.Header.Set("Content-Type", ze.serializer.ContentType()) resp, err := ze.client.Do(req) if err != nil { - return td.SpanCount(), fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) + return fmt.Errorf("failed to push trace data via Zipkin exporter: %w", err) } _ = resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode > 299 { - return td.SpanCount(), fmt.Errorf("failed the request with status code %d", resp.StatusCode) + return fmt.Errorf("failed the request with status code %d", resp.StatusCode) } - return 0, nil + return nil }
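
Note for reviewers adapting downstream exporters: after this change every push function reports only success or failure, and the helper layers (queued_retry, the observability senders) derive the dropped-item count from request.count() when a send fails, rather than from a value returned by the pusher. Below is a minimal sketch of a traces pusher under the new signature, wired into exporterhelper. It is illustrative only: the package name, sendSomewhere, and buildExporter are hypothetical placeholders, the import paths are assumed to match this repository's layout at the time of this diff, and the NewTraceExporter argument order (config, logger, pusher, options...) is taken from the tests above.

package myexporter // hypothetical package, for illustration only

import (
	"context"
	"fmt"

	"go.uber.org/zap"

	"go.opentelemetry.io/collector/config/configmodels"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// pushTraces matches the new exporterhelper.PushTraces signature: it returns
// only an error and no dropped-span count. On failure, the helper layers
// report dropped items using request.count().
func pushTraces(ctx context.Context, td pdata.Traces) error {
	// sendSomewhere stands in for the exporter's real transport call.
	if err := sendSomewhere(ctx, td); err != nil {
		return fmt.Errorf("failed to push trace data: %w", err)
	}
	return nil
}

// sendSomewhere is a placeholder; pretend the whole batch was accepted.
func sendSomewhere(_ context.Context, _ pdata.Traces) error {
	return nil
}

// buildExporter wires the pusher into the helper. The argument order
// (config, logger, pusher, options...) follows the tests in this diff.
func buildExporter(cfg configmodels.Exporter, logger *zap.Logger) error {
	te, err := exporterhelper.NewTraceExporter(cfg, logger, pushTraces)
	if err != nil {
		return err
	}
	// ConsumeTraces now simply forwards the pusher's error.
	return te.ConsumeTraces(context.Background(), pdata.NewTraces())
}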