From bf0a3f4ca94a86475f92d8a82a1831f4dbcabb91 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 18 Oct 2023 10:24:59 -0700 Subject: [PATCH 01/11] [chore] Send readonly data to immutable exporters in lifecycle tests (#27825) This should help to catch exporters that are incorrectly claimed as not mutating. --- cmd/otelcontribcol/exporters_test.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 8ab6de82c640..949e7e41048d 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -606,11 +606,23 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn assert.NotPanics(t, func() { switch e := exp.(type) { case exporter.Logs: - err = e.ConsumeLogs(ctx, testdata.GenerateLogsManyLogRecordsSameResource(2)) + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(ctx, logs) case exporter.Metrics: - err = e.ConsumeMetrics(ctx, testdata.GenerateMetricsTwoMetrics()) + metrics := testdata.GenerateMetricsTwoMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(ctx, metrics) case exporter.Traces: - err = e.ConsumeTraces(ctx, testdata.GenerateTracesTwoSpansSameResource()) + traces := testdata.GenerateTracesTwoSpansSameResource() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(ctx, traces) } }) if !expectErr { From c2f343b392058e8bdc86cd191451bac994e69dfb Mon Sep 17 00:00:00 2001 From: hovavza <147598197+hovavza@users.noreply.github.com> Date: Wed, 18 Oct 2023 20:55:22 +0300 Subject: [PATCH 02/11] UDP input operator - async mode - separate between readers & processors (#27805) **Description:** adding a feature - when async mode is enabled in the UDP receiver (udp input operator), separating reading from processing 
operations. This is important to reduce data loss in high-scale UDP scenarios. See the original issue for more details. The async config block has changed. Instead of a single readers field (determining the concurrency level of how many threads the udp receiver is running, all reading from the UDP port, processing, and sending downstream), it now has 3 fields: - readers - determines the concurrency level of threads only reading from the UDP port and pushing the packets to a channel. - processors - determines the concurrency level of threads reading from the channel, processing the packets, and sending downstream. - max_queue_length - determines the max size of the channel between the readers & the processors. Setting it high enough helps prevent data loss during temporary downstream latency. Once the channel is full, the reader threads block until there is room in the queue (to prevent unbounded memory usage). This improves performance and reduces UDP packet loss in high-scale scenarios. Note that async mode only supports this separation of readers from processors. If the async config block isn't included, the default state is a single thread that reads and processes messages synchronously. **Link to tracking Issue:** 27613 **Testing:** Ran local stress tests with all types of async config (no 'async', with 'async', etc.). Updated the existing udp tests accordingly. Also ran scale tests and saw an improvement in data loss. **Documentation:** Updated the md files for both udplogreceiver & the stanza udp_input operator with the new flags. 
--------- Co-authored-by: Daniel Jaglowski --- ...er-to-reader-and-processor-with-async.yaml | 27 +++ pkg/stanza/docs/operators/udp_input.md | 6 +- pkg/stanza/operator/input/udp/config_test.go | 7 +- .../operator/input/udp/testdata/config.yaml | 4 +- pkg/stanza/operator/input/udp/udp.go | 156 +++++++++++++----- pkg/stanza/operator/input/udp/udp_test.go | 7 +- receiver/udplogreceiver/README.md | 6 +- receiver/udplogreceiver/udp_test.go | 7 +- 8 files changed, 171 insertions(+), 49 deletions(-) create mode 100644 .chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml diff --git a/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml new file mode 100644 index 000000000000..bd33138b4325 --- /dev/null +++ b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When async is enabled for udp receiver, separate logic into readers (only read logs from udp port and push to channel), and processors (read logs from channel and process; decode, split, add attributes, and push downstream), allowing to change concurrency level for both readers and processors separately. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27613] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. 
+# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 2a08d16716bb..555ddc97faba 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -50,11 +50,13 @@ for other encodings available. If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently. -**note** If `async` is not set at all, a single thread will read lines synchronously. +**note** If `async` is not set at all, a single thread will read & process lines synchronously. | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max number of messages which may be waiting for a processor. While the queue is full, the readers will wait until there's room (readers will not drop messages, but they will not read additional incoming messages during that period). 
| ### Example Configurations diff --git a/pkg/stanza/operator/input/udp/config_test.go b/pkg/stanza/operator/input/udp/config_test.go index 7b806ed61985..a65f8ea6cc3d 100644 --- a/pkg/stanza/operator/input/udp/config_test.go +++ b/pkg/stanza/operator/input/udp/config_test.go @@ -43,8 +43,11 @@ func TestUnmarshal(t *testing.T) { cfg.Encoding = "utf-8" cfg.SplitConfig.LineStartPattern = "ABC" cfg.SplitConfig.LineEndPattern = "" - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } return cfg }(), }, diff --git a/pkg/stanza/operator/input/udp/testdata/config.yaml b/pkg/stanza/operator/input/udp/testdata/config.yaml index 4acbf3621df8..4353dd894ee4 100644 --- a/pkg/stanza/operator/input/udp/testdata/config.yaml +++ b/pkg/stanza/operator/input/udp/testdata/config.yaml @@ -17,4 +17,6 @@ all_with_async: line_start_pattern: ABC line_end_pattern: "" async: - readers: 2 \ No newline at end of file + readers: 2 + processors: 2 + max_queue_length: 100 diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index cc2b69952df3..4593f9ead5e5 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -27,6 +27,10 @@ const ( // Maximum UDP packet size MaxUDPSize = 64 * 1024 + + defaultReaders = 1 + defaultProcessors = 1 + defaultMaxQueueLength = 100 ) func init() { @@ -59,14 +63,9 @@ type Config struct { } type AsyncConfig struct { - Readers int `mapstructure:"readers,omitempty"` -} - -// NewAsyncConfig creates a new AsyncConfig with default values. -func NewAsyncConfig() *AsyncConfig { - return &AsyncConfig{ - Readers: 1, - } + Readers int `mapstructure:"readers,omitempty"` + Processors int `mapstructure:"processors,omitempty"` + MaxQueueLength int `mapstructure:"max_queue_length,omitempty"` } // BaseConfig is the details configuration of a udp input operator. 
@@ -113,12 +112,16 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { resolver = helper.NewIPResolver() } - if c.AsyncConfig == nil { - c.AsyncConfig = NewAsyncConfig() - } - - if c.AsyncConfig.Readers <= 0 { - return nil, fmt.Errorf("async readers must be greater than 0") + if c.AsyncConfig != nil { + if c.AsyncConfig.Readers <= 0 { + c.AsyncConfig.Readers = defaultReaders + } + if c.AsyncConfig.Processors <= 0 { + c.AsyncConfig.Processors = defaultProcessors + } + if c.AsyncConfig.MaxQueueLength <= 0 { + c.AsyncConfig.MaxQueueLength = defaultMaxQueueLength + } } udpInput := &Input{ @@ -132,6 +135,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { OneLogPerPacket: c.OneLogPerPacket, AsyncConfig: c.AsyncConfig, } + + if c.AsyncConfig != nil { + udpInput.messageQueue = make(chan messageAndAddress, c.AsyncConfig.MaxQueueLength) + } return udpInput, nil } @@ -151,6 +158,14 @@ type Input struct { encoding encoding.Encoding splitFunc bufio.SplitFunc resolver *helper.IPResolver + + messageQueue chan messageAndAddress + stopOnce sync.Once +} + +type messageAndAddress struct { + Message []byte + RemoteAddr net.Addr } // Start will start listening for messages on a socket. @@ -170,9 +185,20 @@ func (u *Input) Start(_ operator.Persister) error { // goHandleMessages will handle messages from a udp connection. 
func (u *Input) goHandleMessages(ctx context.Context) { - for i := 0; i < u.AsyncConfig.Readers; i++ { + if u.AsyncConfig == nil { u.wg.Add(1) go u.readAndProcessMessages(ctx) + return + } + + for i := 0; i < u.AsyncConfig.Readers; i++ { + u.wg.Add(1) + go u.readMessagesAsync(ctx) + } + + for i := 0; i < u.AsyncConfig.Processors; i++ { + u.wg.Add(1) + go u.processMessagesAsync(ctx) } } @@ -193,23 +219,69 @@ func (u *Input) readAndProcessMessages(ctx context.Context) { break } - if u.OneLogPerPacket { - log := truncateMaxLog(message) - u.handleMessage(ctx, remoteAddr, dec, log) - continue - } + u.processMessage(ctx, message, remoteAddr, dec, buf) + } +} - scanner := bufio.NewScanner(bytes.NewReader(message)) - scanner.Buffer(buf, MaxUDPSize) +func (u *Input) processMessage(ctx context.Context, message []byte, remoteAddr net.Addr, dec *decode.Decoder, buf []byte) { + if u.OneLogPerPacket { + log := truncateMaxLog(message) + u.handleMessage(ctx, remoteAddr, dec, log) + return + } + + scanner := bufio.NewScanner(bytes.NewReader(message)) + scanner.Buffer(buf, MaxUDPSize) + + scanner.Split(u.splitFunc) - scanner.Split(u.splitFunc) + for scanner.Scan() { + u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) + } + if err := scanner.Err(); err != nil { + u.Errorw("Scanner error", zap.Error(err)) + } +} - for scanner.Scan() { - u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) +func (u *Input) readMessagesAsync(ctx context.Context) { + defer u.wg.Done() + + for { + message, remoteAddr, err := u.readMessage() + if err != nil { + select { + case <-ctx.Done(): + return + default: + u.Errorw("Failed reading messages", zap.Error(err)) + } + break } - if err := scanner.Err(); err != nil { - u.Errorw("Scanner error", zap.Error(err)) + + messageAndAddr := messageAndAddress{ + Message: message, + RemoteAddr: remoteAddr, } + + // Send the message to the message queue for processing + u.messageQueue <- messageAndAddr + } +} + +func (u *Input) processMessagesAsync(ctx 
context.Context) { + defer u.wg.Done() + + dec := decode.New(u.encoding) + buf := make([]byte, 0, MaxUDPSize) + + for { + // Read a message from the message queue. + messageAndAddr, ok := <-u.messageQueue + if !ok { + return // Channel closed, exit the goroutine. + } + + u.processMessage(ctx, messageAndAddr.Message, messageAndAddr.RemoteAddr, dec, buf) } } @@ -274,18 +346,24 @@ func (u *Input) readMessage() ([]byte, net.Addr, error) { // Stop will stop listening for udp messages. func (u *Input) Stop() error { - if u.cancel == nil { - return nil - } - u.cancel() - if u.connection != nil { - if err := u.connection.Close(); err != nil { - u.Errorf("failed to close UDP connection: %s", err) + u.stopOnce.Do(func() { + if u.AsyncConfig != nil { + close(u.messageQueue) } - } - u.wg.Wait() - if u.resolver != nil { - u.resolver.Stop() - } + + if u.cancel == nil { + return + } + u.cancel() + if u.connection != nil { + if err := u.connection.Close(); err != nil { + u.Errorf("failed to close UDP connection: %s", err) + } + } + u.wg.Wait() + if u.resolver != nil { + u.resolver.Stop() + } + }) return nil } diff --git a/pkg/stanza/operator/input/udp/udp_test.go b/pkg/stanza/operator/input/udp/udp_test.go index aba8eeb28e59..7695b734af90 100644 --- a/pkg/stanza/operator/input/udp/udp_test.go +++ b/pkg/stanza/operator/input/udp/udp_test.go @@ -143,8 +143,11 @@ func TestInput(t *testing.T) { t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg)) t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg)) - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg)) } diff --git a/receiver/udplogreceiver/README.md b/receiver/udplogreceiver/README.md index 4d6c9ed85751..8be89e306df1 100644 --- 
a/receiver/udplogreceiver/README.md +++ b/receiver/udplogreceiver/README.md @@ -24,7 +24,7 @@ Receives logs over UDP. | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][hhttps://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] | | `multiline` | | A `multiline` configuration block. See below for details | | `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | | `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | @@ -78,7 +78,9 @@ If set, the `async` configuration block instructs the `udp_input` operator to re | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max length of channel being used by async reader routines. 
When channel reaches max number, reader routine will block until channel has room. | ## Example Configurations diff --git a/receiver/udplogreceiver/udp_test.go b/receiver/udplogreceiver/udp_test.go index b3cbe39d7b58..e632100a127f 100644 --- a/receiver/udplogreceiver/udp_test.go +++ b/receiver/udplogreceiver/udp_test.go @@ -32,7 +32,12 @@ func TestUdp(t *testing.T) { func TestUdpAsync(t *testing.T) { listenAddress := "127.0.0.1:29019" cfg := testdataConfigYaml(listenAddress) - cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig() + cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } + cfg.InputConfig.AsyncConfig.Readers = 2 testUDP(t, testdataConfigYaml(listenAddress), listenAddress) } From c44ad3c81b7670b19edae3abef8a9416f04eecd7 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 18 Oct 2023 17:11:00 -0400 Subject: [PATCH 03/11] [receiver/filelog] Implement specifying top n files to track when ordering (#27844) **Description:** * Add a new `ordering_criteria.top_n` option, which allows a user to specify the number of files to track after ordering. * Default is 1, which was the existing behavior. **Link to tracking Issue:** #23788 **Testing:** Unit tests added. **Documentation:** Added new parameter to existing documentation. 
--- .chloggen/feat_top_n_file_sorting.yaml | 22 ++++ pkg/stanza/fileconsumer/config_test.go | 10 ++ pkg/stanza/fileconsumer/matcher/matcher.go | 23 +++- .../fileconsumer/matcher/matcher_test.go | 114 ++++++++++++++++++ pkg/stanza/fileconsumer/testdata/config.yaml | 4 + receiver/filelogreceiver/README.md | 1 + 6 files changed, 171 insertions(+), 3 deletions(-) create mode 100755 .chloggen/feat_top_n_file_sorting.yaml diff --git a/.chloggen/feat_top_n_file_sorting.yaml b/.chloggen/feat_top_n_file_sorting.yaml new file mode 100755 index 000000000000..1a4e678bae36 --- /dev/null +++ b/.chloggen/feat_top_n_file_sorting.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new "top_n" option to specify the number of files to track when using ordering criteria + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23788] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: ["user"] diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9d83118aa4bd..43171be5c96c 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -412,6 +412,16 @@ func TestUnmarshal(t *testing.T) { return newMockOperatorConfig(cfg) }(), }, + { + Name: "ordering_criteria_top_n", + Expect: func() *mockOperatorConfig { + cfg := NewConfig() + cfg.OrderingCriteria = matcher.OrderingCriteria{ + TopN: 10, + } + return newMockOperatorConfig(cfg) + }(), + }, }, }.Run(t) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index 0a7a0628edac..76cdd1bd4feb 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -18,6 +18,10 @@ const ( sortTypeAlphabetical = "alphabetical" ) +const ( + defaultOrderingCriteriaTopN = 1 +) + type Criteria struct { Include []string `mapstructure:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty"` @@ -26,6 +30,7 @@ type Criteria struct { type OrderingCriteria struct { Regex string `mapstructure:"regex,omitempty"` + TopN int `mapstructure:"top_n,omitempty"` SortBy []Sort `mapstructure:"sort_by,omitempty"` } @@ -62,6 +67,14 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") } + if c.OrderingCriteria.TopN < 0 { + return nil, fmt.Errorf("'top_n' must be a positive integer") + } + + if c.OrderingCriteria.TopN == 0 { + c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN + } + regex, err := regexp.Compile(c.OrderingCriteria.Regex) if err != nil { return nil, fmt.Errorf("compile regex: %w", err) @@ -97,6 +110,7 @@ func New(c Criteria) (*Matcher, error) { include: c.Include, exclude: c.Exclude, regex: regex, + topN: c.OrderingCriteria.TopN, filterOpts: filterOpts, }, nil } @@ -105,6 +119,7 @@ type Matcher struct { include []string exclude 
[]string regex *regexp.Regexp + topN int filterOpts []filter.Option } @@ -127,7 +142,9 @@ func (m Matcher) MatchFiles() ([]string, error) { return result, errors.Join(err, errs) } - // Return only the first item. - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23788 - return result[:1], errors.Join(err, errs) + if len(result) <= m.topN { + return result, errors.Join(err, errs) + } + + return result[:m.topN], errors.Join(err, errs) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index c838962a4699..1d9de6f17f87 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -98,6 +98,23 @@ func TestNew(t *testing.T) { }, expectedErr: "compile regex: error parsing regexp: missing closing ]: `[a-z`", }, + { + name: "TopN is negative", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + Regex: "[a-z]", + TopN: -1, + SortBy: []Sort{ + { + SortType: "numeric", + RegexKey: "key", + }, + }, + }, + }, + expectedErr: "'top_n' must be a positive integer", + }, { name: "SortTypeEmpty", criteria: Criteria{ @@ -249,6 +266,46 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.2023020612.log"}, }, + { + name: "TopN > number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 3, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, + { + name: "TopN == number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: 
`err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, { name: "Timestamp Sorting Ascending", files: []string{"err.2023020612.log", "err.2023020611.log", "err.2023020609.log", "err.2023020610.log"}, @@ -319,6 +376,24 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.d.log"}, }, + { + name: "Alphabetical Sorting - Top 2", + files: []string{"err.a.log", "err.d.log", "err.b.log", "err.c.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z]+).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeAlphabetical, + RegexKey: "value", + Ascending: false, + }, + }, + }, + expected: []string{"err.d.log", "err.c.log"}, + }, { name: "Alphabetical Sorting Ascending", files: []string{"err.b.log", "err.a.log", "err.c.log", "err.d.log"}, @@ -336,6 +411,45 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.a.log"}, }, + { + name: "Multiple Sorting - timestamp priority sort - Top 4", + files: []string{ + "err.b.1.2023020601.log", + "err.b.2.2023020601.log", + "err.a.1.2023020601.log", + "err.a.2.2023020601.log", + "err.b.1.2023020602.log", + "err.a.2.2023020602.log", + "err.b.2.2023020602.log", + "err.a.1.2023020602.log", + }, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z])\.(?P\d+)\.(?P