Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs agent pipeline performance telemetry #30744

Merged
merged 23 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0cea77d
Make channel sizes configurable
gh123man Nov 4, 2024
185ea96
Introduce pipeline monitor, utilization monitor, capacity monitor, an…
gh123man Nov 4, 2024
a19dbb0
Refactor logs agent to use pipeline telemetry
gh123man Nov 4, 2024
a0c07a0
cleanup go mods
gh123man Nov 4, 2024
22d27a6
Increase sample window to 15 seconds
gh123man Nov 6, 2024
7201bae
Stop processor utilization in early returns.
gh123man Nov 6, 2024
fa2cba8
Use ewma instead of time buckets
gh123man Nov 6, 2024
5fc6dbf
Merge branch 'main' into brian/logs-pipeline-telemetry-AMLII-2134
gh123man Nov 6, 2024
f9454d7
update experiments to use logrotate_FS
blt Nov 6, 2024
b7e1c64
Fix deps list
gh123man Nov 6, 2024
899c878
Merge branch 'main' into brian/logs-pipeline-telemetry-AMLII-2134
gh123man Nov 7, 2024
d9929e8
PR feedback
gh123man Nov 7, 2024
5033c78
PR feedback
gh123man Nov 7, 2024
b894b8f
PR feedback
gh123man Nov 8, 2024
f015f35
Defer processor egress
gh123man Nov 11, 2024
b1dd831
Remove dependency on 3rd party ewma library
gh123man Nov 13, 2024
6e27f61
Refactor utilization tracker
gh123man Nov 13, 2024
5c2e611
Refactor utilization monitor to use utilization_tracker
gh123man Nov 13, 2024
2b953cc
Remaining PR feedback
gh123man Nov 13, 2024
66dc166
Allow utilization monitor to call start/stop multiple times
gh123man Nov 13, 2024
beaf565
Merge branch 'main' into brian/logs-pipeline-telemetry-AMLII-2134
gh123man Nov 13, 2024
815ad2a
Merge branch 'main' into brian/logs-pipeline-telemetry-AMLII-2134
gh123man Nov 14, 2024
7f57f8c
Fix format
gh123man Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package eventplatformimpl
import (
"context"
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/client"
logshttp "github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/sender"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -393,15 +395,18 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
if endpoints.InputChanSize <= pkgconfigsetup.DefaultInputChanSize {
endpoints.InputChanSize = desc.defaultInputChanSize
}

pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID))

reliable := []client.Destination{}
for i, endpoint := range endpoints.GetReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i)
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i))
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
additionals := []client.Destination{}
for i, endpoint := range endpoints.GetUnReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i)
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i))
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
destinations := client.NewDestinations(reliable, additionals)
inputChan := make(chan *message.Message, endpoints.InputChanSize)
Expand All @@ -426,14 +431,15 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
endpoints.BatchMaxSize,
endpoints.BatchMaxContentSize,
desc.eventType,
encoder)
encoder,
pipelineMonitor)
}

a := auditor.NewNullAuditor()
log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d",
desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize)
return &passthroughPipeline{
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil),
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor),
strategy: strategy,
in: inputChan,
auditor: a,
Expand Down
4 changes: 1 addition & 3 deletions comp/logs/agent/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ package config

// Pipeline constraints
const (
ChanSize = 100
DestinationPayloadChanSize = 10
NumberOfPipelines = 4
NumberOfPipelines = 4
)

const (
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,9 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true)
config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com")
config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10)

// maximum time that the unix tailer will hold a log file open after it has been rotated
config.BindEnvAndSetDefault("logs_config.close_timeout", 60)
// maximum time that the windows tailer will hold a log file open, while waiting for
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/log"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

