Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "inserted" metrics to processors #10372

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/processorhelper-inserted-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component/componenttest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added additional "inserted" count to `TestTelemetry.CheckProcessor*` methods.

# One or more tracking issues or pull requests related to the change
issues: [10353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
29 changes: 29 additions & 0 deletions .chloggen/processorhelper-inserted.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add "inserted" metrics for processors.

# One or more tracking issues or pull requests related to the change
issues: [10353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This includes the following metrics for processors:
- `processor_inserted_spans`
- `processor_inserted_metric_points`
- `processor_inserted_log_records`

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
12 changes: 6 additions & 6 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) err

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans)
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans, insertedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans, insertedSpans)
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints)
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords)
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down
18 changes: 10 additions & 8 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,26 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot
pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error {
func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped, inserted int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
return multierr.Combine(
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs))
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_inserted_%s", datatype), inserted, processorAttrs),
)
}

func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sent, sendFailed int64) error {
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/otelprometheuschecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ func TestPromChecker(t *testing.T) {
)

assert.NoError(t,
pc.checkProcessorTraces(processor, 42, 13, 7),
pc.checkProcessorTraces(processor, 42, 13, 7, 5),
"metrics from Receiver Traces should be valid",
)

assert.NoError(t,
pc.checkProcessorMetrics(processor, 7, 41, 13),
pc.checkProcessorMetrics(processor, 7, 41, 13, 4),
"metrics from Receiver Metrics should be valid",
)

assert.NoError(t,
pc.checkProcessorLogs(processor, 102, 35, 14),
pc.checkProcessorLogs(processor, 102, 35, 14, 3),
"metrics from Receiver Logs should be valid",
)

Expand Down
9 changes: 9 additions & 0 deletions component/componenttest/testdata/prometheus_response
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ processor_refused_spans{processor="fakeProcessor"} 13
# HELP processor_dropped_spans Number of spans that were dropped.
# TYPE processor_dropped_spans counter
processor_dropped_spans{processor="fakeProcessor"} 7
# HELP processor_inserted_spans Number of spans that were inserted.
# TYPE processor_inserted_spans counter
processor_inserted_spans{processor="fakeProcessor"} 5
# HELP processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline.
# TYPE processor_accepted_metric_points counter
processor_accepted_metric_points{processor="fakeProcessor"} 7
Expand All @@ -34,6 +37,9 @@ processor_refused_metric_points{processor="fakeProcessor"} 41
# HELP processor_dropped_metric_points Number of metric points that were dropped.
# TYPE processor_dropped_metric_points counter
processor_dropped_metric_points{processor="fakeProcessor"} 13
# HELP processor_inserted_metric_points Number of metric points that were inserted.
# TYPE processor_inserted_metric_points counter
processor_inserted_metric_points{processor="fakeProcessor"} 4
# HELP processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline.
# TYPE processor_accepted_log_records counter
processor_accepted_log_records{processor="fakeProcessor"} 102
Expand All @@ -43,6 +49,9 @@ processor_refused_log_records{processor="fakeProcessor"} 35
# HELP processor_dropped_log_records Number of log records that were dropped.
# TYPE processor_dropped_log_records counter
processor_dropped_log_records{processor="fakeProcessor"} 14
# HELP processor_inserted_log_records Number of log records that were inserted.
# TYPE processor_inserted_log_records counter
processor_inserted_log_records{processor="fakeProcessor"} 3
# HELP receiver_accepted_log_records Number of log records successfully pushed into the pipeline.
# TYPE receiver_accepted_log_records counter
receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102
Expand Down
24 changes: 24 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_log_records

Number of log records that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_metric_points

Number of metric points that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_spans

Number of spans that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_refused_log_records

Number of log records that were rejected by the next component in the pipeline.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ telemetry:
value_type: int
monotonic: true

processor_inserted_spans:
enabled: true
description: Number of spans that were inserted.
unit: 1
sum:
value_type: int
monotonic: true

processor_accepted_metric_points:
enabled: true
description: Number of metric points successfully pushed into the next component in the pipeline.
Expand All @@ -57,6 +65,14 @@ telemetry:
value_type: int
monotonic: true

processor_inserted_metric_points:
enabled: true
description: Number of metric points that were inserted.
unit: 1
sum:
value_type: int
monotonic: true

processor_accepted_log_records:
enabled: true
description: Number of log records successfully pushed into the next component in the pipeline.
Expand All @@ -79,4 +95,12 @@ telemetry:
unit: 1
sum:
value_type: int
monotonic: true
monotonic: true

processor_inserted_log_records:
enabled: true
description: Number of log records that were inserted.
unit: 1
sum:
value_type: int
monotonic: true
41 changes: 30 additions & 11 deletions processor/processorhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,69 +60,88 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) {
}, nil
}

