From 0462e5c85d1b9b2ad51eff36d6cf98e692689e0b Mon Sep 17 00:00:00 2001 From: zpzhuSplunk <127359723+zpzhuSplunk@users.noreply.github.com> Date: Fri, 26 Jul 2024 08:21:13 -0700 Subject: [PATCH] [exporterhelper] record metric should log the number of log records before the data are sent to the consumers downstream (#10402) #### Description The sender metric within the exporterhelper should measure the number of items coming into the sender, not what was done with the items downstream, if the components downstream are mutable. An example of this is provided as a unit test within this PR. This PR also addresses nil panics that some users are experiencing. #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/10033 --- .chloggen/exporterhelper_metric_fix.yaml | 20 ++++++++++++++++++++ exporter/exporterhelper/logs.go | 3 ++- exporter/exporterhelper/logs_test.go | 22 ++++++++++++++++++++++ exporter/exporterhelper/metrics.go | 3 ++- exporter/exporterhelper/metrics_test.go | 21 +++++++++++++++++++++ exporter/exporterhelper/traces.go | 3 ++- exporter/exporterhelper/traces_test.go | 22 ++++++++++++++++++++++ 7 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 .chloggen/exporterhelper_metric_fix.yaml diff --git a/.chloggen/exporterhelper_metric_fix.yaml b/.chloggen/exporterhelper_metric_fix.yaml new file mode 100644 index 00000000000..e83849a4603 --- /dev/null +++ b/.chloggen/exporterhelper_metric_fix.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: There is no guarantee that after the exporterhelper sends the plog/pmetric/ptrace data downstream that the data won't be mutated in some way. (e.g by the batch_sender) This mutation could result in the proceeding call to req.ItemsCount() to provide inaccurate information to be logged. + +# One or more tracking issues or pull requests related to the change +issues: [10033] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index c8505f5b97c..a996cb932e6 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -155,7 +155,8 @@ func newLogsExporterWithObservability(obsrep *ObsReport) requestSender { func (lewo *logsExporterWithObservability) send(ctx context.Context, req Request) error { c := lewo.obsrep.StartLogsOp(ctx) + numLogRecords := req.ItemsCount() err := lewo.nextSender.send(c, req) - lewo.obsrep.EndLogsOp(c, req.ItemsCount(), err) + lewo.obsrep.EndLogsOp(c, numLogRecords, err) return err } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index c6b48ebdf41..aa50cf1f623 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -191,6 +191,21 @@ func TestLogsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } +func TestLogsExporter_pLogModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeLogsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsExporter(context.Background(), exporter.Settings{ID: fakeLogsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeLogsExporterConfig, newPushLogsDataModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NotNil(t, le) + assert.NoError(t, err) + ld := testdata.GenerateLogs(2) + + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.Equal(t, 0, ld.LogRecordCount()) + require.NoError(t, tt.CheckExporterLogs(int64(2), 0)) +} + func TestLogsRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -359,6 +374,13 @@ func TestLogsRequestExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func newPushLogsDataModifiedDownstream(retError error) consumer.ConsumeLogsFunc { + return func(_ context.Context, log plog.Logs) error { + log.ResourceLogs().MoveAndAppendTo(plog.NewResourceLogsSlice()) + return retError + } +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(_ context.Context, _ plog.Logs) error { return retError diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 683dc576f7a..e0412230ed9 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -155,7 +155,8 @@ func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender { func (mewo *metricsSenderWithObservability) send(ctx context.Context, req Request) error { c := mewo.obsrep.StartMetricsOp(ctx) + numMetricDataPoints := req.ItemsCount() err := mewo.nextSender.send(c, req) - mewo.obsrep.EndMetricsOp(c, req.ItemsCount(), err) + mewo.obsrep.EndMetricsOp(c, numMetricDataPoints, err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 062c3af7297..f849ded612c 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -192,6 +192,21 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, nil) } +func TestMetricsExporter_pMetricModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeMetricsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsExporter(context.Background(), exporter.Settings{ID: fakeMetricsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeMetricsExporterConfig, newPushMetricsDataModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + require.NoError(t, err) + require.NotNil(t, me) + md := testdata.GenerateMetrics(2) + + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.Equal(t, 0, md.MetricCount()) + require.NoError(t, tt.CheckExporterMetrics(int64(4), 0)) +} + func TestMetricsRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -370,6 +385,12 @@ func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return retError } } +func newPushMetricsDataModifiedDownstream(retError error) consumer.ConsumeMetricsFunc { + return func(_ context.Context, metric pmetric.Metrics) error { + metric.ResourceMetrics().MoveAndAppendTo(pmetric.NewResourceMetricsSlice()) + return retError + } +} func checkRecordedMetricsForMetricsExporter(t *testing.T, tt componenttest.TestTelemetry, me exporter.Metrics, wantError error) { md := testdata.GenerateMetrics(2) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index e510bae3dd3..6017516c2ea 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -155,8 +155,9 @@ func newTracesExporterWithObservability(obsrep *ObsReport) requestSender { func (tewo *tracesExporterWithObservability) send(ctx context.Context, req Request) error { c := tewo.obsrep.StartTracesOp(ctx) + numTraceSpans := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(c, req) - tewo.obsrep.EndTracesOp(c, req.ItemsCount(), err) + tewo.obsrep.EndTracesOp(c, numTraceSpans, err) return err } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 60f0006a10c..82baa5c229c 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -189,6 +189,21 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, nil) } +func TestTracesExporter_pLogModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeTracesExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesExporter(context.Background(), exporter.Settings{ID: fakeTracesExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeTracesExporterConfig, newTraceDataPusherModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NotNil(t, te) + assert.NoError(t, err) + td := testdata.GenerateTraces(2) + + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.Equal(t, 0, td.SpanCount()) + require.NoError(t, tt.CheckExporterTraces(int64(2), 0)) +} + func TestTracesRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -372,6 +387,13 @@ func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { } } +func newTraceDataPusherModifiedDownstream(retError error) consumer.ConsumeTracesFunc { + return func(_ context.Context, trace ptrace.Traces) error { + trace.ResourceSpans().MoveAndAppendTo(ptrace.NewResourceSpansSlice()) + return retError + } +} + func checkRecordedMetricsForTracesExporter(t *testing.T, tt componenttest.TestTelemetry, te exporter.Traces, wantError error) { td := testdata.GenerateTraces(2) const numBatches = 7