diff --git a/.chloggen/fix_probabilisticsamplerprocessor-panic.yaml b/.chloggen/fix_probabilisticsamplerprocessor-panic.yaml new file mode 100755 index 000000000000..3db78c61e888 --- /dev/null +++ b/.chloggen/fix_probabilisticsamplerprocessor-panic.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: probabilisticsamplerprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix panic in probabilisticsampler processor if a log attribute to be sampled on is of type String. + +# One or more tracking issues related to the change +issues: [18222] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index 4b42e76e4ce8..3701679744fb 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -52,7 +52,6 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( ld.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { rl.ScopeLogs().RemoveIf(func(ill plog.ScopeLogs) bool { ill.LogRecords().RemoveIf(func(l plog.LogRecord) bool { - tagPolicyValue := "always_sampling" // pick the sampling source. var lidBytes []byte @@ -64,7 +63,19 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( if lidBytes == nil && lsp.samplingSource != "" { if value, ok := l.Attributes().Get(lsp.samplingSource); ok { tagPolicyValue = lsp.samplingSource - lidBytes = value.Bytes().AsRaw() + + switch value.Type() { + case pcommon.ValueTypeStr: + lidBytes = []byte(value.Str()) + case pcommon.ValueTypeBytes: + lidBytes = value.Bytes().AsRaw() + default: + lsp.logger.Warn("Incompatible log record attribute, only String or Bytes supported; skipping log record", + zap.String("log_record_attribute", lsp.samplingSource), zap.Stringer("attribute_type", value.Type())) + + return true + } + } } priority := lsp.scaledSamplingRate diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index 1c5e6f5a6647..5b2b0213e820 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -134,6 +134,15 @@ func TestLogsSampling(t *testing.T) { }, received: 25, }, + { + name: "sampling a string attribute", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: recordAttributeSource, + FromAttribute: "bar", + }, + received: 22, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -149,10 +158,13 @@ func TestLogsSampling(t *testing.T) { ib := byte(i) traceID := [16]byte{0, 0, 0, 0, 0, 0, 0, 0, ib, ib, ib, ib, ib, ib, ib, ib} record.SetTraceID(traceID) - // set half of records with a foo attribute + // set half of records with a foo bytes attribute if i%2 == 0 { b := record.Attributes().PutEmptyBytes("foo") b.FromRaw(traceID[:]) + } else { + // set the other half of records with a bar string attribute + record.Attributes().PutStr("bar", string(traceID[:])) } // set a fourth of records with a priority attribute if i%4 == 0 { @@ -170,3 +182,24 @@ func TestLogsSampling(t *testing.T) { }) } } + +func TestLogsSamplingInvalidType(t *testing.T) { + sink := new(consumertest.LogsSink) + cfg := &Config{ + SamplingPercentage: 100, + AttributeSource: recordAttributeSource, + FromAttribute: "bool_attr", + } + processor, err := newLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), sink, cfg) + require.NoError(t, err) + logs := plog.NewLogs() + lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + record := lr.AppendEmpty() + record.Attributes().PutBool("bool_attr", true) + + // When processing a log event with an unsupported type, the event is not processed. + err = processor.ConsumeLogs(context.Background(), logs) + require.NoError(t, err) + sunk := sink.AllLogs() + assert.Equal(t, 0, len(sunk)) +}