Expand Down Expand Up @@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() {
func (a *RegistryAuditor) createChannels() {
a.chansMutex.Lock()
defer a.chansMutex.Unlock()
a.inputChan = make(chan *message.Payload, config.ChanSize)
a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should provide the config component as part of New function, so we do not use the pkgconfigsetup.Datadog() global 😄

Maybe something we could do on a separate PR

a.done = make(chan struct{})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/auditor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ replace (

require (
github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0
github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/status/health v0.56.0-rc.3
Expand All @@ -56,7 +57,6 @@ require (
github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/logs/client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
//nolint:revive // TODO(AML) Fix revive linter
package client

import "github.com/DataDog/datadog-agent/pkg/logs/message"
import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// Destination sends a payload to a specific endpoint over a given network protocol.
type Destination interface {
Expand All @@ -16,6 +18,9 @@ type Destination interface {
// Destination target (e.g. https://agent-intake.logs.datadoghq.com)
Target() string

// Metadata returns the metadata of the destination
Metadata() *DestinationMetadata

// Start starts the destination send loop. Close the input to stop listening for payloads. stopChan is
// signaled when the destination has fully shutdown and all buffered payloads have been flushed. isRetrying is
// signaled when the retry state changes. isRetrying can be nil if you don't need to handle retries.
Expand Down
54 changes: 54 additions & 0 deletions pkg/logs/client/destination_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//nolint:revive // TODO(AML) Fix revive linter
package client

import (
"fmt"
)

// DestinationMetadata contains identifying metadata about a destination,
// used to build stable telemetry names and monitor tags for it.
type DestinationMetadata struct {
	componentName string // e.g. the event type served by the destination
	instanceID    string // pipeline instance identifier
	kind          string // e.g. "reliable" or "unreliable"
	endpointID    string // index of the endpoint within its kind
	// ReportingEnabled indicates whether telemetry should be reported for
	// this destination. When false, TelemetryName and MonitorTag return "".
	ReportingEnabled bool
}

// NewDestinationMetadata returns a new DestinationMetadata with reporting enabled.
func NewDestinationMetadata(componentName, instanceID, kind, endpointID string) *DestinationMetadata {
	return &DestinationMetadata{
		componentName:    componentName,
		instanceID:       instanceID,
		kind:             kind,
		endpointID:       endpointID,
		ReportingEnabled: true,
	}
}

// NewNoopDestinationMetadata returns a new DestinationMetadata with reporting
// disabled; its TelemetryName and MonitorTag are both empty strings.
func NewNoopDestinationMetadata() *DestinationMetadata {
	return &DestinationMetadata{
		ReportingEnabled: false,
	}
}

// TelemetryName returns the telemetry name for the destination
// ("<component>_<instance>_<kind>_<endpoint>"), or "" when reporting is disabled.
func (d *DestinationMetadata) TelemetryName() string {
	if !d.ReportingEnabled {
		return ""
	}
	return fmt.Sprintf("%s_%s_%s_%s", d.componentName, d.instanceID, d.kind, d.endpointID)
}

// MonitorTag returns the monitor tag for the destination
// ("destination_<kind>_<endpoint>"), or "" when reporting is disabled.
func (d *DestinationMetadata) MonitorTag() string {
	if !d.ReportingEnabled {
		return ""
	}
	return fmt.Sprintf("destination_%s_%s", d.kind, d.endpointID)
}
44 changes: 30 additions & 14 deletions pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ type Destination struct {
lastRetryError error

// Telemetry
expVars *expvar.Map
telemetryName string
expVars *expvar.Map
destMeta *client.DestinationMetadata
pipelineMonitor metrics.PipelineMonitor
utilization metrics.UtilizationMonitor
}

// NewDestination returns a new Destination.
Expand All @@ -94,17 +96,19 @@ func NewDestination(endpoint config.Endpoint,
destinationsContext *client.DestinationsContext,
maxConcurrentBackgroundSends int,
shouldRetry bool,
telemetryName string,
cfg pkgconfigmodel.Reader) *Destination {
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
pipelineMonitor metrics.PipelineMonitor) *Destination {

return newDestination(endpoint,
contentType,
destinationsContext,
time.Second*10,
maxConcurrentBackgroundSends,
shouldRetry,
telemetryName,
cfg)
destMeta,
cfg,
pipelineMonitor)
}

func newDestination(endpoint config.Endpoint,
Expand All @@ -113,8 +117,9 @@ func newDestination(endpoint config.Endpoint,
timeout time.Duration,
maxConcurrentBackgroundSends int,
shouldRetry bool,
telemetryName string,
cfg pkgconfigmodel.Reader) *Destination {
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
pipelineMonitor metrics.PipelineMonitor) *Destination {

if maxConcurrentBackgroundSends <= 0 {
maxConcurrentBackgroundSends = 1
Expand All @@ -130,8 +135,9 @@ func newDestination(endpoint config.Endpoint,
expVars := &expvar.Map{}
expVars.AddFloat(expVarIdleMsMapKey, 0)
expVars.AddFloat(expVarInUseMsMapKey, 0)
if telemetryName != "" {
metrics.DestinationExpVars.Set(telemetryName, expVars)

if destMeta.ReportingEnabled {
metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars)
}

return &Destination{
Expand All @@ -150,8 +156,10 @@ func newDestination(endpoint config.Endpoint,
retryLock: sync.Mutex{},
shouldRetry: shouldRetry,
expVars: expVars,
telemetryName: telemetryName,
destMeta: destMeta,
isMRF: endpoint.IsMRF,
pipelineMonitor: pipelineMonitor,
utilization: pipelineMonitor.MakeUtilizationMonitor(destMeta.MonitorTag()),
}
}

Expand All @@ -175,6 +183,11 @@ func (d *Destination) Target() string {
return d.url
}

// Metadata returns the metadata of the destination
func (d *Destination) Metadata() *client.DestinationMetadata {
return d.destMeta
}

// Start starts reading the input channel
func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
Expand All @@ -186,17 +199,19 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl
var startIdle = time.Now()

for p := range input {
d.utilization.Start()
idle := float64(time.Since(startIdle) / time.Millisecond)
d.expVars.AddFloat(expVarIdleMsMapKey, idle)
tlmIdle.Add(idle, d.telemetryName)
tlmIdle.Add(idle, d.destMeta.TelemetryName())
var startInUse = time.Now()

d.sendConcurrent(p, output, isRetrying)

inUse := float64(time.Since(startInUse) / time.Millisecond)
d.expVars.AddFloat(expVarInUseMsMapKey, inUse)
tlmInUse.Add(inUse, d.telemetryName)
tlmInUse.Add(inUse, d.destMeta.TelemetryName())
startIdle = time.Now()
d.utilization.Stop()
}
// Wait for any pending concurrent sends to finish or terminate
d.wg.Wait()
Expand Down Expand Up @@ -348,6 +363,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
// internal error. We should retry these requests.
return client.NewRetryableError(errServer)
} else {
d.pipelineMonitor.ReportComponentEgress(payload, d.destMeta.MonitorTag())
return nil
}
}
Expand Down Expand Up @@ -422,7 +438,7 @@ func getMessageTimestamp(messages []*message.Message) int64 {
func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) {
ctx := client.NewDestinationsContext()
// Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, "", cfg)
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor(""))
return ctx, destination
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/logs/client/http/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
Expand Down Expand Up @@ -360,7 +361,7 @@ func TestDestinationHA(t *testing.T) {
}
isEndpointMRF := endpoint.IsMRF

dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", configmock.New(t))
dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, client.NewNoopDestinationMetadata(), configmock.New(t), metrics.NewNoopPipelineMonitor(""))
isDestMRF := dest.IsMRF()

assert.Equal(t, isEndpointMRF, isDestMRF)
Expand Down
13 changes: 9 additions & 4 deletions pkg/logs/client/http/sync_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ func NewSyncDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
senderDoneChan chan *sync.WaitGroup,
telemetryName string,
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader) *SyncDestination {

return &SyncDestination{
destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, telemetryName, cfg),
destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, destMeta, cfg, metrics.NewNoopPipelineMonitor("0")),
senderDoneChan: senderDoneChan,
}
}
Expand All @@ -49,6 +49,11 @@ func (d *SyncDestination) Target() string {
return d.destination.url
}

// Metadata returns the metadata of the destination
func (d *SyncDestination) Metadata() *client.DestinationMetadata {
return d.destination.destMeta
}

// Start starts reading the input channel
func (d *SyncDestination) Start(input chan *message.Payload, output chan *message.Payload, _ chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
Expand All @@ -62,7 +67,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message.
for p := range input {
idle := float64(time.Since(startIdle) / time.Millisecond)
d.destination.expVars.AddFloat(expVarIdleMsMapKey, idle)
tlmIdle.Add(idle, d.destination.telemetryName)
tlmIdle.Add(idle, d.destination.destMeta.TelemetryName())
var startInUse = time.Now()

err := d.destination.unconditionalSend(p)
Expand All @@ -84,7 +89,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message.

inUse := float64(time.Since(startInUse) / time.Millisecond)
d.destination.expVars.AddFloat(expVarInUseMsMapKey, inUse)
tlmInUse.Add(inUse, d.destination.telemetryName)
tlmInUse.Add(inUse, d.destination.destMeta.TelemetryName())
startIdle = time.Now()
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/logs/client/http/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
)

// StatusCodeContainer is a lock around the status code to return
Expand Down Expand Up @@ -79,7 +80,7 @@ func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool
endpoint.BackoffMax = 10
endpoint.RecoveryInterval = 1

dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, "test", cfg)
dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor(""))
return &TestServer{
httpServer: ts,
DestCtx: destCtx,
Expand Down
Loading
Loading