From eb1ca235b71e775f75b21634773a53de8caa4850 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Thu, 13 Jun 2024 10:06:53 -0700 Subject: [PATCH 1/2] [exporterhelper] record metric should log the number of log records before the data are sent to the consumers downstream --- .chloggen/exporterhelper_metric_fix.yaml | 20 ++++++++++++++++++++ exporter/exporterhelper/logs.go | 3 ++- exporter/exporterhelper/logs_test.go | 22 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 .chloggen/exporterhelper_metric_fix.yaml diff --git a/.chloggen/exporterhelper_metric_fix.yaml b/.chloggen/exporterhelper_metric_fix.yaml new file mode 100644 index 00000000000..d453390be74 --- /dev/null +++ b/.chloggen/exporterhelper_metric_fix.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: There is no guarantee that after the exporterhelper sends the plog data downstream that the data won't be mutated in some way. (e.g by the batch_sender) This mutation could result in the proceeding call to req.ItemsCount() to provide inaccurate information to be logged. + +# One or more tracking issues or pull requests related to the change +issues: [10033] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index c8505f5b97c..a996cb932e6 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -155,7 +155,8 @@ func newLogsExporterWithObservability(obsrep *ObsReport) requestSender { func (lewo *logsExporterWithObservability) send(ctx context.Context, req Request) error { c := lewo.obsrep.StartLogsOp(ctx) + numLogRecords := req.ItemsCount() err := lewo.nextSender.send(c, req) - lewo.obsrep.EndLogsOp(c, req.ItemsCount(), err) + lewo.obsrep.EndLogsOp(c, numLogRecords, err) return err } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index c6b48ebdf41..aa50cf1f623 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -191,6 +191,21 @@ func TestLogsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForLogsExporter(t, tt, le, nil) } +func TestLogsExporter_pLogModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeLogsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + le, err := NewLogsExporter(context.Background(), exporter.Settings{ID: fakeLogsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeLogsExporterConfig, newPushLogsDataModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NotNil(t, le) + assert.NoError(t, err) + ld := testdata.GenerateLogs(2) + + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.Equal(t, 0, ld.LogRecordCount()) + require.NoError(t, tt.CheckExporterLogs(int64(2), 0)) +} + func TestLogsRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeLogsExporterName) require.NoError(t, err) @@ -359,6 +374,13 @@ func TestLogsRequestExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } +func newPushLogsDataModifiedDownstream(retError error) consumer.ConsumeLogsFunc { + return func(_ context.Context, log plog.Logs) error { + log.ResourceLogs().MoveAndAppendTo(plog.NewResourceLogsSlice()) + return retError + } +} + func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(_ context.Context, _ plog.Logs) error { return retError From 3aa6181f461d9cebffca51e0d32da848433dbef6 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Wed, 24 Jul 2024 01:17:08 -0700 Subject: [PATCH 2/2] add trace and metric data type --- .chloggen/exporterhelper_metric_fix.yaml | 2 +- exporter/exporterhelper/metrics.go | 3 ++- exporter/exporterhelper/metrics_test.go | 21 +++++++++++++++++++++ exporter/exporterhelper/traces.go | 3 ++- exporter/exporterhelper/traces_test.go | 22 ++++++++++++++++++++++ 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/.chloggen/exporterhelper_metric_fix.yaml b/.chloggen/exporterhelper_metric_fix.yaml index d453390be74..e83849a4603 100644 --- a/.chloggen/exporterhelper_metric_fix.yaml +++ b/.chloggen/exporterhelper_metric_fix.yaml @@ -7,7 +7,7 @@ change_type: bug_fix component: exporterhelper # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: There is no guarantee that after the exporterhelper sends the plog data downstream that the data won't be mutated in some way. (e.g by the batch_sender) This mutation could result in the proceeding call to req.ItemsCount() to provide inaccurate information to be logged. +note: There is no guarantee that after the exporterhelper sends the plog/pmetric/ptrace data downstream that the data won't be mutated in some way. (e.g by the batch_sender) This mutation could result in the proceeding call to req.ItemsCount() to provide inaccurate information to be logged. # One or more tracking issues or pull requests related to the change issues: [10033] diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 683dc576f7a..e0412230ed9 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -155,7 +155,8 @@ func newMetricsSenderWithObservability(obsrep *ObsReport) requestSender { func (mewo *metricsSenderWithObservability) send(ctx context.Context, req Request) error { c := mewo.obsrep.StartMetricsOp(ctx) + numMetricDataPoints := req.ItemsCount() err := mewo.nextSender.send(c, req) - mewo.obsrep.EndMetricsOp(c, req.ItemsCount(), err) + mewo.obsrep.EndMetricsOp(c, numMetricDataPoints, err) return err } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 062c3af7297..f849ded612c 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -192,6 +192,21 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForMetricsExporter(t, tt, me, nil) } +func TestMetricsExporter_pMetricModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeMetricsExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + me, err := NewMetricsExporter(context.Background(), exporter.Settings{ID: fakeMetricsExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeMetricsExporterConfig, newPushMetricsDataModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + require.NoError(t, err) + require.NotNil(t, me) + md := testdata.GenerateMetrics(2) + + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.Equal(t, 0, md.MetricCount()) + require.NoError(t, tt.CheckExporterMetrics(int64(4), 0)) +} + func TestMetricsRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeMetricsExporterName) require.NoError(t, err) @@ -370,6 +385,12 @@ func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return retError } } +func newPushMetricsDataModifiedDownstream(retError error) consumer.ConsumeMetricsFunc { + return func(_ context.Context, metric pmetric.Metrics) error { + metric.ResourceMetrics().MoveAndAppendTo(pmetric.NewResourceMetricsSlice()) + return retError + } +} func checkRecordedMetricsForMetricsExporter(t *testing.T, tt componenttest.TestTelemetry, me exporter.Metrics, wantError error) { md := testdata.GenerateMetrics(2) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index e510bae3dd3..6017516c2ea 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -155,8 +155,9 @@ func newTracesExporterWithObservability(obsrep *ObsReport) requestSender { func (tewo *tracesExporterWithObservability) send(ctx context.Context, req Request) error { c := tewo.obsrep.StartTracesOp(ctx) + numTraceSpans := req.ItemsCount() // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(c, req) - tewo.obsrep.EndTracesOp(c, req.ItemsCount(), err) + tewo.obsrep.EndTracesOp(c, numTraceSpans, err) return err } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 60f0006a10c..82baa5c229c 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -189,6 +189,21 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) { checkRecordedMetricsForTracesExporter(t, tt, te, nil) } +func TestTracesExporter_pLogModifiedDownStream_WithRecordMetrics(t *testing.T) { + tt, err := componenttest.SetupTelemetry(fakeTracesExporterName) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := NewTracesExporter(context.Background(), exporter.Settings{ID: fakeTracesExporterName, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, &fakeTracesExporterConfig, newTraceDataPusherModifiedDownstream(nil), WithCapabilities(consumer.Capabilities{MutatesData: true})) + assert.NotNil(t, te) + assert.NoError(t, err) + td := testdata.GenerateTraces(2) + + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.Equal(t, 0, td.SpanCount()) + require.NoError(t, tt.CheckExporterTraces(int64(2), 0)) +} + func TestTracesRequestExporter_WithRecordMetrics(t *testing.T) { tt, err := componenttest.SetupTelemetry(fakeTracesExporterName) require.NoError(t, err) @@ -372,6 +387,13 @@ func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { } } +func newTraceDataPusherModifiedDownstream(retError error) consumer.ConsumeTracesFunc { + return func(_ context.Context, trace ptrace.Traces) error { + trace.ResourceSpans().MoveAndAppendTo(ptrace.NewResourceSpansSlice()) + return retError + } +} + func checkRecordedMetricsForTracesExporter(t *testing.T, tt componenttest.TestTelemetry, te exporter.Traces, wantError error) { td := testdata.GenerateTraces(2) const numBatches = 7