func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedCount, refusedCount, droppedCount metric.Int64Counter
func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped, inserted int64) {
var acceptedCount, refusedCount, droppedCount, insertedCount metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedSpans
refusedCount = or.telemetryBuilder.ProcessorRefusedSpans
droppedCount = or.telemetryBuilder.ProcessorDroppedSpans
insertedCount = or.telemetryBuilder.ProcessorInsertedSpans
case component.DataTypeMetrics:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedMetricPoints
refusedCount = or.telemetryBuilder.ProcessorRefusedMetricPoints
droppedCount = or.telemetryBuilder.ProcessorDroppedMetricPoints
insertedCount = or.telemetryBuilder.ProcessorInsertedMetricPoints
case component.DataTypeLogs:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedLogRecords
refusedCount = or.telemetryBuilder.ProcessorRefusedLogRecords
droppedCount = or.telemetryBuilder.ProcessorDroppedLogRecords
insertedCount = or.telemetryBuilder.ProcessorInsertedLogRecords
}

acceptedCount.Add(ctx, accepted, metric.WithAttributes(or.otelAttrs...))
refusedCount.Add(ctx, refused, metric.WithAttributes(or.otelAttrs...))
droppedCount.Add(ctx, dropped, metric.WithAttributes(or.otelAttrs...))
insertedCount.Add(ctx, inserted, metric.WithAttributes(or.otelAttrs...))
}

// TracesAccepted reports that the trace data was accepted.
func (or *ObsReport) TracesAccepted(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0))
or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0), int64(0))
}

// TracesRefused reports that the trace data was refused.
func (or *ObsReport) TracesRefused(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0))
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0), int64(0))
}

// TracesDropped reports that the trace data was dropped.
func (or *ObsReport) TracesDropped(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans))
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans), int64(0))
}

// TracesInserted reports that the trace data was inserted.
func (or *ObsReport) TracesInserted(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(0), int64(numSpans))
}

// MetricsAccepted reports that the metrics were accepted.
func (or *ObsReport) MetricsAccepted(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0))
or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0), int64(0))
}

// MetricsRefused reports that the metrics were refused.
func (or *ObsReport) MetricsRefused(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0))
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0), int64(0))
}

// MetricsDropped reports that the metrics were dropped.
func (or *ObsReport) MetricsDropped(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints))
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints), int64(0))
}

// MetricsInserted reports that the metrics were inserted.
func (or *ObsReport) MetricsInserted(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(0), int64(numPoints))
}

// LogsAccepted reports that the logs were accepted.
func (or *ObsReport) LogsAccepted(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0))
or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0), int64(0))
}

// LogsRefused reports that the logs were refused.
func (or *ObsReport) LogsRefused(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0))
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0), int64(0))
}

// LogsDropped reports that the logs were dropped.
func (or *ObsReport) LogsDropped(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords))
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords), int64(0))
}

// LogsInserted reports that the logs were inserted.
func (or *ObsReport) LogsInserted(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(0), int64(numRecords))
}
Loading
Loading