From bf0a3f4ca94a86475f92d8a82a1831f4dbcabb91 Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Wed, 18 Oct 2023 10:24:59 -0700 Subject: [PATCH 01/14] [chore] Send readonly data to immutable exporters in lifecycle tests (#27825) This should help to catch exporters that are incorrectly claimed as not mutating. --- cmd/otelcontribcol/exporters_test.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cmd/otelcontribcol/exporters_test.go b/cmd/otelcontribcol/exporters_test.go index 8ab6de82c640..949e7e41048d 100644 --- a/cmd/otelcontribcol/exporters_test.go +++ b/cmd/otelcontribcol/exporters_test.go @@ -606,11 +606,23 @@ func verifyExporterLifecycle(t *testing.T, factory exporter.Factory, getConfigFn assert.NotPanics(t, func() { switch e := exp.(type) { case exporter.Logs: - err = e.ConsumeLogs(ctx, testdata.GenerateLogsManyLogRecordsSameResource(2)) + logs := testdata.GenerateLogsManyLogRecordsSameResource(2) + if !e.Capabilities().MutatesData { + logs.MarkReadOnly() + } + err = e.ConsumeLogs(ctx, logs) case exporter.Metrics: - err = e.ConsumeMetrics(ctx, testdata.GenerateMetricsTwoMetrics()) + metrics := testdata.GenerateMetricsTwoMetrics() + if !e.Capabilities().MutatesData { + metrics.MarkReadOnly() + } + err = e.ConsumeMetrics(ctx, metrics) case exporter.Traces: - err = e.ConsumeTraces(ctx, testdata.GenerateTracesTwoSpansSameResource()) + traces := testdata.GenerateTracesTwoSpansSameResource() + if !e.Capabilities().MutatesData { + traces.MarkReadOnly() + } + err = e.ConsumeTraces(ctx, traces) } }) if !expectErr { From c2f343b392058e8bdc86cd191451bac994e69dfb Mon Sep 17 00:00:00 2001 From: hovavza <147598197+hovavza@users.noreply.github.com> Date: Wed, 18 Oct 2023 20:55:22 +0300 Subject: [PATCH 02/14] UDP input operator - async mode - separate between readers & processors (#27805) **Description:** adding a feature - when async mode is enabled in the UDP receiver (udp input operator), separating reading from processing 
operations. This is important to reduce data-loss in high scale UDP scenarios. See original issue for more details. The async config block is changed now. Instead of readers field (determining the concurrency level of how many threads the udp receiver is running, all reading from the UDP port, processing, and sending downstream), it will now have 2 fields: - readers - determines the concurrency level of threads only reading from UDP port and pushing the packets to a channel. - processors - determines the concurrency level of threads reading from the channel, processing the packets, and sending downstream. - max_queue_length - determines the max size of the channel between the readers & the processors. Setting it high enough helps prevent data loss in cases of temporary downstream latency. Once the channel is full, the reader threads will stop until there's room in the queue (to prevent unlimited memory usage). This improves performance and reduces UDP packet loss in high-scale scenarios. Note that async mode only supports this separation of readers from processors. If the async config block isn't included, the default state is preserved: a single thread reads and processes lines synchronously. **Link to tracking Issue:** 27613 **Testing:** Local stress tests ran all types of async config (no 'async', with 'async', etc.). Updating existing udp test accordingly. Also, ran scale tests and saw improvement in data-loss. **Documentation:** Updated md file for both udplogreceiver & stanza udp_input operator with the new flags. 
--------- Co-authored-by: Daniel Jaglowski --- ...er-to-reader-and-processor-with-async.yaml | 27 +++ pkg/stanza/docs/operators/udp_input.md | 6 +- pkg/stanza/operator/input/udp/config_test.go | 7 +- .../operator/input/udp/testdata/config.yaml | 4 +- pkg/stanza/operator/input/udp/udp.go | 156 +++++++++++++----- pkg/stanza/operator/input/udp/udp_test.go | 7 +- receiver/udplogreceiver/README.md | 6 +- receiver/udplogreceiver/udp_test.go | 7 +- 8 files changed, 171 insertions(+), 49 deletions(-) create mode 100644 .chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml diff --git a/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml new file mode 100644 index 000000000000..bd33138b4325 --- /dev/null +++ b/.chloggen/separate-udp-receiver-to-reader-and-processor-with-async.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: When async is enabled for udp receiver, separate logic into readers (only read logs from udp port and push to channel), and processors (read logs from channel and process; decode, split, add attributes, and push downstream), allowing to change concurrency level for both readers and processors separately. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27613] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. 
+# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/pkg/stanza/docs/operators/udp_input.md b/pkg/stanza/docs/operators/udp_input.md index 2a08d16716bb..555ddc97faba 100644 --- a/pkg/stanza/docs/operators/udp_input.md +++ b/pkg/stanza/docs/operators/udp_input.md @@ -50,11 +50,13 @@ for other encodings available. If set, the `async` configuration block instructs the `udp_input` operator to read and process logs asynchronsouly and concurrently. -**note** If `async` is not set at all, a single thread will read lines synchronously. +**note** If `async` is not set at all, a single thread will read & process lines synchronously. | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max number of messages which may be waiting for a processor. While the queue is full, the readers will wait until there's room (readers will not drop messages, but they will not read additional incoming messages during that period). 
| ### Example Configurations diff --git a/pkg/stanza/operator/input/udp/config_test.go b/pkg/stanza/operator/input/udp/config_test.go index 7b806ed61985..a65f8ea6cc3d 100644 --- a/pkg/stanza/operator/input/udp/config_test.go +++ b/pkg/stanza/operator/input/udp/config_test.go @@ -43,8 +43,11 @@ func TestUnmarshal(t *testing.T) { cfg.Encoding = "utf-8" cfg.SplitConfig.LineStartPattern = "ABC" cfg.SplitConfig.LineEndPattern = "" - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } return cfg }(), }, diff --git a/pkg/stanza/operator/input/udp/testdata/config.yaml b/pkg/stanza/operator/input/udp/testdata/config.yaml index 4acbf3621df8..4353dd894ee4 100644 --- a/pkg/stanza/operator/input/udp/testdata/config.yaml +++ b/pkg/stanza/operator/input/udp/testdata/config.yaml @@ -17,4 +17,6 @@ all_with_async: line_start_pattern: ABC line_end_pattern: "" async: - readers: 2 \ No newline at end of file + readers: 2 + processors: 2 + max_queue_length: 100 diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index cc2b69952df3..4593f9ead5e5 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -27,6 +27,10 @@ const ( // Maximum UDP packet size MaxUDPSize = 64 * 1024 + + defaultReaders = 1 + defaultProcessors = 1 + defaultMaxQueueLength = 100 ) func init() { @@ -59,14 +63,9 @@ type Config struct { } type AsyncConfig struct { - Readers int `mapstructure:"readers,omitempty"` -} - -// NewAsyncConfig creates a new AsyncConfig with default values. -func NewAsyncConfig() *AsyncConfig { - return &AsyncConfig{ - Readers: 1, - } + Readers int `mapstructure:"readers,omitempty"` + Processors int `mapstructure:"processors,omitempty"` + MaxQueueLength int `mapstructure:"max_queue_length,omitempty"` } // BaseConfig is the details configuration of a udp input operator. 
@@ -113,12 +112,16 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { resolver = helper.NewIPResolver() } - if c.AsyncConfig == nil { - c.AsyncConfig = NewAsyncConfig() - } - - if c.AsyncConfig.Readers <= 0 { - return nil, fmt.Errorf("async readers must be greater than 0") + if c.AsyncConfig != nil { + if c.AsyncConfig.Readers <= 0 { + c.AsyncConfig.Readers = defaultReaders + } + if c.AsyncConfig.Processors <= 0 { + c.AsyncConfig.Processors = defaultProcessors + } + if c.AsyncConfig.MaxQueueLength <= 0 { + c.AsyncConfig.MaxQueueLength = defaultMaxQueueLength + } } udpInput := &Input{ @@ -132,6 +135,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { OneLogPerPacket: c.OneLogPerPacket, AsyncConfig: c.AsyncConfig, } + + if c.AsyncConfig != nil { + udpInput.messageQueue = make(chan messageAndAddress, c.AsyncConfig.MaxQueueLength) + } return udpInput, nil } @@ -151,6 +158,14 @@ type Input struct { encoding encoding.Encoding splitFunc bufio.SplitFunc resolver *helper.IPResolver + + messageQueue chan messageAndAddress + stopOnce sync.Once +} + +type messageAndAddress struct { + Message []byte + RemoteAddr net.Addr } // Start will start listening for messages on a socket. @@ -170,9 +185,20 @@ func (u *Input) Start(_ operator.Persister) error { // goHandleMessages will handle messages from a udp connection. 
func (u *Input) goHandleMessages(ctx context.Context) { - for i := 0; i < u.AsyncConfig.Readers; i++ { + if u.AsyncConfig == nil { u.wg.Add(1) go u.readAndProcessMessages(ctx) + return + } + + for i := 0; i < u.AsyncConfig.Readers; i++ { + u.wg.Add(1) + go u.readMessagesAsync(ctx) + } + + for i := 0; i < u.AsyncConfig.Processors; i++ { + u.wg.Add(1) + go u.processMessagesAsync(ctx) } } @@ -193,23 +219,69 @@ func (u *Input) readAndProcessMessages(ctx context.Context) { break } - if u.OneLogPerPacket { - log := truncateMaxLog(message) - u.handleMessage(ctx, remoteAddr, dec, log) - continue - } + u.processMessage(ctx, message, remoteAddr, dec, buf) + } +} - scanner := bufio.NewScanner(bytes.NewReader(message)) - scanner.Buffer(buf, MaxUDPSize) +func (u *Input) processMessage(ctx context.Context, message []byte, remoteAddr net.Addr, dec *decode.Decoder, buf []byte) { + if u.OneLogPerPacket { + log := truncateMaxLog(message) + u.handleMessage(ctx, remoteAddr, dec, log) + return + } + + scanner := bufio.NewScanner(bytes.NewReader(message)) + scanner.Buffer(buf, MaxUDPSize) + + scanner.Split(u.splitFunc) - scanner.Split(u.splitFunc) + for scanner.Scan() { + u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) + } + if err := scanner.Err(); err != nil { + u.Errorw("Scanner error", zap.Error(err)) + } +} - for scanner.Scan() { - u.handleMessage(ctx, remoteAddr, dec, scanner.Bytes()) +func (u *Input) readMessagesAsync(ctx context.Context) { + defer u.wg.Done() + + for { + message, remoteAddr, err := u.readMessage() + if err != nil { + select { + case <-ctx.Done(): + return + default: + u.Errorw("Failed reading messages", zap.Error(err)) + } + break } - if err := scanner.Err(); err != nil { - u.Errorw("Scanner error", zap.Error(err)) + + messageAndAddr := messageAndAddress{ + Message: message, + RemoteAddr: remoteAddr, } + + // Send the message to the message queue for processing + u.messageQueue <- messageAndAddr + } +} + +func (u *Input) processMessagesAsync(ctx 
context.Context) { + defer u.wg.Done() + + dec := decode.New(u.encoding) + buf := make([]byte, 0, MaxUDPSize) + + for { + // Read a message from the message queue. + messageAndAddr, ok := <-u.messageQueue + if !ok { + return // Channel closed, exit the goroutine. + } + + u.processMessage(ctx, messageAndAddr.Message, messageAndAddr.RemoteAddr, dec, buf) } } @@ -274,18 +346,24 @@ func (u *Input) readMessage() ([]byte, net.Addr, error) { // Stop will stop listening for udp messages. func (u *Input) Stop() error { - if u.cancel == nil { - return nil - } - u.cancel() - if u.connection != nil { - if err := u.connection.Close(); err != nil { - u.Errorf("failed to close UDP connection: %s", err) + u.stopOnce.Do(func() { + if u.AsyncConfig != nil { + close(u.messageQueue) } - } - u.wg.Wait() - if u.resolver != nil { - u.resolver.Stop() - } + + if u.cancel == nil { + return + } + u.cancel() + if u.connection != nil { + if err := u.connection.Close(); err != nil { + u.Errorf("failed to close UDP connection: %s", err) + } + } + u.wg.Wait() + if u.resolver != nil { + u.resolver.Stop() + } + }) return nil } diff --git a/pkg/stanza/operator/input/udp/udp_test.go b/pkg/stanza/operator/input/udp/udp_test.go index aba8eeb28e59..7695b734af90 100644 --- a/pkg/stanza/operator/input/udp/udp_test.go +++ b/pkg/stanza/operator/input/udp/udp_test.go @@ -143,8 +143,11 @@ func TestInput(t *testing.T) { t.Run("TrailingCRNewlines", udpInputTest([]byte("message1\r\n"), []string{"message1"}, cfg)) t.Run("NewlineInMessage", udpInputTest([]byte("message1\nmessage2\n"), []string{"message1\nmessage2"}, cfg)) - cfg.AsyncConfig = NewAsyncConfig() - cfg.AsyncConfig.Readers = 2 + cfg.AsyncConfig = &AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } t.Run("SimpleAsync", udpInputTest([]byte("message1"), []string{"message1"}, cfg)) } diff --git a/receiver/udplogreceiver/README.md b/receiver/udplogreceiver/README.md index 4d6c9ed85751..8be89e306df1 100644 --- 
a/receiver/udplogreceiver/README.md +++ b/receiver/udplogreceiver/README.md @@ -24,7 +24,7 @@ Receives logs over UDP. | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `one_log_per_packet` | false | Skip log tokenization, set to true if logs contains one log per record and multiline is not used. This will improve performance. | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | -| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention][hhttps://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes] | +| `add_attributes` | false | Adds `net.*` attributes according to [semantic convention](https://github.com/open-telemetry/semantic-conventions/blob/cee22ec91448808ebcfa53df689c800c7171c9e1/docs/general/attributes.md#other-network-attributes) | | `multiline` | | A `multiline` configuration block. See below for details | | `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options | | `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details | @@ -78,7 +78,9 @@ If set, the `async` configuration block instructs the `udp_input` operator to re | Field | Default | Description | | --- | --- | --- | -| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port (and process logs before sending downstream). | +| `readers` | 1 | Concurrency level - Determines how many go routines read from UDP port and push to channel (to be handled by processors). | +| `processors` | 1 | Concurrency level - Determines how many go routines read from channel (pushed by readers) and process logs before sending downstream. | +| `max_queue_length` | 100 | Determines max length of channel being used by async reader routines. 
When channel reaches max number, reader routine will block until channel has room. | ## Example Configurations diff --git a/receiver/udplogreceiver/udp_test.go b/receiver/udplogreceiver/udp_test.go index b3cbe39d7b58..e632100a127f 100644 --- a/receiver/udplogreceiver/udp_test.go +++ b/receiver/udplogreceiver/udp_test.go @@ -32,7 +32,12 @@ func TestUdp(t *testing.T) { func TestUdpAsync(t *testing.T) { listenAddress := "127.0.0.1:29019" cfg := testdataConfigYaml(listenAddress) - cfg.InputConfig.AsyncConfig = udp.NewAsyncConfig() + cfg.InputConfig.AsyncConfig = &udp.AsyncConfig{ + Readers: 2, + Processors: 2, + MaxQueueLength: 100, + } + cfg.InputConfig.AsyncConfig.Readers = 2 testUDP(t, testdataConfigYaml(listenAddress), listenAddress) } From aa587035968e0a487242d7e16170f937d617a184 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Wed, 18 Oct 2023 14:18:17 -0400 Subject: [PATCH 03/14] [connector/datadog] Allow export to traces pipelines --- connector/datadogconnector/README.md | 3 ++- connector/datadogconnector/connector.go | 15 +++++++++++---- connector/datadogconnector/connector_test.go | 10 ++++++++-- connector/datadogconnector/factory.go | 13 +++++++++++-- .../internal/metadata/generated_status.go | 1 + connector/datadogconnector/metadata.yaml | 2 +- 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/connector/datadogconnector/README.md b/connector/datadogconnector/README.md index ba6c82f892a7..457382cd830c 100644 --- a/connector/datadogconnector/README.md +++ b/connector/datadogconnector/README.md @@ -15,6 +15,7 @@ | [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] | | ------------------------ | ------------------------ | ----------------- | | traces | metrics | [beta] | +| traces | traces | [beta] | [Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type [Receiver Pipeline Type]: 
https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type @@ -91,7 +92,7 @@ service: exporters: [datadog/connector] traces/2: # this pipeline uses sampling - receivers: [otlp] + receivers: [datadog/connector] processors: [batch, probabilistic_sampler] exporters: [datadog] diff --git a/connector/datadogconnector/connector.go b/connector/datadogconnector/connector.go index 1c089f254490..39f506e72ebd 100644 --- a/connector/datadogconnector/connector.go +++ b/connector/datadogconnector/connector.go @@ -18,7 +18,8 @@ import ( // connectorImp is the schema for connector type connectorImp struct { - metricsConsumer consumer.Metrics // the next component in the pipeline to ingest data after connector + metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector + tracesConsumer consumer.Traces // the next component in the pipeline to ingest traces after connector logger *zap.Logger // agent specifies the agent used to ingest traces and output APM Stats. @@ -40,7 +41,7 @@ type connectorImp struct { var _ component.Component = (*connectorImp)(nil) // testing that the connectorImp properly implements the type Component interface // function to create a new connector -func newConnector(logger *zap.Logger, _ component.Config, nextConsumer consumer.Metrics) (*connectorImp, error) { +func newConnector(logger *zap.Logger, _ component.Config, metricsConsumer consumer.Metrics, tracesConsumer consumer.Traces) (*connectorImp, error) { logger.Info("Building datadog connector") in := make(chan *pb.StatsPayload, 100) @@ -55,7 +56,8 @@ func newConnector(logger *zap.Logger, _ component.Config, nextConsumer consumer. 
agent: datadog.NewAgent(ctx, in), translator: trans, in: in, - metricsConsumer: nextConsumer, + metricsConsumer: metricsConsumer, + tracesConsumer: tracesConsumer, exit: make(chan struct{}), }, nil } @@ -64,7 +66,9 @@ func newConnector(logger *zap.Logger, _ component.Config, nextConsumer consumer. func (c *connectorImp) Start(_ context.Context, _ component.Host) error { c.logger.Info("Starting datadogconnector") c.agent.Start() - go c.run() + if c.metricsConsumer != nil { + go c.run() + } return nil } @@ -85,6 +89,9 @@ func (c *connectorImp) Capabilities() consumer.Capabilities { func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { c.agent.Ingest(ctx, traces) + if c.tracesConsumer != nil { + c.tracesConsumer.ConsumeTraces(ctx, traces) + } return nil } diff --git a/connector/datadogconnector/connector_test.go b/connector/datadogconnector/connector_test.go index 7f016b6776a4..d5410720be71 100644 --- a/connector/datadogconnector/connector_test.go +++ b/connector/datadogconnector/connector_test.go @@ -23,9 +23,15 @@ func TestNewConnector(t *testing.T) { creationParams := connectortest.NewNopCreateSettings() cfg := factory.CreateDefaultConfig().(*Config) - traceConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop()) + traceToMetricsConnector, err := factory.CreateTracesToMetrics(context.Background(), creationParams, cfg, consumertest.NewNop()) assert.NoError(t, err) - _, ok := traceConnector.(*connectorImp) + _, ok := traceToMetricsConnector.(*connectorImp) + assert.True(t, ok) // checks if the created connector implements the connectorImp struct + + traceToTracesConnector, err := factory.CreateTracesToTraces(context.Background(), creationParams, cfg, consumertest.NewNop()) + assert.NoError(t, err) + + _, ok = traceToTracesConnector.(*connectorImp) assert.True(t, ok) // checks if the created connector implements the connectorImp struct } diff --git 
a/connector/datadogconnector/factory.go b/connector/datadogconnector/factory.go index fc61ebfeff1f..5d1d6673426c 100644 --- a/connector/datadogconnector/factory.go +++ b/connector/datadogconnector/factory.go @@ -21,7 +21,8 @@ func NewFactory() connector.Factory { return connector.NewFactory( metadata.Type, createDefaultConfig, - connector.WithTracesToMetrics(createTracesToMetricsConnector, metadata.TracesToMetricsStability)) + connector.WithTracesToMetrics(createTracesToMetricsConnector, metadata.TracesToMetricsStability), + connector.WithTracesToTraces(createTracesToTracesConnector, metadata.TracesToTracesStability)) } var _ component.Config = (*Config)(nil) @@ -35,7 +36,15 @@ func createDefaultConfig() component.Config { // defines the consumer type of the connector // we want to consume traces and export metrics therefore define nextConsumer as metrics, consumer is the next component in the pipeline func createTracesToMetricsConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { - c, err := newConnector(params.Logger, cfg, nextConsumer) + c, err := newConnector(params.Logger, cfg, nextConsumer, nil) + if err != nil { + return nil, err + } + return c, nil +} + +func createTracesToTracesConnector(_ context.Context, params connector.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (connector.Traces, error) { + c, err := newConnector(params.Logger, cfg, nil, nextConsumer) if err != nil { return nil, err } diff --git a/connector/datadogconnector/internal/metadata/generated_status.go b/connector/datadogconnector/internal/metadata/generated_status.go index 6ff2f2e5baa3..1e601630c741 100644 --- a/connector/datadogconnector/internal/metadata/generated_status.go +++ b/connector/datadogconnector/internal/metadata/generated_status.go @@ -9,4 +9,5 @@ import ( const ( Type = "datadog" TracesToMetricsStability = component.StabilityLevelBeta + TracesToTracesStability = 
component.StabilityLevelBeta ) diff --git a/connector/datadogconnector/metadata.yaml b/connector/datadogconnector/metadata.yaml index 9de47b0382df..f6b1f32221ef 100644 --- a/connector/datadogconnector/metadata.yaml +++ b/connector/datadogconnector/metadata.yaml @@ -3,7 +3,7 @@ type: datadog status: class: connector stability: - beta: [traces_to_metrics] + beta: [traces_to_metrics, traces_to_traces] distributions: [contrib] codeowners: active: [mx-psi, gbbr, dineshg13] From 7dcc6c0972570d98786ed9fefc55c543b472a7b8 Mon Sep 17 00:00:00 2001 From: Yang Song Date: Wed, 18 Oct 2023 14:22:07 -0400 Subject: [PATCH 04/14] Add changelog --- .chloggen/datadog-connector-traces.yaml | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/datadog-connector-traces.yaml diff --git a/.chloggen/datadog-connector-traces.yaml b/.chloggen/datadog-connector-traces.yaml new file mode 100644 index 000000000000..808b388eb57a --- /dev/null +++ b/.chloggen/datadog-connector-traces.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: datadogconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow datadogconnector to be used as a traces-to-traces connector + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27846] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] From 8715a3c3ecc8fde0d01f803ea5b9ecef4813cc7c Mon Sep 17 00:00:00 2001 From: Yang Song Date: Wed, 18 Oct 2023 14:30:04 -0400 Subject: [PATCH 05/14] Fix lint --- connector/datadogconnector/connector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/datadogconnector/connector.go b/connector/datadogconnector/connector.go index 39f506e72ebd..582e45712825 100644 --- a/connector/datadogconnector/connector.go +++ b/connector/datadogconnector/connector.go @@ -90,7 +90,7 @@ func (c *connectorImp) Capabilities() consumer.Capabilities { func (c *connectorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error { c.agent.Ingest(ctx, traces) if c.tracesConsumer != nil { - c.tracesConsumer.ConsumeTraces(ctx, traces) + return c.tracesConsumer.ConsumeTraces(ctx, traces) } return nil } From c44ad3c81b7670b19edae3abef8a9416f04eecd7 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Wed, 18 Oct 2023 17:11:00 -0400 Subject: [PATCH 06/14] [receiver/filelog] Implement specifying top n files to track when ordering (#27844) **Description:** * Add a new `ordering_criteria.top_n` option, which allows a user to specify the number of files to track after ordering. * Default is 1, which was the existing behavior. **Link to tracking Issue:** #23788 **Testing:** Unit tests added. **Documentation:** Added new parameter to existing documentation. 
--- .chloggen/feat_top_n_file_sorting.yaml | 22 ++++ pkg/stanza/fileconsumer/config_test.go | 10 ++ pkg/stanza/fileconsumer/matcher/matcher.go | 23 +++- .../fileconsumer/matcher/matcher_test.go | 114 ++++++++++++++++++ pkg/stanza/fileconsumer/testdata/config.yaml | 4 + receiver/filelogreceiver/README.md | 1 + 6 files changed, 171 insertions(+), 3 deletions(-) create mode 100755 .chloggen/feat_top_n_file_sorting.yaml diff --git a/.chloggen/feat_top_n_file_sorting.yaml b/.chloggen/feat_top_n_file_sorting.yaml new file mode 100755 index 000000000000..1a4e678bae36 --- /dev/null +++ b/.chloggen/feat_top_n_file_sorting.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new "top_n" option to specify the number of files to track when using ordering criteria + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23788] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: ["user"] diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go index 9d83118aa4bd..43171be5c96c 100644 --- a/pkg/stanza/fileconsumer/config_test.go +++ b/pkg/stanza/fileconsumer/config_test.go @@ -412,6 +412,16 @@ func TestUnmarshal(t *testing.T) { return newMockOperatorConfig(cfg) }(), }, + { + Name: "ordering_criteria_top_n", + Expect: func() *mockOperatorConfig { + cfg := NewConfig() + cfg.OrderingCriteria = matcher.OrderingCriteria{ + TopN: 10, + } + return newMockOperatorConfig(cfg) + }(), + }, }, }.Run(t) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher.go b/pkg/stanza/fileconsumer/matcher/matcher.go index 0a7a0628edac..76cdd1bd4feb 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher.go +++ b/pkg/stanza/fileconsumer/matcher/matcher.go @@ -18,6 +18,10 @@ const ( sortTypeAlphabetical = "alphabetical" ) +const ( + defaultOrderingCriteriaTopN = 1 +) + type Criteria struct { Include []string `mapstructure:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty"` @@ -26,6 +30,7 @@ type Criteria struct { type OrderingCriteria struct { Regex string `mapstructure:"regex,omitempty"` + TopN int `mapstructure:"top_n,omitempty"` SortBy []Sort `mapstructure:"sort_by,omitempty"` } @@ -62,6 +67,14 @@ func New(c Criteria) (*Matcher, error) { return nil, fmt.Errorf("'regex' must be specified when 'sort_by' is specified") } + if c.OrderingCriteria.TopN < 0 { + return nil, fmt.Errorf("'top_n' must be a positive integer") + } + + if c.OrderingCriteria.TopN == 0 { + c.OrderingCriteria.TopN = defaultOrderingCriteriaTopN + } + regex, err := regexp.Compile(c.OrderingCriteria.Regex) if err != nil { return nil, fmt.Errorf("compile regex: %w", err) @@ -97,6 +110,7 @@ func New(c Criteria) (*Matcher, error) { include: c.Include, exclude: c.Exclude, regex: regex, + topN: c.OrderingCriteria.TopN, filterOpts: filterOpts, }, nil } @@ -105,6 +119,7 @@ type Matcher struct { include []string exclude 
[]string regex *regexp.Regexp + topN int filterOpts []filter.Option } @@ -127,7 +142,9 @@ func (m Matcher) MatchFiles() ([]string, error) { return result, errors.Join(err, errs) } - // Return only the first item. - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/23788 - return result[:1], errors.Join(err, errs) + if len(result) <= m.topN { + return result, errors.Join(err, errs) + } + + return result[:m.topN], errors.Join(err, errs) } diff --git a/pkg/stanza/fileconsumer/matcher/matcher_test.go b/pkg/stanza/fileconsumer/matcher/matcher_test.go index c838962a4699..1d9de6f17f87 100644 --- a/pkg/stanza/fileconsumer/matcher/matcher_test.go +++ b/pkg/stanza/fileconsumer/matcher/matcher_test.go @@ -98,6 +98,23 @@ func TestNew(t *testing.T) { }, expectedErr: "compile regex: error parsing regexp: missing closing ]: `[a-z`", }, + { + name: "TopN is negative", + criteria: Criteria{ + Include: []string{"*.log"}, + OrderingCriteria: OrderingCriteria{ + Regex: "[a-z]", + TopN: -1, + SortBy: []Sort{ + { + SortType: "numeric", + RegexKey: "key", + }, + }, + }, + }, + expectedErr: "'top_n' must be a positive integer", + }, { name: "SortTypeEmpty", criteria: Criteria{ @@ -249,6 +266,46 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.2023020612.log"}, }, + { + name: "TopN > number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 3, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, + { + name: "TopN == number of files", + files: []string{"err.2023020611.log", "err.2023020612.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: 
`err\.(?P\d{4}\d{2}\d{2}\d{2}).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeTimestamp, + RegexKey: "value", + Ascending: false, + Location: "UTC", + Layout: `%Y%m%d%H`, + }, + }, + }, + expected: []string{"err.2023020612.log", "err.2023020611.log"}, + }, { name: "Timestamp Sorting Ascending", files: []string{"err.2023020612.log", "err.2023020611.log", "err.2023020609.log", "err.2023020610.log"}, @@ -319,6 +376,24 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.d.log"}, }, + { + name: "Alphabetical Sorting - Top 2", + files: []string{"err.a.log", "err.d.log", "err.b.log", "err.c.log"}, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z]+).*log`, + TopN: 2, + SortBy: []Sort{ + { + SortType: sortTypeAlphabetical, + RegexKey: "value", + Ascending: false, + }, + }, + }, + expected: []string{"err.d.log", "err.c.log"}, + }, { name: "Alphabetical Sorting Ascending", files: []string{"err.b.log", "err.a.log", "err.c.log", "err.d.log"}, @@ -336,6 +411,45 @@ func TestMatcher(t *testing.T) { }, expected: []string{"err.a.log"}, }, + { + name: "Multiple Sorting - timestamp priority sort - Top 4", + files: []string{ + "err.b.1.2023020601.log", + "err.b.2.2023020601.log", + "err.a.1.2023020601.log", + "err.a.2.2023020601.log", + "err.b.1.2023020602.log", + "err.a.2.2023020602.log", + "err.b.2.2023020602.log", + "err.a.1.2023020602.log", + }, + include: []string{"err.*.log"}, + exclude: []string{}, + filterCriteria: OrderingCriteria{ + Regex: `err\.(?P[a-zA-Z])\.(?P\d+)\.(?P