diff --git a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go index 7e0f51f5096897..9c4fb0d451ed5c 100644 --- a/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go +++ b/comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go @@ -9,6 +9,7 @@ package eventplatformimpl import ( "context" "fmt" + "strconv" "strings" "sync" @@ -27,6 +28,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" logshttp "github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -393,15 +395,18 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e if endpoints.InputChanSize <= pkgconfigsetup.DefaultInputChanSize { endpoints.InputChanSize = desc.defaultInputChanSize } + + pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID)) + reliable := []client.Destination{} for i, endpoint := range endpoints.GetReliableEndpoints() { - telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i) - reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, pkgconfigsetup.Datadog())) + destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) + reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) } additionals := []client.Destination{} for i, endpoint := range endpoints.GetUnReliableEndpoints() { - telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i) - additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, pkgconfigsetup.Datadog())) + destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) + additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor)) } destinations := client.NewDestinations(reliable, additionals) inputChan := make(chan *message.Message, endpoints.InputChanSize) @@ -426,14 +431,15 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, desc.eventType, - encoder) + encoder, + pipelineMonitor) } a := auditor.NewNullAuditor() log.Debugf("Initialized event platform forwarder pipeline. 
eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d", desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize) return &passthroughPipeline{ - sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil), + sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor), strategy: strategy, in: inputChan, auditor: a, diff --git a/comp/logs/agent/config/constants.go b/comp/logs/agent/config/constants.go index 2fd39527b3b90a..ae9a0d74680f0b 100644 --- a/comp/logs/agent/config/constants.go +++ b/comp/logs/agent/config/constants.go @@ -7,9 +7,7 @@ package config // Pipeline constraints const ( - ChanSize = 100 - DestinationPayloadChanSize = 10 - NumberOfPipelines = 4 + NumberOfPipelines = 4 ) const ( diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 78f93cd122c6ab..a833c2e24694f4 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1512,6 +1512,9 @@ func logsagent(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true) config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com") config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30) + config.BindEnvAndSetDefault("logs_config.message_channel_size", 100) + config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10) + // maximum time that the unix tailer will hold a log file open after it has been rotated config.BindEnvAndSetDefault("logs_config.close_timeout", 60) // maximum time that the windows tailer will hold a log file open, while waiting for diff --git a/pkg/logs/auditor/auditor.go b/pkg/logs/auditor/auditor.go index cad651a7c7d270..05a196125cf03f 100644 --- a/pkg/logs/auditor/auditor.go +++ b/pkg/logs/auditor/auditor.go @@ -16,7 +16,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/status/health" "github.com/DataDog/datadog-agent/pkg/util/log" - "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/message" ) @@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() { func (a *RegistryAuditor) createChannels() { a.chansMutex.Lock() defer a.chansMutex.Unlock() - a.inputChan = make(chan *message.Payload, config.ChanSize) + a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) a.done = make(chan struct{}) } diff --git a/pkg/logs/auditor/go.mod b/pkg/logs/auditor/go.mod index 350a008e3e91c5..59d61c086e1eff 100644 --- a/pkg/logs/auditor/go.mod +++ b/pkg/logs/auditor/go.mod @@ -43,6 +43,7 @@ replace ( require ( github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/status/health v0.56.0-rc.3 @@ -56,7 +57,6 @@ require ( github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - 
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect diff --git a/pkg/logs/client/destination.go b/pkg/logs/client/destination.go index b1bfa151bff9c3..affb2bc6b76514 100644 --- a/pkg/logs/client/destination.go +++ b/pkg/logs/client/destination.go @@ -6,7 +6,9 @@ //nolint:revive // TODO(AML) Fix revive linter package client -import "github.com/DataDog/datadog-agent/pkg/logs/message" +import ( + "github.com/DataDog/datadog-agent/pkg/logs/message" +) // Destination sends a payload to a specific endpoint over a given network protocol. type Destination interface { @@ -16,6 +18,9 @@ type Destination interface { // Destination target (e.g. https://agent-intake.logs.datadoghq.com) Target() string + // Metadata returns the metadata of the destination + Metadata() *DestinationMetadata + // Start starts the destination send loop. Close the input to stop listening for payloads. stopChan is // signaled when the destination has fully shut down and all buffered payloads have been flushed. isRetrying is // signaled when the retry state changes. isRetrying can be nil if you don't need to handle retries. diff --git a/pkg/logs/client/destination_metadata.go b/pkg/logs/client/destination_metadata.go new file mode 100644 index 00000000000000..1c4eaa429a559d --- /dev/null +++ b/pkg/logs/client/destination_metadata.go @@ -0,0 +1,54 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc.
+ +//nolint:revive // TODO(AML) Fix revive linter +package client + +import ( + "fmt" +) + +// DestinationMetadata contains metadata about a destination +type DestinationMetadata struct { + componentName string + instanceID string + kind string + endpointID string + ReportingEnabled bool +} + +// NewDestinationMetadata returns a new DestinationMetadata +func NewDestinationMetadata(componentName, instanceID, kind, endpointID string) *DestinationMetadata { + return &DestinationMetadata{ + componentName: componentName, + instanceID: instanceID, + kind: kind, + endpointID: endpointID, + ReportingEnabled: true, + } +} + +// NewNoopDestinationMetadata returns a new DestinationMetadata with reporting disabled +func NewNoopDestinationMetadata() *DestinationMetadata { + return &DestinationMetadata{ + ReportingEnabled: false, + } +} + +// TelemetryName returns the telemetry name for the destination +func (d *DestinationMetadata) TelemetryName() string { + if !d.ReportingEnabled { + return "" + } + return fmt.Sprintf("%s_%s_%s_%s", d.componentName, d.instanceID, d.kind, d.endpointID) +} + +// MonitorTag returns the monitor tag for the destination +func (d *DestinationMetadata) MonitorTag() string { + if !d.ReportingEnabled { + return "" + } + return fmt.Sprintf("destination_%s_%s", d.kind, d.endpointID) +} diff --git a/pkg/logs/client/http/destination.go b/pkg/logs/client/http/destination.go index 954397f9882e7b..a307a64cc49b21 100644 --- a/pkg/logs/client/http/destination.go +++ b/pkg/logs/client/http/destination.go @@ -81,8 +81,10 @@ type Destination struct { lastRetryError error // Telemetry - expVars *expvar.Map - telemetryName string + expVars *expvar.Map + destMeta *client.DestinationMetadata + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } // NewDestination returns a new Destination.
@@ -94,8 +96,9 @@ func NewDestination(endpoint config.Endpoint, destinationsContext *client.DestinationsContext, maxConcurrentBackgroundSends int, shouldRetry bool, - telemetryName string, - cfg pkgconfigmodel.Reader) *Destination { + destMeta *client.DestinationMetadata, + cfg pkgconfigmodel.Reader, + pipelineMonitor metrics.PipelineMonitor) *Destination { return newDestination(endpoint, contentType, @@ -103,8 +106,9 @@ func NewDestination(endpoint config.Endpoint, time.Second*10, maxConcurrentBackgroundSends, shouldRetry, - telemetryName, - cfg) + destMeta, + cfg, + pipelineMonitor) } func newDestination(endpoint config.Endpoint, @@ -113,8 +117,9 @@ func newDestination(endpoint config.Endpoint, timeout time.Duration, maxConcurrentBackgroundSends int, shouldRetry bool, - telemetryName string, - cfg pkgconfigmodel.Reader) *Destination { + destMeta *client.DestinationMetadata, + cfg pkgconfigmodel.Reader, + pipelineMonitor metrics.PipelineMonitor) *Destination { if maxConcurrentBackgroundSends <= 0 { maxConcurrentBackgroundSends = 1 @@ -130,8 +135,9 @@ func newDestination(endpoint config.Endpoint, expVars := &expvar.Map{} expVars.AddFloat(expVarIdleMsMapKey, 0) expVars.AddFloat(expVarInUseMsMapKey, 0) - if telemetryName != "" { - metrics.DestinationExpVars.Set(telemetryName, expVars) + + if destMeta.ReportingEnabled { + metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars) } return &Destination{ @@ -150,8 +156,10 @@ func newDestination(endpoint config.Endpoint, retryLock: sync.Mutex{}, shouldRetry: shouldRetry, expVars: expVars, - telemetryName: telemetryName, + destMeta: destMeta, isMRF: endpoint.IsMRF, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor(destMeta.MonitorTag()), } } @@ -175,6 +183,11 @@ func (d *Destination) Target() string { return d.url } +// Metadata returns the metadata of the destination +func (d *Destination) Metadata() *client.DestinationMetadata { + return d.destMeta +} + // Start starts reading the input channel func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) @@ -186,17 +199,19 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl var startIdle = time.Now() for p := range input { + d.utilization.Start() idle := float64(time.Since(startIdle) / time.Millisecond) d.expVars.AddFloat(expVarIdleMsMapKey, idle) - tlmIdle.Add(idle, d.telemetryName) + tlmIdle.Add(idle, d.destMeta.TelemetryName()) var startInUse = time.Now() d.sendConcurrent(p, output, isRetrying) inUse := float64(time.Since(startInUse) / time.Millisecond) d.expVars.AddFloat(expVarInUseMsMapKey, inUse) - tlmInUse.Add(inUse, d.telemetryName) + tlmInUse.Add(inUse, d.destMeta.TelemetryName()) startIdle = time.Now() + d.utilization.Stop() } // Wait for any pending concurrent sends to finish or terminate d.wg.Wait() @@ -348,6 +363,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) { // internal error. We should retry these requests. 
return client.NewRetryableError(errServer) } else { + d.pipelineMonitor.ReportComponentEgress(payload, d.destMeta.MonitorTag()) return nil } } @@ -422,7 +438,7 @@ func getMessageTimestamp(messages []*message.Message) int64 { func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) { ctx := client.NewDestinationsContext() // Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence - destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, "", cfg) + destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) return ctx, destination } diff --git a/pkg/logs/client/http/destination_test.go b/pkg/logs/client/http/destination_test.go index 085845ff8f2ed3..6adf3e7d3148fd 100644 --- a/pkg/logs/client/http/destination_test.go +++ b/pkg/logs/client/http/destination_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/comp/logs/agent/config" configmock "github.com/DataDog/datadog-agent/pkg/config/mock" @@ -360,7 +361,7 @@ func TestDestinationHA(t *testing.T) { } isEndpointMRF := endpoint.IsMRF - dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", configmock.New(t)) + dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, client.NewNoopDestinationMetadata(), configmock.New(t), metrics.NewNoopPipelineMonitor("")) isDestMRF := dest.IsMRF() assert.Equal(t, isEndpointMRF, isDestMRF) diff --git a/pkg/logs/client/http/sync_destination.go b/pkg/logs/client/http/sync_destination.go index 62625e6da611ba..ed134f6896e8cc 100644 --- a/pkg/logs/client/http/sync_destination.go +++ b/pkg/logs/client/http/sync_destination.go @@ -30,11 +30,11 @@ func NewSyncDestination(endpoint config.Endpoint, contentType string, destinationsContext *client.DestinationsContext, senderDoneChan chan *sync.WaitGroup, - telemetryName string, + destMeta *client.DestinationMetadata, cfg pkgconfigmodel.Reader) *SyncDestination { return &SyncDestination{ - destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, telemetryName, cfg), + destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, destMeta, cfg, metrics.NewNoopPipelineMonitor("0")), senderDoneChan: senderDoneChan, } } @@ -49,6 +49,11 @@ func (d *SyncDestination) Target() string { return d.destination.url } +// Metadata returns the metadata of the destination +func (d *SyncDestination) Metadata() *client.DestinationMetadata { + return d.destination.destMeta +} + // Start starts reading the input channel func (d *SyncDestination) Start(input chan *message.Payload, output chan *message.Payload, _ chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) @@ -62,7 +67,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message. 
for p := range input { idle := float64(time.Since(startIdle) / time.Millisecond) d.destination.expVars.AddFloat(expVarIdleMsMapKey, idle) - tlmIdle.Add(idle, d.destination.telemetryName) + tlmIdle.Add(idle, d.destination.destMeta.TelemetryName()) var startInUse = time.Now() err := d.destination.unconditionalSend(p) @@ -84,7 +89,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message. inUse := float64(time.Since(startInUse) / time.Millisecond) d.destination.expVars.AddFloat(expVarInUseMsMapKey, inUse) - tlmInUse.Add(inUse, d.destination.telemetryName) + tlmInUse.Add(inUse, d.destination.destMeta.TelemetryName()) startIdle = time.Now() } diff --git a/pkg/logs/client/http/test_utils.go b/pkg/logs/client/http/test_utils.go index 98dea192077fbe..c082ec06ed47ac 100644 --- a/pkg/logs/client/http/test_utils.go +++ b/pkg/logs/client/http/test_utils.go @@ -15,6 +15,7 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" "github.com/DataDog/datadog-agent/pkg/logs/client" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" ) // StatusCodeContainer is a lock around the status code to return @@ -79,7 +80,7 @@ func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool endpoint.BackoffMax = 10 endpoint.RecoveryInterval = 1 - dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, "test", cfg) + dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor("")) return &TestServer{ httpServer: ts, DestCtx: destCtx, diff --git a/pkg/logs/client/tcp/destination.go b/pkg/logs/client/tcp/destination.go index f0ec9c15206495..1934ea2b3c9304 100644 --- a/pkg/logs/client/tcp/destination.go +++ b/pkg/logs/client/tcp/destination.go @@ -58,6 +58,11 @@ func (d *Destination) Target() string { return d.connManager.address() } +// Metadata is not supported for TCP destinations +func (d *Destination) Metadata() *client.DestinationMetadata { + return client.NewNoopDestinationMetadata() +} + // Start reads from the input, transforms a message into a frame and sends it to a remote server, func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) { stop := make(chan struct{}) diff --git a/pkg/logs/diagnostic/go.mod b/pkg/logs/diagnostic/go.mod index 3a16868bf29c80..0ea147ef97297a 100644 --- a/pkg/logs/diagnostic/go.mod +++ b/pkg/logs/diagnostic/go.mod @@ -46,6 +46,7 @@ replace ( require ( github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface v0.56.0-rc.3 github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 github.com/stretchr/testify v1.9.0 @@ -58,7 +59,6 @@ require ( github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect 
github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect diff --git a/pkg/logs/diagnostic/message_receiver.go b/pkg/logs/diagnostic/message_receiver.go index 6a08dddc229d10..3559130757c073 100644 --- a/pkg/logs/diagnostic/message_receiver.go +++ b/pkg/logs/diagnostic/message_receiver.go @@ -9,7 +9,7 @@ import ( "sync" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" - "github.com/DataDog/datadog-agent/comp/logs/agent/config" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/message" ) @@ -49,14 +49,14 @@ func NewBufferedMessageReceiver(f Formatter, hostname hostnameinterface.Componen } } return &BufferedMessageReceiver{ - inputChan: make(chan messagePair, config.ChanSize), + inputChan: make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")), formatter: f, } } // Start opens new input channel func (b *BufferedMessageReceiver) Start() { - b.inputChan = make(chan messagePair, config.ChanSize) + b.inputChan = make(chan messagePair, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) } // Stop closes the input channel @@ -109,7 +109,7 @@ func (b *BufferedMessageReceiver) HandleMessage(m *message.Message, rendered []b // Filter writes the buffered events from the input channel formatted as a string to the output channel func (b *BufferedMessageReceiver) Filter(filters *Filters, done <-chan struct{}) <-chan string { - out := make(chan string, config.ChanSize) + out := make(chan string, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) go func() { defer close(out) for { diff --git a/pkg/logs/launchers/file/launcher.go b/pkg/logs/launchers/file/launcher.go index e0c44ca20e6e1d..a1e0f363334b4c 100644 --- a/pkg/logs/launchers/file/launcher.go +++ b/pkg/logs/launchers/file/launcher.go @@ -21,6 +21,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/launchers" fileprovider "github.com/DataDog/datadog-agent/pkg/logs/launchers/file/provider" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/pipeline" "github.com/DataDog/datadog-agent/pkg/logs/sources" status "github.com/DataDog/datadog-agent/pkg/logs/status/utils" @@ -309,7 +310,8 @@ func (s *Launcher) startNewTailer(file *tailer.File, m config.TailingMode) bool return false } - tailer := s.createTailer(file, s.pipelineProvider.NextPipelineChan()) + channel, monitor := s.pipelineProvider.NextPipelineChanWithMonitor() + tailer := s.createTailer(file, channel, monitor) var offset int64 var whence int @@ -380,16 +382,17 @@ func (s *Launcher) restartTailerAfterFileRotation(oldTailer *tailer.Tailer, file } // createTailer returns a new initialized tailer -func (s *Launcher) createTailer(file *tailer.File, outputChan chan *message.Message) *tailer.Tailer { +func (s *Launcher) createTailer(file *tailer.File, outputChan chan *message.Message, pipelineMonitor metrics.PipelineMonitor) *tailer.Tailer { tailerInfo := status.NewInfoRegistry() tailerOptions := &tailer.TailerOptions{ - OutputChan: outputChan, - File: file, - SleepDuration: s.tailerSleepDuration, - Decoder: decoder.NewDecoderFromSource(file.Source, tailerInfo), - Info: tailerInfo, - TagAdder: s.tagger, + OutputChan: outputChan, + File: file, + SleepDuration: s.tailerSleepDuration, + Decoder: decoder.NewDecoderFromSource(file.Source, tailerInfo), + Info: tailerInfo, + TagAdder: s.tagger, + PipelineMonitor: pipelineMonitor, } 
return tailer.NewTailer(tailerOptions) diff --git a/pkg/logs/message/message.go b/pkg/logs/message/message.go index e852a97d70ae2e..0ad1f53a74b078 100644 --- a/pkg/logs/message/message.go +++ b/pkg/logs/message/message.go @@ -43,6 +43,20 @@ type Payload struct { UnencodedSize int } +// Count returns the number of messages in the payload +func (m *Payload) Count() int64 { + return int64(len(m.Messages)) +} + +// Size returns the total size in bytes of the payload's messages. +func (m *Payload) Size() int64 { + var size int64 + for _, msg := range m.Messages { + size += msg.Size() + } + return size +} + // Message represents a log line sent to datadog, with its metadata type Message struct { MessageContent @@ -51,7 +65,9 @@ type Message struct { Status string IngestionTimestamp int64 // RawDataLen tracks the original size of the message content before any trimming/transformation. - // This is used when calculating the tailer offset - so this will NOT always be equal to `len(Content)`. + // This is used when calculating the tailer offset - so this will NOT always be equal to `len(Content)`. + // This is also used to track the original content size before the message is processed and encoded later + // in the pipeline. RawDataLen int // Tags added on processing ProcessingTags []string @@ -210,6 +226,7 @@ func NewMessage(content []byte, origin *Origin, status string, ingestionTimestam }, Origin: origin, Status: status, + RawDataLen: len(content), IngestionTimestamp: ingestionTimestamp, } } @@ -355,6 +372,16 @@ func (m *Message) TagsToString() string { return m.Origin.TagsToString(m.ProcessingTags) } +// Count returns the number of messages (always 1 for a single Message) +func (m *Message) Count() int64 { + return 1 +} + +// Size returns the size of the message content in bytes. +func (m *Message) Size() int64 { + return int64(m.RawDataLen) +} + // TruncatedReasonTag returns a tag with the reason for truncation. func TruncatedReasonTag(reason string) string { return fmt.Sprintf("truncated:%s", reason) diff --git a/pkg/logs/metrics/capacity_monitor.go b/pkg/logs/metrics/capacity_monitor.go new file mode 100644 index 00000000000000..b479c910242ea2 --- /dev/null +++ b/pkg/logs/metrics/capacity_monitor.go @@ -0,0 +1,75 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "sync" + "time" +) + +// CapacityMonitor samples the average capacity of a component over a given interval. +// Capacity is calculated as the difference between the ingress and egress of a payload. +// Because data moves very quickly through components, we need to sample and aggregate this value over time.
+type CapacityMonitor struct { + sync.Mutex + ingress int64 + ingressBytes int64 + egress int64 + egressBytes int64 + avg float64 + avgBytes float64 + samples float64 + name string + instance string + ticker *time.Ticker +} + +// NewCapacityMonitor creates a new CapacityMonitor +func NewCapacityMonitor(name, instance string, interval time.Duration) *CapacityMonitor { + return &CapacityMonitor{ + name: name, + instance: instance, + ticker: time.NewTicker(interval), + } +} + +// AddIngress records the ingress of a payload +func (i *CapacityMonitor) AddIngress(pl MeasurablePayload) { + i.Lock() + defer i.Unlock() + i.ingress += pl.Count() + i.ingressBytes += pl.Size() + i.sample() +} + +// AddEgress records the egress of a payload +func (i *CapacityMonitor) AddEgress(pl MeasurablePayload) { + i.Lock() + defer i.Unlock() + i.egress += pl.Count() + i.egressBytes += pl.Size() + i.sample() + +} + +func (i *CapacityMonitor) sample() { + i.samples++ + i.avg = (i.avg*(i.samples-1) + float64(i.ingress-i.egress)) / i.samples + i.avgBytes = (i.avgBytes*(i.samples-1) + float64(i.ingressBytes-i.egressBytes)) / i.samples + i.reportIfNeeded() +} + +func (i *CapacityMonitor) reportIfNeeded() { + select { + case <-i.ticker.C: + TlmCapacity.Set(float64(i.avg), i.name, i.instance) + TlmCapacityBytes.Set(float64(i.avgBytes), i.name, i.instance) + i.avg = 0 + i.avgBytes = 0 + i.samples = 0 + default: + } +} diff --git a/pkg/logs/metrics/metrics.go b/pkg/logs/metrics/metrics.go index 49a6b30bbd597f..8464c544e72938 100644 --- a/pkg/logs/metrics/metrics.go +++ b/pkg/logs/metrics/metrics.go @@ -81,6 +81,10 @@ var ( // TlmLogsDiscardedFromSDSBuffer how many messages were dropped when waiting for an SDS configuration because the buffer is full TlmLogsDiscardedFromSDSBuffer = telemetry.NewCounter("logs", "sds__dropped_from_buffer", nil, "Count of messages dropped from the buffer while waiting for an SDS configuration") + + TlmUtilization = telemetry.NewGauge("logs_component", "utilization", []string{"name", "instance"}, "") + TlmCapacity = telemetry.NewGauge("logs_component", "capacity", []string{"name", "instance"}, "") + TlmCapacityBytes = telemetry.NewGauge("logs_component", "capacity_bytes", []string{"name", "instance"}, "") ) func init() { diff --git a/pkg/logs/metrics/pipeline_monitor.go b/pkg/logs/metrics/pipeline_monitor.go new file mode 100644 index 00000000000000..4c166b91c6d5f0 --- /dev/null +++ b/pkg/logs/metrics/pipeline_monitor.go @@ -0,0 +1,108 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import ( + "sync" + "time" +) + +// MeasurablePayload represents a payload that can be measured in bytes and count +type MeasurablePayload interface { + Size() int64 + Count() int64 +} + +// PipelineMonitor is an interface for monitoring the capacity of a pipeline. +// Pipeline monitors are used to measure both capacity and utilization of components. +type PipelineMonitor interface { + ID() string + ReportComponentIngress(size MeasurablePayload, name string) + ReportComponentEgress(size MeasurablePayload, name string) + MakeUtilizationMonitor(name string) UtilizationMonitor +} + +// NoopPipelineMonitor is a no-op implementation of PipelineMonitor. +// Some instances of logs components do not need to report capacity metrics and +// should use this implementation. 
+type NoopPipelineMonitor struct { + instanceID string +} + +// NewNoopPipelineMonitor creates a new no-op pipeline monitor +func NewNoopPipelineMonitor(id string) *NoopPipelineMonitor { + return &NoopPipelineMonitor{ + instanceID: id, + } +} + +// ID returns the instance id of the monitor +func (n *NoopPipelineMonitor) ID() string { + return n.instanceID +} + +// ReportComponentIngress does nothing. +func (n *NoopPipelineMonitor) ReportComponentIngress(_ MeasurablePayload, _ string) {} + +// ReportComponentEgress does nothing. +func (n *NoopPipelineMonitor) ReportComponentEgress(_ MeasurablePayload, _ string) {} + +// MakeUtilizationMonitor returns a no-op utilization monitor. +func (n *NoopPipelineMonitor) MakeUtilizationMonitor(_ string) UtilizationMonitor { + return &NoopUtilizationMonitor{} +} + +// TelemetryPipelineMonitor is a PipelineMonitor that reports capacity metrics to telemetry +type TelemetryPipelineMonitor struct { + monitors map[string]*CapacityMonitor + interval time.Duration + instanceID string + lock sync.RWMutex +} + +// NewTelemetryPipelineMonitor creates a new pipeline monitor that reports capacity and utilization metrics as telemetry +func NewTelemetryPipelineMonitor(interval time.Duration, instanceID string) *TelemetryPipelineMonitor { + return &TelemetryPipelineMonitor{ + monitors: make(map[string]*CapacityMonitor), + interval: interval, + instanceID: instanceID, + lock: sync.RWMutex{}, + } +} + +func (c *TelemetryPipelineMonitor) getMonitor(name string) *CapacityMonitor { + key := name + c.instanceID + c.lock.RLock() + if c.monitors[key] == nil { + c.lock.RUnlock() + c.lock.Lock() + // Re-check under the write lock so two concurrent callers don't both create a monitor for the same key. + if c.monitors[key] == nil { + c.monitors[key] = NewCapacityMonitor(name, c.instanceID, c.interval) + } + c.lock.Unlock() + c.lock.RLock() + } + defer c.lock.RUnlock() + return c.monitors[key] +} + +// ID returns the instance id of the monitor +func (c *TelemetryPipelineMonitor) ID() string { + return c.instanceID +} + +// MakeUtilizationMonitor creates a new utilization monitor for a component. +func (c *TelemetryPipelineMonitor) MakeUtilizationMonitor(name string) UtilizationMonitor { + return NewTelemetryUtilizationMonitor(name, c.instanceID, c.interval) +} + +// ReportComponentIngress reports the ingress of a payload to a component. +func (c *TelemetryPipelineMonitor) ReportComponentIngress(pl MeasurablePayload, name string) { + c.getMonitor(name).AddIngress(pl) +} + +// ReportComponentEgress reports the egress of a payload from a component. +func (c *TelemetryPipelineMonitor) ReportComponentEgress(pl MeasurablePayload, name string) { + c.getMonitor(name).AddEgress(pl) +} diff --git a/pkg/logs/metrics/utilization_monitor.go b/pkg/logs/metrics/utilization_monitor.go new file mode 100644 index 00000000000000..2de1e407a338f4 --- /dev/null +++ b/pkg/logs/metrics/utilization_monitor.go @@ -0,0 +1,67 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package metrics + +import "time" + +// UtilizationMonitor is an interface for monitoring the utilization of a component. +type UtilizationMonitor interface { + Start() + Stop() +} + +// NoopUtilizationMonitor is a no-op implementation of UtilizationMonitor. +type NoopUtilizationMonitor struct{} + +// Start does nothing. +func (n *NoopUtilizationMonitor) Start() {} + +// Stop does nothing.
+func (n *NoopUtilizationMonitor) Stop() {} + +// TelemetryUtilizationMonitor is a UtilizationMonitor that reports utilization metrics as telemetry. +// Utilization is calculated as the ratio of time spent in use to the total time. +// Utilization can change rapidly over time based on the workload. So the monitor samples the utilization over a given interval. +type TelemetryUtilizationMonitor struct { + inUse time.Duration + idle time.Duration + startIdle time.Time + startInUse time.Time + name string + instance string + ticker *time.Ticker +} + +// NewTelemetryUtilizationMonitor creates a new TelemetryUtilizationMonitor. +func NewTelemetryUtilizationMonitor(name, instance string, interval time.Duration) *TelemetryUtilizationMonitor { + return &TelemetryUtilizationMonitor{ + startIdle: time.Now(), + startInUse: time.Now(), + name: name, + instance: instance, + ticker: time.NewTicker(interval), + } +} + +// Start starts recording in-use time. +func (u *TelemetryUtilizationMonitor) Start() { + u.idle += time.Since(u.startIdle) + u.startInUse = time.Now() +} + +// Stop stops recording in-use time and reports the utilization if the sample window is met. +func (u *TelemetryUtilizationMonitor) Stop() { + u.inUse += time.Since(u.startInUse) + u.startIdle = time.Now() + select { + case <-u.ticker.C: + TlmUtilization.Set(float64(u.inUse)/(float64(u.idle+u.inUse)), u.name, u.instance) + u.idle = 0 + u.inUse = 0 + default: + } + +} diff --git a/pkg/logs/pipeline/go.mod b/pkg/logs/pipeline/go.mod index bcabc871490b55..a8318af35af741 100644 --- a/pkg/logs/pipeline/go.mod +++ b/pkg/logs/pipeline/go.mod @@ -59,10 +59,12 @@ require ( github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface v0.56.0-rc.3 github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 + github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 github.com/DataDog/datadog-agent/pkg/logs/auditor v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/client v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/diagnostic v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3 + github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/processor v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sds v0.56.0-rc.3 github.com/DataDog/datadog-agent/pkg/logs/sender v0.56.0-rc.3 @@ -83,11 +85,9 @@ require ( github.com/DataDog/datadog-agent/pkg/collector/check/defaults v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect - github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect - github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.56.0-rc.3 // indirect github.com/DataDog/datadog-agent/pkg/telemetry v0.56.0-rc.3 // indirect diff --git a/pkg/logs/pipeline/mock/mock.go b/pkg/logs/pipeline/mock/mock.go index 3d07560754a797..448ea1fb2416fb 100644 --- a/pkg/logs/pipeline/mock/mock.go +++ b/pkg/logs/pipeline/mock/mock.go 
@@ -10,6 +10,7 @@ import ( "context" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/pipeline" ) @@ -52,3 +53,8 @@ func (p *mockProvider) Flush(_ context.Context) {} func (p *mockProvider) NextPipelineChan() chan *message.Message { return p.msgChan } + +// NextPipelineChanWithMonitor returns the next pipeline channel along with a no-op pipeline monitor +func (p *mockProvider) NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) { + return p.msgChan, metrics.NewNoopPipelineMonitor("") +} diff --git a/pkg/logs/pipeline/pipeline.go b/pkg/logs/pipeline/pipeline.go index 0a050d38481adb..7ce76c4d7f4f32 100644 --- a/pkg/logs/pipeline/pipeline.go +++ b/pkg/logs/pipeline/pipeline.go @@ -8,17 +8,20 @@ package pipeline import ( "context" - "fmt" + "strconv" "sync" + "time" "github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface" "github.com/DataDog/datadog-agent/comp/logs/agent/config" pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/client/http" "github.com/DataDog/datadog-agent/pkg/logs/client/tcp" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/processor" "github.com/DataDog/datadog-agent/pkg/logs/sender" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" @@ -26,13 +29,14 @@ import ( // Pipeline processes and sends messages to the backend type Pipeline struct { - InputChan chan *message.Message - flushChan chan struct{} - processor *processor.Processor - strategy sender.Strategy - sender *sender.Sender - serverless bool - flushWg *sync.WaitGroup + InputChan chan *message.Message + flushChan chan struct{} + processor *processor.Processor + strategy sender.Strategy + sender *sender.Sender + serverless bool + flushWg *sync.WaitGroup + pipelineMonitor metrics.PipelineMonitor } // NewPipeline returns a new Pipeline @@ -53,10 +57,11 @@ func NewPipeline(outputChan chan *message.Payload, senderDoneChan = make(chan *sync.WaitGroup) flushWg = &sync.WaitGroup{} } + pipelineMonitor := metrics.NewTelemetryPipelineMonitor(5*time.Second, strconv.Itoa(pipelineID)) - mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, senderDoneChan, status, cfg) + mainDestinations := getDestinations(endpoints, destinationsContext, pipelineMonitor, serverless, senderDoneChan, status, cfg) - strategyInput := make(chan *message.Message, config.ChanSize) + strategyInput := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large flushChan := make(chan struct{}) @@ -73,22 +78,23 @@ func NewPipeline(outputChan chan *message.Payload, encoder = processor.RawEncoder } - strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineID) - logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize, senderDoneChan, flushWg) + strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushWg, pipelineMonitor) + logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations,
pkgconfigsetup.Datadog().GetInt("logs_config.payload_channel_size"), senderDoneChan, flushWg, pipelineMonitor) - inputChan := make(chan *message.Message, config.ChanSize) + inputChan := make(chan *message.Message, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")) processor := processor.New(cfg, inputChan, strategyInput, processingRules, - encoder, diagnosticMessageReceiver, hostname, pipelineID) + encoder, diagnosticMessageReceiver, hostname, pipelineMonitor) return &Pipeline{ - InputChan: inputChan, - flushChan: flushChan, - processor: processor, - strategy: strategy, - sender: logsSender, - serverless: serverless, - flushWg: flushWg, + InputChan: inputChan, + flushChan: flushChan, + processor: processor, + strategy: strategy, + sender: logsSender, + serverless: serverless, + flushWg: flushWg, + pipelineMonitor: pipelineMonitor, } } @@ -117,25 +123,25 @@ func (p *Pipeline) Flush(ctx context.Context) { } } -func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineID int, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations { +func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineMonitor metrics.PipelineMonitor, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations { reliable := []client.Destination{} additionals := []client.Destination{} if endpoints.UseHTTP { for i, endpoint := range endpoints.GetReliableEndpoints() { - telemetryName := fmt.Sprintf("logs_%d_reliable_%d", pipelineID, i) + destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "reliable", strconv.Itoa(i)) if serverless { - reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg)) + reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) } else { - reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, cfg)) + reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, cfg, pipelineMonitor)) } } for i, endpoint := range endpoints.GetUnReliableEndpoints() { - telemetryName := fmt.Sprintf("logs_%d_unreliable_%d", pipelineID, i) + destMeta := client.NewDestinationMetadata("logs", pipelineMonitor.ID(), "unreliable", strconv.Itoa(i)) if serverless { - additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg)) + additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, destMeta, cfg)) } else { - additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, cfg)) + additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, cfg, pipelineMonitor)) } } return client.NewDestinations(reliable, additionals) @@ -151,13 +157,13 @@ func getDestinations(endpoints *config.Endpoints, destinationsContext *client.De } //nolint:revive // TODO(AML) Fix 
revive linter -func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, _ int) sender.Strategy { +func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor) sender.Strategy { if endpoints.UseHTTP || serverless { encoder := sender.IdentityContentType if endpoints.Main.UseCompression { encoder = sender.NewGzipContentEncoding(endpoints.Main.CompressionLevel) } - return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder) + return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushWg, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder, pipelineMonitor) } return sender.NewStreamStrategy(inputChan, outputChan, sender.IdentityContentType) } diff --git a/pkg/logs/pipeline/provider.go b/pkg/logs/pipeline/provider.go index 54d3b947a1313c..9ee6ec8a5dfa05 100644 --- a/pkg/logs/pipeline/provider.go +++ b/pkg/logs/pipeline/provider.go @@ -18,6 +18,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/logs/client" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sds" "github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -32,6 +33,7 @@ type Provider interface { ReconfigureSDSAgentConfig(config []byte) (bool, error) StopSDSProcessing() error NextPipelineChan() chan *message.Message + NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) // Flush flushes all pipelines contained in this Provider Flush(ctx context.Context) } @@ -181,6 +183,17 @@ func (p *provider) NextPipelineChan() chan *message.Message { return nextPipeline.InputChan } +// NextPipelineChanWithMonitor returns the next pipeline input channel with its monitor. +func (p *provider) NextPipelineChanWithMonitor() (chan *message.Message, metrics.PipelineMonitor) { + pipelinesLen := len(p.pipelines) + if pipelinesLen == 0 { + return nil, nil + } + index := p.currentPipelineIndex.Inc() % uint32(pipelinesLen) + nextPipeline := p.pipelines[index] + return nextPipeline.InputChan, nextPipeline.pipelineMonitor +} + // Flush flushes synchronously all the contained pipelines of this provider. func (p *provider) Flush(ctx context.Context) { for _, p := range p.pipelines { diff --git a/pkg/logs/processor/processor.go b/pkg/logs/processor/processor.go index f66a8c0c48a4d6..1712ea321ab910 100644 --- a/pkg/logs/processor/processor.go +++ b/pkg/logs/processor/processor.go @@ -26,7 +26,6 @@ const UnstructuredProcessingMetricName = "datadog.logs_agent.tailer.unstructured // A Processor updates messages from an inputChan and pushes // them into an outputChan.
type Processor struct { - pipelineID int inputChan chan *message.Message outputChan chan *message.Message // strategy input // ReconfigChan transports rules to use in order to reconfigure @@ -40,6 +39,10 @@ type Processor struct { hostname hostnameinterface.Component sds sdsProcessor + + // Telemetry + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } type sdsProcessor struct { @@ -58,13 +61,12 @@ type sdsProcessor struct { // New returns an initialized Processor. func New(cfg pkgconfigmodel.Reader, inputChan, outputChan chan *message.Message, processingRules []*config.ProcessingRule, encoder Encoder, diagnosticMessageReceiver diagnostic.MessageReceiver, hostname hostnameinterface.Component, - pipelineID int) *Processor { + pipelineMonitor metrics.PipelineMonitor) *Processor { waitForSDSConfig := sds.ShouldBufferUntilSDSConfiguration(cfg) maxBufferSize := sds.WaitForConfigurationBufferMaxSize(cfg) return &Processor{ - pipelineID: pipelineID, inputChan: inputChan, outputChan: outputChan, // strategy input ReconfigChan: make(chan sds.ReconfigureOrder), @@ -73,12 +75,14 @@ func New(cfg pkgconfigmodel.Reader, inputChan, outputChan chan *message.Message, done: make(chan struct{}), diagnosticMessageReceiver: diagnosticMessageReceiver, hostname: hostname, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("processor"), sds: sdsProcessor{ // will immediately start buffering if it has been configured to do so buffering: waitForSDSConfig, maxBufferSize: maxBufferSize, - scanner: sds.CreateScanner(pipelineID), + scanner: sds.CreateScanner(pipelineMonitor.ID()), }, } } @@ -115,6 +119,7 @@ func (p *Processor) Flush(ctx context.Context) { return } msg := <-p.inputChan + p.pipelineMonitor.ReportComponentIngress(msg, "processor") p.processMessage(msg) } } @@ -217,6 +222,7 @@ func (s *sdsProcessor) resetBuffer() { } func (p *Processor) processMessage(msg *message.Message) { + p.utilization.Start() metrics.LogsDecoded.Add(1) metrics.TlmLogsDecoded.Inc() @@ -241,8 +247,14 @@ func (p *Processor) processMessage(msg *message.Message) { return } + p.utilization.Stop() p.outputChan <- msg + p.pipelineMonitor.ReportComponentIngress(msg, "strategy") + } else { + p.utilization.Stop() } + p.pipelineMonitor.ReportComponentEgress(msg, "processor") + } // applyRedactingRules returns given a message if we should process it or not, diff --git a/pkg/logs/processor/processor_test.go b/pkg/logs/processor/processor_test.go index bb2ff56b024616..236246c174c14a 100644 --- a/pkg/logs/processor/processor_test.go +++ b/pkg/logs/processor/processor_test.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/comp/logs/agent/config" "github.com/DataDog/datadog-agent/pkg/logs/diagnostic" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/logs/sds" "github.com/DataDog/datadog-agent/pkg/logs/sources" ) @@ -314,6 +315,7 @@ func TestBuffering(t *testing.T) { } hostnameComponent, _ := hostnameinterface.NewMock("testHostnameFromEnvVar") + pm := metrics.NewNoopPipelineMonitor("") p := &Processor{ encoder: JSONEncoder, @@ -326,8 +328,10 @@ func TestBuffering(t *testing.T) { sds: sdsProcessor{ maxBufferSize: len("hello1world") + len("hello2world") + len("hello3world") + 1, buffering: true, - scanner: sds.CreateScanner(42), + scanner: sds.CreateScanner("42"), }, + pipelineMonitor: pm, + utilization: pm.MakeUtilizationMonitor("processor"), } var processedMessages atomic.Int32
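Reviewer note: a minimal, self-contained sketch of how a pipeline stage is expected to drive the new monitoring interfaces. The `stage` function and the `"work"` component name are hypothetical; the signatures (`NewTelemetryPipelineMonitor`, `MakeUtilizationMonitor`, `ReportComponentIngress`/`ReportComponentEgress`, and `*message.Message` satisfying `MeasurablePayload`) are the ones introduced in this diff. The ordering mirrors processor.go: report ingress when a message enters, bracket the busy period with Start/Stop, and report egress after handing the message off.

```go
package main

import (
	"time"

	"github.com/DataDog/datadog-agent/pkg/logs/message"
	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
)

// stage is a hypothetical pipeline component instrumented like the processor:
// utilization brackets the busy period, ingress/egress bracket the queueing.
func stage(in, out chan *message.Message, pm metrics.PipelineMonitor) {
	utilization := pm.MakeUtilizationMonitor("work") // "work" is an illustrative component name
	for msg := range in {
		pm.ReportComponentIngress(msg, "work") // capacity: message entered the component
		utilization.Start()                    // busy while the message is being handled
		// ... transform msg here ...
		utilization.Stop() // idle again while blocked on the output channel
		out <- msg
		pm.ReportComponentEgress(msg, "work") // capacity: message left the component
	}
}

func main() {
	// One monitor per pipeline instance; "0" plays the role of strconv.Itoa(pipelineID).
	pm := metrics.NewTelemetryPipelineMonitor(5*time.Second, "0")
	in := make(chan *message.Message, 1)
	out := make(chan *message.Message, 1)
	go stage(in, out, pm)
	in <- message.NewMessage([]byte("hello"), nil, "", 0)
	<-out
}
```

With this wiring, `logs_component.capacity`/`capacity_bytes` report the sampled average of ingress minus egress (counts and bytes via `Count()`/`Size()`), and `logs_component.utilization` reports `inUse / (inUse + idle)` once per interval.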
diff --git a/pkg/logs/sds/scanner.go b/pkg/logs/sds/scanner.go index 581fe810a7fbb3..b0caf689efbfd7 100644 --- a/pkg/logs/sds/scanner.go +++ b/pkg/logs/sds/scanner.go @@ -11,15 +11,15 @@ package sds import ( "encoding/json" "fmt" - "strconv" "strings" "sync" "time" + sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" + "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" - sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" ) const ScannedTag = "sds_agent:true" @@ -34,7 +34,7 @@ var ( tlmSDSReconfigSuccess = telemetry.NewCounterWithOpts("sds", "reconfiguration_success", []string{"pipeline", "type"}, "Count of SDS reconfiguration success.", telemetry.Options{DefaultMetric: true}) tlmSDSProcessingLatency = telemetry.NewSimpleHistogram("sds", "processing_latency", "Processing latency histogram", - []float64{10, 250, 500, 2000, 5000, 10000}) // unit: us + []float64{10, 250, 500, 2000, 5000, 10000}) // unit: us ) // Scanner wraps an SDS Scanner implementation, adds reconfiguration @@ -63,8 +63,8 @@ type Scanner struct { // CreateScanner creates an SDS scanner. // Use `Reconfigure` to configure it manually. -func CreateScanner(pipelineID int) *Scanner { - scanner := &Scanner{pipelineID: strconv.Itoa(pipelineID)} +func CreateScanner(pipelineID string) *Scanner { + scanner := &Scanner{pipelineID: pipelineID} log.Debugf("creating a new SDS scanner (internal id: %p)", scanner) return scanner } diff --git a/pkg/logs/sds/scanner_nosds.go b/pkg/logs/sds/scanner_nosds.go index 0f1d256f6917ac..c1db02cdea4b76 100644 --- a/pkg/logs/sds/scanner_nosds.go +++ b/pkg/logs/sds/scanner_nosds.go @@ -24,7 +24,7 @@ type Match struct { } // CreateScanner creates a scanner for unsupported platforms/architectures. 
-func CreateScanner(_ int) *Scanner { +func CreateScanner(_ string) *Scanner { return nil } diff --git a/pkg/logs/sds/scanner_test.go b/pkg/logs/sds/scanner_test.go index bf27ea97ae8e06..4e099d2aec7cb3 100644 --- a/pkg/logs/sds/scanner_test.go +++ b/pkg/logs/sds/scanner_test.go @@ -13,9 +13,10 @@ import ( "testing" "time" - "github.com/DataDog/datadog-agent/pkg/logs/message" sds "github.com/DataDog/dd-sensitive-data-scanner/sds-go/go" "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/pkg/logs/message" ) func TestCreateScanner(t *testing.T) { @@ -68,7 +69,7 @@ func TestCreateScanner(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") @@ -245,7 +246,7 @@ func TestEmptyConfiguration(t *testing.T) { ]} `) - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") @@ -350,7 +351,7 @@ func TestIsReady(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the scanner should not be nil after a creation") require.False(s.IsReady(), "at this stage, the scanner should not be considered ready, no definitions received") @@ -420,7 +421,7 @@ func TestScan(t *testing.T) { // scanner creation // ----- - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the returned scanner should not be nil") isActive, _ := s.Reconfigure(ReconfigureOrder{ @@ -509,7 +510,7 @@ func TestCloseCycleScan(t *testing.T) { // ----- for i := 0; i < 10; i++ { - s := CreateScanner(0) + s := CreateScanner("") require.NotNil(s, "the returned scanner should not be nil") _, _ = s.Reconfigure(ReconfigureOrder{ diff --git a/pkg/logs/sender/batch_strategy.go b/pkg/logs/sender/batch_strategy.go index 4949f4a4e708fa..85570cc35e23b8 100644 --- a/pkg/logs/sender/batch_strategy.go +++ b/pkg/logs/sender/batch_strategy.go @@ -13,6 +13,7 @@ import ( "github.com/benbjohnson/clock" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" "github.com/DataDog/datadog-agent/pkg/telemetry" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -36,6 +37,10 @@ type batchStrategy struct { contentEncoding ContentEncoding stopChan chan struct{} // closed when the goroutine has finished clock clock.Clock + + // Telemetry + pipelineMonitor metrics.PipelineMonitor + utilization metrics.UtilizationMonitor } // NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits @@ -49,8 +54,9 @@ func NewBatchStrategy(inputChan chan *message.Message, maxBatchSize int, maxContentSize int, pipelineName string, - contentEncoding ContentEncoding) Strategy { - return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding) + contentEncoding ContentEncoding, + pipelineMonitor metrics.PipelineMonitor) Strategy { + return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushWg, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding, pipelineMonitor) } func newBatchStrategyWithClock(inputChan chan *message.Message, @@ -64,7 +70,8 @@ func newBatchStrategyWithClock(inputChan chan *message.Message, maxContentSize int, pipelineName string, clock clock.Clock, - contentEncoding ContentEncoding) Strategy { + contentEncoding ContentEncoding, +
pipelineMonitor metrics.PipelineMonitor) Strategy { return &batchStrategy{ inputChan: inputChan, @@ -79,6 +86,8 @@ func newBatchStrategyWithClock(inputChan chan *message.Message, stopChan: make(chan struct{}), pipelineName: pipelineName, clock: clock, + pipelineMonitor: pipelineMonitor, + utilization: pipelineMonitor.MakeUtilizationMonitor("strategy"), } } @@ -141,6 +150,7 @@ func (s *batchStrategy) processMessage(m *message.Message, outputChan chan *mess // flushBuffer sends all the messages that are stored in the buffer and forwards them // to the next stage of the pipeline. func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) { if s.buffer.IsEmpty() { return } + // Only start measuring in-use time once we know there is something to flush, so empty ticks don't skew utilization. + s.utilization.Start() @@ -169,10 +179,14 @@ func (s *batchStrategy) sendMessages(messages []*message.Message, outputChan cha s.flushWg.Add(1) } - outputChan <- &message.Payload{ + p := &message.Payload{ Messages: messages, Encoded: encodedPayload, Encoding: s.contentEncoding.name(), UnencodedSize: len(serializedMessage), } + s.utilization.Stop() + outputChan <- p + s.pipelineMonitor.ReportComponentEgress(p, "strategy") + s.pipelineMonitor.ReportComponentIngress(p, "sender") } diff --git a/pkg/logs/sender/batch_strategy_test.go b/pkg/logs/sender/batch_strategy_test.go index ff1f6bae1b1077..34cb6be7aa4e9e 100644 --- a/pkg/logs/sender/batch_strategy_test.go +++ b/pkg/logs/sender/batch_strategy_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/pkg/logs/message" + "github.com/DataDog/datadog-agent/pkg/logs/metrics" ) func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) { @@ -20,7 +21,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) { output := make(chan *message.Payload) flushChan := make(chan struct{}) - s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}) + s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() message1 := message.NewMessage([]byte("a"), nil, "", 0) @@ -52,7 +53,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsOutdated(t *testing.T) { timerInterval := 100 * time.Millisecond clk := clock.NewMock() - s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{}) + s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() for round := 0; round < 3; round++ { @@ -77,7 +78,7 @@ func TestBatchStrategySendsPayloadWhenClosingInput(t *testing.T) { flushChan := make(chan struct{}) clk := clock.NewMock() - s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{}) + s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor("")) s.Start() message := message.NewMessage([]byte("a"), nil, "", 0) @@ -102,7 +103,7 @@ func TestBatchStrategyShouldNotBlockWhenStoppingGracefully(t *testing.T) { output := make(chan *message.Payload) flushChan := make(chan struct{}) - s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
diff --git a/pkg/logs/sender/batch_strategy_test.go b/pkg/logs/sender/batch_strategy_test.go
index ff1f6bae1b1077..34cb6be7aa4e9e 100644
--- a/pkg/logs/sender/batch_strategy_test.go
+++ b/pkg/logs/sender/batch_strategy_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"github.com/DataDog/datadog-agent/pkg/logs/message"
+	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
 )
 
 func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) {
@@ -20,7 +21,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) {
 	output := make(chan *message.Payload)
 	flushChan := make(chan struct{})
 
-	s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
+	s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	s.Start()
 
 	message1 := message.NewMessage([]byte("a"), nil, "", 0)
@@ -52,7 +53,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsOutdated(t *testing.T) {
 	timerInterval := 100 * time.Millisecond
 	clk := clock.NewMock()
 
-	s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{})
+	s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	s.Start()
 
 	for round := 0; round < 3; round++ {
@@ -77,7 +78,7 @@ func TestBatchStrategySendsPayloadWhenClosingInput(t *testing.T) {
 	flushChan := make(chan struct{})
 	clk := clock.NewMock()
 
-	s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{})
+	s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	s.Start()
 
 	message := message.NewMessage([]byte("a"), nil, "", 0)
@@ -102,7 +103,7 @@ func TestBatchStrategyShouldNotBlockWhenStoppingGracefully(t *testing.T) {
 	output := make(chan *message.Payload)
 	flushChan := make(chan struct{})
 
-	s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
+	s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	s.Start()
 
 	message := message.NewMessage([]byte{}, nil, "", 0)
@@ -126,7 +127,7 @@ func TestBatchStrategySynchronousFlush(t *testing.T) {
 	// batch size is large so it will not flush until we trigger it manually
 	// flush time is large so it won't automatically trigger during this test
-	strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
+	strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	strategy.Start()
 
 	// all of these messages will get buffered
@@ -171,7 +172,7 @@ func TestBatchStrategyFlushChannel(t *testing.T) {
 	// batch size is large so it will not flush until we trigger it manually
 	// flush time is large so it won't automatically trigger during this test
-	strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
+	strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{}, metrics.NewNoopPipelineMonitor(""))
 	strategy.Start()
 
 	// all of these messages will get buffered
diff --git a/pkg/logs/sender/destination_sender_test.go b/pkg/logs/sender/destination_sender_test.go
index d2ab54715a4f02..3aa930e437e548 100644
--- a/pkg/logs/sender/destination_sender_test.go
+++ b/pkg/logs/sender/destination_sender_test.go
@@ -13,6 +13,7 @@ import (
 	configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
 	pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
+	"github.com/DataDog/datadog-agent/pkg/logs/client"
 	"github.com/DataDog/datadog-agent/pkg/logs/message"
 )
@@ -32,6 +33,10 @@ func (m *mockDestination) Target() string {
 	return "mock-dest"
 }
 
+func (m *mockDestination) Metadata() *client.DestinationMetadata {
+	return client.NewNoopDestinationMetadata()
+}
+
 func (m *mockDestination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) {
 	m.input = input
 	m.output = output
diff --git a/pkg/logs/sender/go.mod b/pkg/logs/sender/go.mod
index 750b501605d928..3e29d53452af90 100644
--- a/pkg/logs/sender/go.mod
+++ b/pkg/logs/sender/go.mod
@@ -54,6 +54,7 @@ require (
 	github.com/DataDog/datadog-agent/pkg/config/model v0.57.0
 	github.com/DataDog/datadog-agent/pkg/logs/client v0.56.0-rc.3
 	github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3
+	github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3
 	github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3
 	github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface v0.56.0-rc.3
 	github.com/DataDog/datadog-agent/pkg/telemetry v0.56.0-rc.3
@@ -73,7 +74,6 @@ require (
 	github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect
 	github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect
 	github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect
-	github.com/DataDog/datadog-agent/pkg/logs/metrics v0.56.0-rc.3 // indirect
 	github.com/DataDog/datadog-agent/pkg/logs/status/utils v0.56.0-rc.3 // indirect
 	github.com/DataDog/datadog-agent/pkg/util/backoff v0.56.0-rc.3 // indirect
 	github.com/DataDog/datadog-agent/pkg/util/executable v0.57.1 // indirect
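The mockDestination change above is the full extent of what a test double needs now that client.Destination carries a Metadata() method: returning the no-op metadata presumably leaves ReportingEnabled false, so the sender's new per-destination ingress reporting (next file) is skipped. A hedged sketch of the same stub in isolation; only NewNoopDestinationMetadata is taken from this diff, and the rest of the interface is deliberately elided.

package sketch

import "github.com/DataDog/datadog-agent/pkg/logs/client"

// stubDestination is a hypothetical test double; Target, Start, and the
// other Destination methods are elided here.
type stubDestination struct{}

// Metadata satisfies the method this patch adds to the interface. The
// no-op value keeps reporting disabled for this destination.
func (d *stubDestination) Metadata() *client.DestinationMetadata {
	return client.NewNoopDestinationMetadata()
}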
diff --git a/pkg/logs/sender/sender.go b/pkg/logs/sender/sender.go
index 31fa4db0bb382a..51ec837383db48 100644
--- a/pkg/logs/sender/sender.go
+++ b/pkg/logs/sender/sender.go
@@ -13,6 +13,7 @@ import (
 	pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
 	"github.com/DataDog/datadog-agent/pkg/logs/client"
 	"github.com/DataDog/datadog-agent/pkg/logs/message"
+	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
 	"github.com/DataDog/datadog-agent/pkg/telemetry"
 )
 
@@ -38,10 +39,13 @@ type Sender struct {
 	bufferSize     int
 	senderDoneChan chan *sync.WaitGroup
 	flushWg        *sync.WaitGroup
+
+	pipelineMonitor metrics.PipelineMonitor
+	utilization     metrics.UtilizationMonitor
 }
 
 // NewSender returns a new sender.
-func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup) *Sender {
+func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup, pipelineMonitor metrics.PipelineMonitor) *Sender {
 	return &Sender{
 		config:         config,
 		inputChan:      inputChan,
@@ -51,6 +55,10 @@ func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, ou
 		bufferSize:     bufferSize,
 		senderDoneChan: senderDoneChan,
 		flushWg:        flushWg,
+
+		// Telemetry
+		pipelineMonitor: pipelineMonitor,
+		utilization:     pipelineMonitor.MakeUtilizationMonitor("sender"),
 	}
 }
 
@@ -73,6 +81,7 @@ func (s *Sender) run() {
 	unreliableDestinations := buildDestinationSenders(s.config, s.destinations.Unreliable, sink, s.bufferSize)
 
 	for payload := range s.inputChan {
+		s.utilization.Start()
 		var startInUse = time.Now()
 		senderDoneWg := &sync.WaitGroup{}
 
@@ -80,6 +89,9 @@ func (s *Sender) run() {
 		for !sent {
 			for _, destSender := range reliableDestinations {
 				if destSender.Send(payload) {
+					if destSender.destination.Metadata().ReportingEnabled {
+						s.pipelineMonitor.ReportComponentIngress(payload, destSender.destination.Metadata().MonitorTag())
+					}
 					sent = true
 					if s.senderDoneChan != nil {
 						senderDoneWg.Add(1)
@@ -121,6 +133,7 @@ func (s *Sender) run() {
 		inUse := float64(time.Since(startInUse) / time.Millisecond)
 		tlmSendWaitTime.Add(inUse)
+		s.utilization.Stop()
 
 		if s.senderDoneChan != nil && s.flushWg != nil {
 			// Wait for all destinations to finish sending the payload
@@ -128,6 +141,7 @@ func (s *Sender) run() {
 			// Decrement the wait group when this payload has been sent
 			s.flushWg.Done()
 		}
+		s.pipelineMonitor.ReportComponentEgress(payload, "sender")
 	}
 
 	// Cleanup the destinations
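Every call site now passes a PipelineMonitor as the final NewSender argument; the tests below all use the no-op monitor. A usage sketch under the signature shown above, where cfg and dest are assumed to be a pkgconfigmodel.Reader and a client.Destination obtained elsewhere (e.g. from a mock server in tests):

// Usage sketch; cfg and dest are assumed, not defined in this patch.
input := make(chan *message.Payload, 10)
output := make(chan *message.Payload, 10)
destinations := client.NewDestinations([]client.Destination{dest}, nil)

// A no-op monitor preserves the old behavior; a real monitor would come
// from the pipeline owner so that strategy, sender, and destinations all
// report under the same pipeline ID.
s := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
s.Start()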
diff --git a/pkg/logs/sender/sender_test.go b/pkg/logs/sender/sender_test.go
index 5fd09caf501d4f..4f35558a469749 100644
--- a/pkg/logs/sender/sender_test.go
+++ b/pkg/logs/sender/sender_test.go
@@ -17,6 +17,7 @@ import (
 	"github.com/DataDog/datadog-agent/pkg/logs/client/mock"
 	"github.com/DataDog/datadog-agent/pkg/logs/client/tcp"
 	"github.com/DataDog/datadog-agent/pkg/logs/message"
+	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
 	"github.com/DataDog/datadog-agent/pkg/logs/sources"
 	"github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface"
 )
@@ -45,7 +46,7 @@ func TestSender(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{destination}, nil)
 	cfg := configmock.New(t)
-	sender := NewSender(cfg, input, output, destinations, 0, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 0, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	expectedMessage := newMessage([]byte("fake line"), source, "")
@@ -73,7 +74,7 @@ func TestSenderSingleDestination(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{server.Destination}, nil)
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
@@ -103,7 +104,7 @@ func TestSenderDualReliableDestination(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{server1.Destination, server2.Destination}, nil)
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
@@ -138,7 +139,7 @@ func TestSenderUnreliableAdditionalDestination(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{server1.Destination}, []client.Destination{server2.Destination})
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
@@ -170,7 +171,7 @@ func TestSenderUnreliableStopsWhenMainFails(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{reliableServer.Destination}, []client.Destination{unreliableServer.Destination})
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
@@ -219,7 +220,7 @@ func TestSenderReliableContinuseWhenOneFails(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil)
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
@@ -265,7 +266,7 @@ func TestSenderReliableWhenOneFailsAndRecovers(t *testing.T) {
 	destinations := client.NewDestinations([]client.Destination{reliableServer1.Destination, reliableServer2.Destination}, nil)
 
-	sender := NewSender(cfg, input, output, destinations, 10, nil, nil)
+	sender := NewSender(cfg, input, output, destinations, 10, nil, nil, metrics.NewNoopPipelineMonitor(""))
 	sender.Start()
 
 	input <- &message.Payload{}
diff --git a/pkg/logs/tailers/file/tailer.go b/pkg/logs/tailers/file/tailer.go
index 4cd584b6e509e9..bf89002288f463 100644
--- a/pkg/logs/tailers/file/tailer.go
+++ b/pkg/logs/tailers/file/tailer.go
@@ -116,20 +116,22 @@ type Tailer struct {
 	// blocked sending to the tailer's outputChan.
 	stopForward context.CancelFunc
 
-	info      *status.InfoRegistry
-	bytesRead *status.CountInfo
-	movingSum *util.MovingSum
+	info            *status.InfoRegistry
+	bytesRead       *status.CountInfo
+	movingSum       *util.MovingSum
+	PipelineMonitor metrics.PipelineMonitor
 }
 
 // TailerOptions holds all possible parameters that NewTailer requires in addition to optional parameters that can be optionally passed into. This can be used for more optional parameters if required in future
 type TailerOptions struct {
-	OutputChan    chan *message.Message // Required
-	File          *File                 // Required
-	SleepDuration time.Duration         // Required
-	Decoder       *decoder.Decoder      // Required
-	Info          *status.InfoRegistry  // Required
-	Rotated       bool                  // Optional
-	TagAdder      tag.EntityTagAdder    // Required
+	OutputChan      chan *message.Message   // Required
+	File            *File                   // Required
+	SleepDuration   time.Duration           // Required
+	Decoder         *decoder.Decoder        // Required
+	Info            *status.InfoRegistry    // Required
+	Rotated         bool                    // Optional
+	TagAdder        tag.EntityTagAdder      // Required
+	PipelineMonitor metrics.PipelineMonitor // Required
 }
 
 // NewTailer returns an initialized Tailer, read to be started.
@@ -182,6 +184,7 @@ func NewTailer(opts *TailerOptions) *Tailer {
 		info:            opts.Info,
 		bytesRead:       bytesRead,
 		movingSum:       movingSum,
+		PipelineMonitor: opts.PipelineMonitor,
 	}
 
 	if fileRotated {
@@ -202,13 +205,14 @@ func addToTailerInfo(k, m string, tailerInfo *status.InfoRegistry) {
 // messages to the same channel but using an updated file and decoder.
 func (t *Tailer) NewRotatedTailer(file *File, decoder *decoder.Decoder, info *status.InfoRegistry, tagAdder tag.EntityTagAdder) *Tailer {
 	options := &TailerOptions{
-		OutputChan:    t.outputChan,
-		File:          file,
-		SleepDuration: t.sleepDuration,
-		Decoder:       decoder,
-		Info:          info,
-		Rotated:       true,
-		TagAdder:      tagAdder,
+		OutputChan:      t.outputChan,
+		File:            file,
+		SleepDuration:   t.sleepDuration,
+		Decoder:         decoder,
+		Info:            info,
+		Rotated:         true,
+		TagAdder:        tagAdder,
+		PipelineMonitor: t.PipelineMonitor,
 	}
 
 	return NewTailer(options)
@@ -339,6 +343,8 @@ func (t *Tailer) forwardMessages() {
 		close(t.done)
 	}()
 	for output := range t.decoder.OutputChan {
+		// metrics.ReportComponentEgress(output, "decoder", strconv.Itoa(t.pipelineID))
+		// t.decoderMonitor.Stop()
 		offset := t.decodedOffset.Load() + int64(output.RawDataLen)
 		identifier := t.Identifier()
 		if t.didFileRotate.Load() {
@@ -359,13 +365,16 @@ func (t *Tailer) forwardMessages() {
 		if len(output.GetContent()) == 0 {
 			continue
 		}
+
+		msg := message.NewMessage(output.GetContent(), origin, output.Status, output.IngestionTimestamp)
 		// Make the write to the output chan cancellable to be able to stop the tailer
 		// after a file rotation when it is stuck on it.
 		// We don't return directly to keep the same shutdown sequence that in the
 		// normal case.
 		select {
 		// XXX(remy): is it ok recreating a message like this here?
-		case t.outputChan <- message.NewMessage(output.GetContent(), origin, output.Status, output.IngestionTimestamp):
+		case t.outputChan <- msg:
+			t.PipelineMonitor.ReportComponentIngress(msg, "processor")
 		case <-t.forwardContext.Done():
 		}
 	}
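Two details in the tailer hunks are worth noting: the message is now built once, before the select, so the exact object that crosses outputChan is the one reported to the monitor; and the commented-out decoder lines reference fields (t.decoderMonitor, t.pipelineID) that this patch does not add, so they are inert placeholders rather than live code. The bookkeeping the ingress/egress pairs enable is roughly the following; a conceptual sketch only, not the metrics package's implementation.

package sketch

// counters is a hypothetical view of what a pipeline monitor accumulates
// from the ReportComponentIngress/Egress calls in this patch.
type counters struct {
	ingress map[string]int64 // messages reported entering each component
	egress  map[string]int64 // messages reported leaving each component
}

// inFlight approximates how many messages sit inside a component, which is
// why the tailer reports ingress for "processor" only once outputChan has
// actually accepted the message.
func (c *counters) inFlight(component string) int64 {
	return c.ingress[component] - c.egress[component]
}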
diff --git a/pkg/logs/tailers/file/tailer_nix.go b/pkg/logs/tailers/file/tailer_nix.go
index a4af026781133d..df0b938c392df9 100644
--- a/pkg/logs/tailers/file/tailer_nix.go
+++ b/pkg/logs/tailers/file/tailer_nix.go
@@ -56,6 +56,9 @@ func (t *Tailer) read() (int, error) {
 		return 0, nil
 	}
 	t.lastReadOffset.Add(int64(n))
-	t.decoder.InputChan <- decoder.NewInput(inBuf[:n])
+	msg := decoder.NewInput(inBuf[:n])
+	t.decoder.InputChan <- msg
+	// t.decoderMonitor.Start()
+	// metrics.ReportComponentIngress(msg, "decoder", strconv.Itoa(t.pipelineID))
 	return n, nil
 }
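The read() change binds the decoder input to a variable for the same reason forwardMessages does: so that future decoder instrumentation can report the exact object that entered the channel. If that instrumentation were enabled, it would presumably mirror the tailer-to-processor pattern, something like the sketch below; this assumes decoder.Input satisfies whatever payload interface ReportComponentIngress expects, which this patch does not establish.

// Hypothetical follow-up, not part of this patch:
msg := decoder.NewInput(inBuf[:n])
t.decoder.InputChan <- msg
// Assumes decoder.Input is measurable by the monitor; the commented-out
// lines above suggest a package-level helper was also being considered.
t.PipelineMonitor.ReportComponentIngress(msg, "decoder")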
diff --git a/pkg/logs/tailers/file/tailer_test.go b/pkg/logs/tailers/file/tailer_test.go
index d26c17d25224ce..794a6df4557f9b 100644
--- a/pkg/logs/tailers/file/tailer_test.go
+++ b/pkg/logs/tailers/file/tailer_test.go
@@ -21,6 +21,7 @@ import (
 	pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
 	"github.com/DataDog/datadog-agent/pkg/logs/internal/decoder"
 	"github.com/DataDog/datadog-agent/pkg/logs/message"
+	"github.com/DataDog/datadog-agent/pkg/logs/metrics"
 	"github.com/DataDog/datadog-agent/pkg/logs/sources"
 	status "github.com/DataDog/datadog-agent/pkg/logs/status/utils"
 )
@@ -57,11 +58,12 @@ func (suite *TailerTestSuite) SetupTest() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -111,11 +113,12 @@ func (suite *TailerTestSuite) TestTailerTimeDurationConfig() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	tailer := NewTailer(tailerOptions)
@@ -278,11 +281,12 @@ func (suite *TailerTestSuite) TestDirTagWhenTailingFiles() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, dirTaggedSource, true),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, dirTaggedSource, true),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -308,11 +312,12 @@ func (suite *TailerTestSuite) TestBuildTagsFileOnly() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, dirTaggedSource, false),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, dirTaggedSource, false),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -335,11 +340,12 @@ func (suite *TailerTestSuite) TestBuildTagsFileDir() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, dirTaggedSource, true),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, dirTaggedSource, true),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -366,11 +372,12 @@ func (suite *TailerTestSuite) TestTruncatedTag() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, source, true),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, source, true),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -398,11 +405,12 @@ func (suite *TailerTestSuite) TestMutliLineAutoDetect() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, suite.source.UnderlyingSource(), true),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, suite.source.UnderlyingSource(), true),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	suite.tailer = NewTailer(tailerOptions)
@@ -433,11 +441,12 @@ func (suite *TailerTestSuite) TestDidRotateNilFullpath() {
 	info := status.NewInfoRegistry()
 
 	tailerOptions := &TailerOptions{
-		OutputChan:    suite.outputChan,
-		File:          NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
-		SleepDuration: sleepDuration,
-		Decoder:       decoder.NewDecoderFromSource(suite.source, info),
-		Info:          info,
+		OutputChan:      suite.outputChan,
+		File:            NewFile(suite.testPath, suite.source.UnderlyingSource(), false),
+		SleepDuration:   sleepDuration,
+		Decoder:         decoder.NewDecoderFromSource(suite.source, info),
+		Info:            info,
+		PipelineMonitor: metrics.NewNoopPipelineMonitor(""),
 	}
 
 	tailer := NewTailer(tailerOptions)
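Taken together, the call sites in this patch imply roughly the following shape for the monitoring API. This is an inferred sketch reconstructed from usage only; the payload constraint and its method set are guesses, not the pkg/logs/metrics source.

package sketch

// measurable is a guess at the payload constraint: both *message.Message
// and *message.Payload are passed to the report methods in this patch.
type measurable interface {
	Size() int64 // hypothetical
}

// PipelineMonitor as inferred from MakeUtilizationMonitor, the Report*
// calls, and NewNoopPipelineMonitor(id string) seen above.
type PipelineMonitor interface {
	MakeUtilizationMonitor(name string) UtilizationMonitor
	ReportComponentIngress(payload measurable, name string)
	ReportComponentEgress(payload measurable, name string)
}

// UtilizationMonitor as inferred from the Start/Stop bracketing in the
// batch strategy and the sender.
type UtilizationMonitor interface {
	Start()
	Stop()
}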