diff --git a/.chloggen/awscwl_noisypusher.yaml b/.chloggen/awscwl_noisypusher.yaml new file mode 100755 index 000000000000..7d991bae8f63 --- /dev/null +++ b/.chloggen/awscwl_noisypusher.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscloudwatchlogsexporter/awsemfexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Reduce noisy logs emitted by CloudWatch Logs Pusher. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27774] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The Collector logger will now write successful CloudWatch API writes at the Debug level instead of Info level. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/double-converter.yaml b/.chloggen/double-converter.yaml new file mode 100755 index 000000000000..bec559474677 --- /dev/null +++ b/.chloggen/double-converter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: doubleconverter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adding a double converter into pkg/ottl" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [22056] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/feat_top_n_file_sorting.yaml b/.chloggen/feat_top_n_file_sorting.yaml new file mode 100755 index 000000000000..1a4e678bae36 --- /dev/null +++ b/.chloggen/feat_top_n_file_sorting.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new "top_n" option to specify the number of files to track when using ordering criteria + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23788] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ["user"] diff --git a/.chloggen/ptracetest-ignore-spanid.yaml b/.chloggen/ptracetest-ignore-spanid.yaml new file mode 100755 index 000000000000..72f99b9c9322 --- /dev/null +++ b/.chloggen/ptracetest-ignore-spanid.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/pdatatest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "support ignore span ID in span comparisons for ptracetest" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27685] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/ptracetest-ignore-traceid.yaml b/.chloggen/ptracetest-ignore-traceid.yaml new file mode 100755 index 000000000000..4e9b7449b45e --- /dev/null +++ b/.chloggen/ptracetest-ignore-traceid.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/pdatatest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "support ignore trace ID in span comparisons for ptracetest" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27687] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml new file mode 100644 index 000000000000..bd33138b4325 --- /dev/null +++ b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When async is enabled for udp receiver, separate logic into readers (only read logs from udp port and push to channel), and processors (read logs from channel and process; decode, split, add attributes, and push downstream), allowing to change concurrency level for both readers and processors separately. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27613] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 8ab6de82c640..949e7e41048d 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -606,11 +606,23 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn assert.NotPanics(t, func() { switch e := exp.(type) { case exporter.Logs: - err = e.ConsumeLogs(ctx, testdata.GenerateLogsManyLogRecordsSameResource(2)) + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(ctx, logs) case exporter.Metrics: - err = e.ConsumeMetrics(ctx, testdata.GenerateMetricsTwoMetrics()) + metrics := testdata.GenerateMetricsTwoMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(ctx, metrics) case exporter.Traces: - err = e.ConsumeTraces(ctx, testdata.GenerateTracesTwoSpansSameResource()) + traces := testdata.GenerateTracesTwoSpansSameResource() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(ctx, traces) } }) if !expectErr { diff --git a/exporter/fileexporter/buffered_writer.go b/exporter/fileexporter/buffered_writer.go index 64ee151fa217..6e50b6519296 100644 --- a/exporter/fileexporter/buffered_writer.go +++ b/exporter/fileexporter/buffered_writer.go @@ -5,9 +5,8 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "bufio" + "errors" "io" - - "go.uber.org/multierr" ) // bufferedWriteCloser is intended to use more memory @@ -33,7 +32,7 @@ func (bwc *bufferedWriteCloser) Write(p []byte) (n int, err error) { } func (bwc *bufferedWriteCloser) Close() error { - return multierr.Combine( + return errors.Join( bwc.buffered.Flush(), bwc.wrapped.Close(), ) diff --git a/exporter/fileexporter/buffered_writer_test.go b/exporter/fileexporter/buffered_writer_test.go index a6512258d2fa..60f79a08a76b 100644 --- a/exporter/fileexporter/buffered_writer_test.go +++ b/exporter/fileexporter/buffered_writer_test.go @@ -5,6 +5,7 @@ package fileexporter import ( "bytes" + "errors" "fmt" "io" "os" @@ -13,7 +14,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/multierr" ) const ( @@ -83,7 +83,7 @@ func BenchmarkWriter(b *testing.B) { for i := 0; i < b.N; i++ { _, err = w.Write(payload) } - errBenchmark = multierr.Combine(err, w.Close()) + errBenchmark = errors.Join(err, w.Close()) }) } } diff --git a/exporter/fileexporter/go.mod b/exporter/fileexporter/go.mod index 11be5ffcb8db..6eba04c79ac8 100644 --- a/exporter/fileexporter/go.mod +++ b/exporter/fileexporter/go.mod @@ -12,7 +12,6 @@ require ( go.opentelemetry.io/collector/consumer v0.87.1-0.20231017160804-ec0725874313 go.opentelemetry.io/collector/exporter v0.87.1-0.20231017160804-ec0725874313 go.opentelemetry.io/collector/pdata v1.0.0-rcv0016.0.20231017160804-ec0725874313 - go.uber.org/multierr v1.11.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -39,6 +38,7 @@ require ( go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect go.opentelemetry.io/otel/trace v1.19.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/internal/aws/cwlogs/pusher.go b/internal/aws/cwlogs/pusher.go index d0f940730278..08ff8f549de2 100644 --- a/internal/aws/cwlogs/pusher.go +++ b/internal/aws/cwlogs/pusher.go @@ -267,7 +267,7 @@ func (p *logPusher) pushEventBatch(req interface{}) error { return err } - p.logger.Info("logpusher: publish log events successfully.", + p.logger.Debug("logpusher: publish log events successfully.", zap.Int("NumOfLogEvents", len(putLogEventsInput.LogEvents)), zap.Float64("LogEventsSize", float64(logEventBatch.byteTotal)/float64(1024)), zap.Int64("Time", time.Since(startTime).Nanoseconds()/int64(time.Millisecond))) diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index 47c7fc7460de..ac5d7dff851f 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -294,6 +294,7 @@ Available Converters: - [ExtractPatterns](#extractpatterns) - [FNV](#fnv) - [Hours](#hours) +- [Double](#double) - [Duration](#duration) - [Int](#int) - [IsMap](#ismap) @@ -365,6 +366,29 @@ Examples: - `ConvertCase(metric.name, "snake")` +### Double + +The `Double` Converter converts an inputted `value` into a double. + +The returned type is float64. + +The input `value` types: +* float64. returns the `value` without changes. +* string. Tries to parse a double from string. If it fails then nil will be returned. +* bool. If `value` is true, then the function will return 1 otherwise 0. +* int64. The function converts the integer to a double. + +If `value` is another type or parsing failed nil is always returned. + +The `value` is either a path expression to a telemetry field to retrieve or a literal. + +Examples: + +- `Double(attributes["http.status_code"])` + + +- `Double("2.0")` + ### Duration `Duration(duration)` diff --git a/pkg/ottl/ottlfuncs/func_double.go b/pkg/ottl/ottlfuncs/func_double.go new file mode 100644 index 000000000000..c454a6363e5d --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_double.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +type DoubleArguments[K any] struct { + Target ottl.FloatLikeGetter[K] +} + +func NewDoubleFactory[K any]() ottl.Factory[K] { + return ottl.NewFactory("Double", &DoubleArguments[K]{}, createDoubleFunction[K]) +} + +func createDoubleFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[K], error) { + args, ok := oArgs.(*DoubleArguments[K]) + + if !ok { + return nil, fmt.Errorf("DoubleFactory args must be of type *DoubleArguments[K]") + } + + return doubleFunc(args.Target), nil +} + +func doubleFunc[K any](target ottl.FloatLikeGetter[K]) ottl.ExprFunc[K] { + return func(ctx context.Context, tCtx K) (interface{}, error) { + value, err := target.Get(ctx, tCtx) + if err != nil { + return nil, err + } + if value == nil { + return nil, nil + } + return *value, nil + } +} diff --git a/pkg/ottl/ottlfuncs/func_double_test.go b/pkg/ottl/ottlfuncs/func_double_test.go new file mode 100644 index 000000000000..0e81df0e6681 --- /dev/null +++ b/pkg/ottl/ottlfuncs/func_double_test.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottlfuncs + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +func Test_Double(t *testing.T) { + tests := []struct { + name string + value interface{} + expected interface{} + err bool + }{ + { + name: "string", + value: "50", + expected: float64(50), + }, + { + name: "empty string", + value: "", + expected: nil, + err: true, + }, + { + name: "not a number string", + value: "test", + expected: nil, + err: true, + }, + { + name: "int64", + value: int64(333), + expected: float64(333), + }, + { + name: "float64", + value: float64(2.7), + expected: float64(2.7), + }, + { + name: "float64 without decimal", + value: float64(55), + expected: float64(55), + }, + { + name: "true", + value: true, + expected: float64(1), + }, + { + name: "false", + value: false, + expected: float64(0), + }, + { + name: "nil", + value: nil, + expected: nil, + }, + { + name: "some struct", + value: struct{}{}, + expected: nil, + err: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + exprFunc := doubleFunc[interface{}](&ottl.StandardFloatLikeGetter[interface{}]{ + + Getter: func(context.Context, interface{}) (interface{}, error) { + return test.value, nil + }, + }) + result, err := exprFunc(nil, nil) + if test.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, test.expected, result) + }) + } +} diff --git a/pkg/ottl/ottlfuncs/functions.go b/pkg/ottl/ottlfuncs/functions.go index 736a49313605..e43507192392 100644 --- a/pkg/ottl/ottlfuncs/functions.go +++ b/pkg/ottl/ottlfuncs/functions.go @@ -36,6 +36,7 @@ func converters[K any]() []ottl.Factory[K] { // Converters NewConcatFactory[K](), NewConvertCaseFactory[K](), + NewDoubleFactory[K](), NewDurationFactory[K](), NewExtractPatternsFactory[K](), NewFnvFactory[K](), diff --git a/pkg/pdatatest/ptracetest/options.go b/pkg/pdatatest/ptracetest/options.go index 57a278ec4dda..3a6886eb0019 100644 --- a/pkg/pdatatest/ptracetest/options.go +++ b/pkg/pdatatest/ptracetest/options.go @@ -129,6 +129,28 @@ func sortSpanSlices(ts ptrace.Traces) { } } +// IgnoreSpanID is a CompareTracesOption that clears SpanID fields on all spans. +func IgnoreSpanID() CompareTracesOption { + return compareTracesOptionFunc(func(expected, actual ptrace.Traces) { + spanID := pcommon.NewSpanIDEmpty() + maskSpanID(expected, spanID) + maskSpanID(actual, spanID) + }) +} + +func maskSpanID(traces ptrace.Traces, spanID pcommon.SpanID) { + for i := 0; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + span.SetSpanID(spanID) + } + } + } +} + // IgnoreStartTimestamp is a CompareTracesOption that clears StartTimestamp fields on all spans. func IgnoreStartTimestamp() CompareTracesOption { return compareTracesOptionFunc(func(expected, actual ptrace.Traces) { @@ -172,3 +194,25 @@ func maskEndTimestamp(traces ptrace.Traces, ts pcommon.Timestamp) { } } } + +// IgnoreTraceID is a CompareTracesOption that clears TraceID fields on all spans. +func IgnoreTraceID() CompareTracesOption { + return compareTracesOptionFunc(func(expected, actual ptrace.Traces) { + traceID := pcommon.NewTraceIDEmpty() + maskTraceID(expected, traceID) + maskTraceID(actual, traceID) + }) +} + +func maskTraceID(traces ptrace.Traces, traceID pcommon.TraceID) { + for i := 0; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + span.SetTraceID(traceID) + } + } + } +} diff --git a/pkg/pdatatest/ptracetest/testdata/ignore-spanid/actual.yaml b/pkg/pdatatest/ptracetest/testdata/ignore-spanid/actual.yaml new file mode 100644 index 000000000000..817c4fc938d1 --- /dev/null +++ b/pkg/pdatatest/ptracetest/testdata/ignore-spanid/actual.yaml @@ -0,0 +1,21 @@ +resourceSpans: + - resource: + attributes: + - key: host.name + value: + stringValue: node1 + scopeSpans: + - scope: + name: collector + version: v0.1.0 + spans: + - attributes: + - key: key1 + value: + stringValue: value1 + name: span1 + parentSpanId: "" + spanId: fd0da883bb27cd6b + status: {} + traceId: 8c8b1765a7b0acf0b66aa4623fcb7bd5 + diff --git a/pkg/pdatatest/ptracetest/testdata/ignore-spanid/expected.yaml b/pkg/pdatatest/ptracetest/testdata/ignore-spanid/expected.yaml new file mode 100644 index 000000000000..3b3954599639 --- /dev/null +++ b/pkg/pdatatest/ptracetest/testdata/ignore-spanid/expected.yaml @@ -0,0 +1,19 @@ +resourceSpans: + - resource: + attributes: + - key: host.name + value: + stringValue: node1 + scopeSpans: + - scope: + name: collector + version: v0.1.0 + spans: + - attributes: + - key: key1 + value: + stringValue: value1 + name: span1 + parentSpanId: "" + status: {} + traceId: 8c8b1765a7b0acf0b66aa4623fcb7bd5 diff --git a/pkg/pdatatest/ptracetest/testdata/ignore-traceid/actual.yaml b/pkg/pdatatest/ptracetest/testdata/ignore-traceid/actual.yaml new file mode 100644 index 000000000000..817c4fc938d1 --- /dev/null +++ b/pkg/pdatatest/ptracetest/testdata/ignore-traceid/actual.yaml @@ -0,0 +1,21 @@ +resourceSpans: + - resource: + attributes: + - key: host.name + value: + stringValue: node1 + scopeSpans: + - scope: + name: collector + version: v0.1.0 + spans: + - attributes: + - key: key1 + value: + stringValue: value1 + name: span1 + parentSpanId: "" + spanId: fd0da883bb27cd6b + status: {} + traceId: 8c8b1765a7b0acf0b66aa4623fcb7bd5 + diff --git a/pkg/pdatatest/ptracetest/testdata/ignore-traceid/expected.yaml b/pkg/pdatatest/ptracetest/testdata/ignore-traceid/expected.yaml new file mode 100644 index 000000000000..f01f0307bac4 --- /dev/null +++ b/pkg/pdatatest/ptracetest/testdata/ignore-traceid/expected.yaml @@ -0,0 +1,19 @@ +resourceSpans: + - resource: + attributes: + - key: host.name + value: + stringValue: node1 + scopeSpans: + - scope: + name: collector + version: v0.1.0 + spans: + - attributes: + - key: key1 + value: + stringValue: value1 + name: span1 + parentSpanId: "" + spanId: fd0da883bb27cd6b + status: {} diff --git a/pkg/pdatatest/ptracetest/traces_test.go b/pkg/pdatatest/ptracetest/traces_test.go index f088d8889bbe..ef07930a7311 100644 --- a/pkg/pdatatest/ptracetest/traces_test.go +++ b/pkg/pdatatest/ptracetest/traces_test.go @@ -48,6 +48,16 @@ func TestCompareTraces(t *testing.T) { ), withOptions: nil, }, + { + name: "ignore-spanid", + compareOptions: []CompareTracesOption{ + IgnoreSpanID(), + }, + withoutOptions: multierr.Combine( + errors.New("resource \"map[host.name:node1]\": scope \"collector\": span \"span1\": span ID doesn't match expected: fd0da883bb27cd6b, actual: "), + ), + withOptions: nil, + }, { name: "ignore-start-timestamp", compareOptions: []CompareTracesOption{ @@ -68,6 +78,16 @@ func TestCompareTraces(t *testing.T) { ), withOptions: nil, }, + { + name: "ignore-traceid", + compareOptions: []CompareTracesOption{ + IgnoreTraceID(), + }, + withoutOptions: multierr.Combine( + errors.New("resource \"map[host.name:node1]\": scope \"collector\": span \"span1\": trace ID doesn't match expected: 8c8b1765a7b0acf0b66aa4623fcb7bd5, actual: "), + ), + withOptions: nil, + }, { name: "resourcespans-amount-unequal", withoutOptions: multierr.Combine( diff --git a/pkg/pdatautil/hash.go b/pkg/pdatautil/hash.go index 043e308e9d0a..7900120cd6c3 100644 --- a/pkg/pdatautil/hash.go +++ b/pkg/pdatautil/hash.go @@ -5,7 +5,6 @@ package pdatautil // import "github.com/open-telemetry/opentelemetry-collector-c import ( "encoding/binary" - "hash" "math" "sort" "sync" @@ -28,23 +27,19 @@ var ( valMapSuffix = []byte{'\xfd'} valSlicePrefix = []byte{'\xfe'} valSliceSuffix = []byte{'\xff'} + + emptyHash = [16]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} ) type hashWriter struct { - h hash.Hash - strBuf []byte + byteBuf []byte keysBuf []string - sumHash []byte - numBuf []byte } func newHashWriter() *hashWriter { return &hashWriter{ - h: xxhash.New(), - strBuf: make([]byte, 0, 128), + byteBuf: make([]byte, 0, 512), keysBuf: make([]string, 0, 16), - sumHash: make([]byte, 0, 16), - numBuf: make([]byte, 8), } } @@ -55,10 +50,16 @@ var hashWriterPool = &sync.Pool{ // MapHash return a hash for the provided map. // Maps with the same underlying key/value pairs in different order produce the same deterministic hash value. func MapHash(m pcommon.Map) [16]byte { + if m.Len() == 0 { + return emptyHash + } + hw := hashWriterPool.Get().(*hashWriter) defer hashWriterPool.Put(hw) - hw.h.Reset() + hw.byteBuf = hw.byteBuf[:0] + hw.writeMapHash(m) + return hw.hashSum128() } @@ -66,8 +67,10 @@ func MapHash(m pcommon.Map) [16]byte { func ValueHash(v pcommon.Value) [16]byte { hw := hashWriterPool.Get().(*hashWriter) defer hashWriterPool.Put(hw) - hw.h.Reset() + hw.byteBuf = hw.byteBuf[:0] + hw.writeValueHash(v) + return hw.hashSum128() } @@ -90,10 +93,8 @@ func (hw *hashWriter) writeMapHash(m pcommon.Map) { sort.Strings(workingKeySet) for _, k := range workingKeySet { v, _ := m.Get(k) - hw.strBuf = hw.strBuf[:0] - hw.strBuf = append(hw.strBuf, keyPrefix...) - hw.strBuf = append(hw.strBuf, k...) - hw.h.Write(hw.strBuf) + hw.byteBuf = append(hw.byteBuf, keyPrefix...) + hw.byteBuf = append(hw.byteBuf, k...) hw.writeValueHash(v) } @@ -101,59 +102,54 @@ func (hw *hashWriter) writeMapHash(m pcommon.Map) { hw.keysBuf = hw.keysBuf[:nextIndex] } -func (hw *hashWriter) writeSliceHash(sl pcommon.Slice) { - for i := 0; i < sl.Len(); i++ { - hw.writeValueHash(sl.At(i)) - } -} - func (hw *hashWriter) writeValueHash(v pcommon.Value) { switch v.Type() { case pcommon.ValueTypeStr: - hw.strBuf = hw.strBuf[:0] - hw.strBuf = append(hw.strBuf, valStrPrefix...) - hw.strBuf = append(hw.strBuf, v.Str()...) - hw.h.Write(hw.strBuf) + hw.byteBuf = append(hw.byteBuf, valStrPrefix...) + hw.byteBuf = append(hw.byteBuf, v.Str()...) case pcommon.ValueTypeBool: if v.Bool() { - hw.h.Write(valBoolTrue) + hw.byteBuf = append(hw.byteBuf, valBoolTrue...) } else { - hw.h.Write(valBoolFalse) + hw.byteBuf = append(hw.byteBuf, valBoolFalse...) } case pcommon.ValueTypeInt: - hw.h.Write(valIntPrefix) - binary.LittleEndian.PutUint64(hw.numBuf, uint64(v.Int())) - hw.h.Write(hw.numBuf) + hw.byteBuf = append(hw.byteBuf, valIntPrefix...) + hw.byteBuf = binary.LittleEndian.AppendUint64(hw.byteBuf, uint64(v.Int())) case pcommon.ValueTypeDouble: - hw.h.Write(valDoublePrefix) - binary.LittleEndian.PutUint64(hw.numBuf, math.Float64bits(v.Double())) - hw.h.Write(hw.numBuf) + hw.byteBuf = append(hw.byteBuf, valDoublePrefix...) + hw.byteBuf = binary.LittleEndian.AppendUint64(hw.byteBuf, math.Float64bits(v.Double())) case pcommon.ValueTypeMap: - hw.h.Write(valMapPrefix) + hw.byteBuf = append(hw.byteBuf, valMapPrefix...) hw.writeMapHash(v.Map()) - hw.h.Write(valMapSuffix) + hw.byteBuf = append(hw.byteBuf, valMapSuffix...) case pcommon.ValueTypeSlice: - hw.h.Write(valSlicePrefix) - hw.writeSliceHash(v.Slice()) - hw.h.Write(valSliceSuffix) + sl := v.Slice() + hw.byteBuf = append(hw.byteBuf, valSlicePrefix...) + for i := 0; i < sl.Len(); i++ { + hw.writeValueHash(sl.At(i)) + } + hw.byteBuf = append(hw.byteBuf, valSliceSuffix...) case pcommon.ValueTypeBytes: - hw.h.Write(valBytesPrefix) - hw.h.Write(v.Bytes().AsRaw()) + hw.byteBuf = append(hw.byteBuf, valBytesPrefix...) + hw.byteBuf = append(hw.byteBuf, v.Bytes().AsRaw()...) case pcommon.ValueTypeEmpty: - hw.h.Write(valEmpty) + hw.byteBuf = append(hw.byteBuf, valEmpty...) } } // hashSum128 returns a [16]byte hash sum. func (hw *hashWriter) hashSum128() [16]byte { - b := hw.sumHash[:0] - b = hw.h.Sum(b) + r := [16]byte{} + res := r[:] + + h := xxhash.Sum64(hw.byteBuf) + res = binary.LittleEndian.AppendUint64(res[:0], h) // Append an extra byte to generate another part of the hash sum - _, _ = hw.h.Write(extraByte) - b = hw.h.Sum(b) + hw.byteBuf = append(hw.byteBuf, extraByte...) + h = xxhash.Sum64(hw.byteBuf) + _ = binary.LittleEndian.AppendUint64(res[8:], h) - res := [16]byte{} - copy(res[:], b) - return res + return r } diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 2a08d16716bb..555ddc97faba 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -50,11 +50,13 @@ for other encodings available. If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently. -**note** If `async` is not set at all, a single thread will read lines synchronously. +**note** If `async` is not set at all, a single thread will read & process lines synchronously. | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max number of messages which may be waiting for a processor. While the queue is full, the readers will wait until there's room (readers will not drop messages, but they will not read additional incoming messages during that period). | ### Example Configurations diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9d83118aa4bd..43171be5c96c 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -412,6 +412,16 @@ func TestUnmarshal(t *testing.T) { return newMockOperatorConfig(cfg) }(), }, + { + Name: "ordering_criteria_top_n", + Expect: func() *mockOperatorConfig { + cfg := NewConfig() + cfg.OrderingCriteria = matcher.OrderingCriteria{ + TopN: 10, + } + return newMockOperatorConfig(cfg) + }(), + }, }, }.Run(t) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index 0a7a0628edac..76cdd1bd4feb 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -18,6 +18,10 @@ const ( sortTypeAlphabetical = "alphabetical" ) +const ( + defaultOrderingCriteriaTopN = 1 +) + type Criteria struct { Include []string `mapstructure:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty"` @@ -26,6 +30,7 @@ type Criteria struct { type OrderingCriteria struct { Regex string `mapstructure:"regex,omitempty"` + TopN int `mapstructure:"top_n,omitempty"` SortBy []Sort `mapstructure:"sort_by,omitempty"` } @@ -62,6 +67,14 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") } + if c.OrderingCriteria.TopN < 0 { + return nil, fmt.Errorf("'top_n' must be a positive integer") + } + + if c.OrderingCriteria.TopN == 0 { + c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN + } + regex, err := regexp.Compile(c.OrderingCriteria.Regex) if err != nil { return nil, fmt.Errorf("compile regex: %w", err) @@ -97,6 +110,7 @@ func New(c Criteria) (*Matcher, error) { include: c.Include, exclude: c.Exclude, regex: regex, + topN: c.OrderingCriteria.TopN, filterOpts: filterOpts, }, nil } @@ -105,6 +119,7 @@ type Matcher struct { include []string exclude []string regex *regexp.Regexp + topN int filterOpts []filter.Option } @@ -127,7 +142,9 @@ func (m Matcher) MatchFiles() ([]string, error) { return result, errors.Join(err, errs) } - // Return only the first item. - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23788 - return result[:1], errors.Join(err, errs) + if len(result) <= m.topN { + return result, errors.Join(err, errs) + } + + return result[:m.topN], errors.Join(err, errs) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index c838962a4699..1d9de6f17f87 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -98,6 +98,23 @@ func TestNew(t *testing.T) { }, expectedErr: "compile regex: error parsing regexp: missing closing ]: `[a-z`", }, + { + name: "TopN is negative", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + Regex: "[a-z]", + TopN: -1, + SortBy: []Sort{ + { + SortType: "numeric", + RegexKey: "key", + }, + }, + }, + }, + expectedErr: "'top_n' must be a positive integer", + }, { name: "SortTypeEmpty", criteria: Criteria{ @@ -249,6 +266,46 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.2023020612.log"}, }, + { + name: "TopN > number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 3, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, + { + name: "TopN == number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, { name: "Timestamp Sorting Ascending", files: []string{"err.2023020612.log", "err.2023020611.log", "err.2023020609.log", "err.2023020610.log"}, @@ -319,6 +376,24 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.d.log"}, }, + { + name: "Alphabetical Sorting - Top 2", + files: []string{"err.a.log", "err.d.log", "err.b.log", "err.c.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z]+).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeAlphabetical, + RegexKey: "value", + Ascending: false, + }, + }, + }, + expected: []string{"err.d.log", "err.c.log"}, + }, { name: "Alphabetical Sorting Ascending", files: []string{"err.b.log", "err.a.log", "err.c.log", "err.d.log"}, @@ -336,6 +411,45 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.a.log"}, }, + { + name: "Multiple Sorting - timestamp priority sort - Top 4", + files: []string{ + "err.b.1.2023020601.log", + "err.b.2.2023020601.log", + "err.a.1.2023020601.log", + "err.a.2.2023020601.log", + "err.b.1.2023020602.log", + "err.a.2.2023020602.log", + "err.b.2.2023020602.log", + "err.a.1.2023020602.log", + }, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z])\.(?P\d+)\.(?P