Skip to content

Commit

Permalink
Add logs agent pipeline performance telemetry (#30744)
Browse files Browse the repository at this point in the history
Co-authored-by: blt <[email protected]>
  • Loading branch information
gh123man and blt authored Nov 15, 2024
1 parent c3cdb22 commit de46c0a
Show file tree
Hide file tree
Showing 67 changed files with 968 additions and 226 deletions.
1 change: 1 addition & 0 deletions cmd/serverless/dependencies_linux_amd64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ github.com/DataDog/datadog-agent/pkg/util/sync
github.com/DataDog/datadog-agent/pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket
github.com/DataDog/datadog-agent/pkg/util/tmplvar
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/version
github.com/DataDog/datadog-api-client-go/v2
github.com/DataDog/datadog-api-client-go/v2/api/datadog
Expand Down
1 change: 1 addition & 0 deletions cmd/serverless/dependencies_linux_arm64.txt
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ github.com/DataDog/datadog-agent/pkg/util/sync
github.com/DataDog/datadog-agent/pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket
github.com/DataDog/datadog-agent/pkg/util/tmplvar
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/version
github.com/DataDog/datadog-api-client-go/v2
github.com/DataDog/datadog-api-client-go/v2/api/datadog
Expand Down
18 changes: 12 additions & 6 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package eventplatformimpl
import (
"context"
"fmt"
"strconv"
"strings"
"sync"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/client"
logshttp "github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/sender"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -393,15 +395,18 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
if endpoints.InputChanSize <= pkgconfigsetup.DefaultInputChanSize {
endpoints.InputChanSize = desc.defaultInputChanSize
}

pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID))

reliable := []client.Destination{}
for i, endpoint := range endpoints.GetReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i)
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i))
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
additionals := []client.Destination{}
for i, endpoint := range endpoints.GetUnReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i)
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i))
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
destinations := client.NewDestinations(reliable, additionals)
inputChan := make(chan *message.Message, endpoints.InputChanSize)
Expand All @@ -426,14 +431,15 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
endpoints.BatchMaxSize,
endpoints.BatchMaxContentSize,
desc.eventType,
encoder)
encoder,
pipelineMonitor)
}

a := auditor.NewNullAuditor()
log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d",
desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize)
return &passthroughPipeline{
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil),
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor),
strategy: strategy,
in: inputChan,
auditor: a,
Expand Down
4 changes: 1 addition & 3 deletions comp/logs/agent/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ package config

// Pipeline constraints
const (
ChanSize = 100
DestinationPayloadChanSize = 10
NumberOfPipelines = 4
NumberOfPipelines = 4
)

const (
Expand Down
2 changes: 2 additions & 0 deletions comp/otelcol/ddflareextension/impl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ../../../../pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../pkg/util/system/socket
github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../pkg/util/testutil
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../pkg/util/winutil
github.com/DataDog/datadog-agent/pkg/version => ../../../../pkg/version
github.com/coreos/go-systemd => github.com/coreos/go-systemd v0.0.0-20180202092358-40e2722dffea
Expand Down Expand Up @@ -239,6 +240,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect
github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect
github.com/DataDog/datadog-api-client-go/v2 v2.26.0 // indirect
github.com/DataDog/datadog-go/v5 v5.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions comp/otelcol/logsagentpipeline/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ../../../pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../pkg/util/system/socket
github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../pkg/util/testutil
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../pkg/util/winutil
github.com/DataDog/datadog-agent/pkg/version => ../../../pkg/version
)
Expand Down Expand Up @@ -103,6 +104,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect
github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect
github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect
Expand Down
2 changes: 2 additions & 0 deletions comp/otelcol/logsagentpipeline/logsagentpipelineimpl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ../../../../pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../pkg/util/system/socket
github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../pkg/util/testutil
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../pkg/util/winutil
github.com/DataDog/datadog-agent/pkg/version => ../../../../pkg/version
)
Expand Down Expand Up @@ -118,6 +119,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect
github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/version v0.56.0-rc.3 // indirect
github.com/DataDog/dd-sensitive-data-scanner/sds-go/go v0.0.0-20240816154533-f7f9beb53a42 // indirect
Expand Down
2 changes: 2 additions & 0 deletions comp/otelcol/otlp/components/exporter/datadogexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ../../../../../../pkg/util/system/
github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../../../../../pkg/util/system/socket/
github.com/DataDog/datadog-agent/pkg/util/testutil => ../../../../../../pkg/util/testutil/
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../../../../../pkg/util/utilizationtracker/
github.com/DataDog/datadog-agent/pkg/util/winutil => ../../../../../../pkg/util/winutil/
github.com/DataDog/datadog-agent/pkg/version => ../../../../../../pkg/version
)
Expand Down Expand Up @@ -189,6 +190,7 @@ require (
github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect
github.com/DataDog/datadog-agent/pkg/util/system v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect
github.com/DataDog/datadog-agent/pkg/version v0.57.1 // indirect
github.com/DataDog/datadog-api-client-go/v2 v2.26.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ./pkg/util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket => ./pkg/util/system/socket/
github.com/DataDog/datadog-agent/pkg/util/testutil => ./pkg/util/testutil
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ./pkg/util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/util/uuid => ./pkg/util/uuid
github.com/DataDog/datadog-agent/pkg/util/winutil => ./pkg/util/winutil/
github.com/DataDog/datadog-agent/pkg/version => ./pkg/version
Expand Down Expand Up @@ -605,6 +606,7 @@ require (
github.com/DataDog/datadog-agent/comp/otelcol/ddflareextension/impl v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/config/structure v0.60.0-devel
github.com/DataDog/datadog-agent/pkg/util/defaultpaths v0.0.0-00010101000000-000000000000
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0
github.com/NVIDIA/go-nvml v0.12.4-0
github.com/containerd/containerd/api v1.8.0
github.com/containerd/errdefs v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions modules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ modules:
used_by_otel: true
pkg/util/testutil:
used_by_otel: true
pkg/util/utilizationtracker:
used_by_otel: true
pkg/util/uuid: default
pkg/util/winutil:
used_by_otel: true
Expand Down
12 changes: 7 additions & 5 deletions pkg/collector/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/hostname"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/utilizationtracker"
)

const (
Expand Down Expand Up @@ -122,7 +123,8 @@ func newWorkerWithOptions(
func (w *Worker) Run() {
log.Debugf("Runner %d, worker %d: Ready to process checks...", w.runnerID, w.ID)

utilizationTracker := NewUtilizationTracker(w.Name, w.utilizationTickInterval)
alpha := 0.25 // converges to 99.98% of constant input in 30 iterations.
utilizationTracker := utilizationtracker.NewUtilizationTracker(w.utilizationTickInterval, alpha)
defer utilizationTracker.Stop()

startUtilizationUpdater(w.Name, utilizationTracker)
Expand All @@ -146,12 +148,12 @@ func (w *Worker) Run() {
expvars.AddRunningCheckCount(1)
expvars.SetRunningStats(check.ID(), checkStartTime)

utilizationTracker.CheckStarted()
utilizationTracker.Started()

// Run the check
checkErr := check.Run()

utilizationTracker.CheckFinished()
utilizationTracker.Finished()

expvars.DeleteRunningStats(check.ID())

Expand Down Expand Up @@ -210,7 +212,7 @@ func (w *Worker) Run() {
log.Debugf("Runner %d, worker %d: Finished processing checks.", w.runnerID, w.ID)
}

func startUtilizationUpdater(name string, ut *UtilizationTracker) {
func startUtilizationUpdater(name string, ut *utilizationtracker.UtilizationTracker) {
expvars.SetWorkerStats(name, &expvars.WorkerStats{
Utilization: 0.0,
})
Expand All @@ -229,7 +231,7 @@ func startUtilizationUpdater(name string, ut *UtilizationTracker) {
}()
}

func startTrackerTicker(ut *UtilizationTracker, interval time.Duration) func() {
func startTrackerTicker(ut *utilizationtracker.UtilizationTracker, interval time.Duration) func() {
ticker := time.NewTicker(interval)
cancel := make(chan struct{}, 1)
done := make(chan struct{})
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,9 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true)
config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com")
config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10)

// maximum time that the unix tailer will hold a log file open after it has been rotated
config.BindEnvAndSetDefault("logs_config.close_timeout", 60)
// maximum time that the windows tailer will hold a log file open, while waiting for
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/auditor/auditor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/log"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

Expand Down Expand Up @@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() {
func (a *RegistryAuditor) createChannels() {
a.chansMutex.Lock()
defer a.chansMutex.Unlock()
a.inputChan = make(chan *message.Payload, config.ChanSize)
a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
a.done = make(chan struct{})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/logs/auditor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ replace (

require (
github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0
github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/status/health v0.56.0-rc.3
Expand All @@ -56,7 +57,6 @@ require (
github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect
Expand Down
7 changes: 6 additions & 1 deletion pkg/logs/client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
//nolint:revive // TODO(AML) Fix revive linter
package client

import "github.com/DataDog/datadog-agent/pkg/logs/message"
import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// Destination sends a payload to a specific endpoint over a given network protocol.
type Destination interface {
Expand All @@ -16,6 +18,9 @@ type Destination interface {
// Destination target (e.g. https://agent-intake.logs.datadoghq.com)
Target() string

// Metadata returns the metadata of the destination
Metadata() *DestinationMetadata

// Start starts the destination send loop. Close the input to stop listening for payloads. stopChan is
// signaled when the destination has fully shut down and all buffered payloads have been flushed. isRetrying is
// signaled when the retry state changes. isRetrying can be nil if you don't need to handle retries.
Expand Down
54 changes: 54 additions & 0 deletions pkg/logs/client/destination_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//nolint:revive // TODO(AML) Fix revive linter
package client

import (
"fmt"
)

// DestinationMetadata contains metadata about a destination. The identifying
// fields are combined to build the destination's telemetry name and monitor
// tag; they are only meaningful when ReportingEnabled is true.
type DestinationMetadata struct {
	// componentName names the component that owns the destination.
	componentName string
	// instanceID identifies the instance of that component.
	instanceID string
	// kind is the destination kind (e.g. "reliable" or "unreliable").
	kind string
	// endpointID identifies the endpoint within its kind.
	endpointID string
	// ReportingEnabled controls whether TelemetryName and MonitorTag return
	// a non-empty value.
	ReportingEnabled bool
}

// NewDestinationMetadata returns a new DestinationMetadata with reporting
// enabled. The four arguments are stored as-is and later joined to form the
// telemetry name and the monitor tag.
func NewDestinationMetadata(componentName, instanceID, kind, endpointID string) *DestinationMetadata {
	return &DestinationMetadata{
		componentName:    componentName,
		instanceID:       instanceID,
		kind:             kind,
		endpointID:       endpointID,
		ReportingEnabled: true,
	}
}

// NewNoopDestinationMetadata returns a new DestinationMetadata with reporting
// disabled; all identifying fields are left empty.
func NewNoopDestinationMetadata() *DestinationMetadata {
	return &DestinationMetadata{
		ReportingEnabled: false,
	}
}

// TelemetryName returns the telemetry name for the destination, formatted as
// "<componentName>_<instanceID>_<kind>_<endpointID>". It returns an empty
// string when reporting is disabled or when the receiver is nil.
func (d *DestinationMetadata) TelemetryName() string {
	// A nil receiver is treated like disabled metadata so that destinations
	// without metadata do not panic.
	if d == nil || !d.ReportingEnabled {
		return ""
	}
	return fmt.Sprintf("%s_%s_%s_%s", d.componentName, d.instanceID, d.kind, d.endpointID)
}

// MonitorTag returns the monitor tag for the destination, formatted as
// "destination_<kind>_<endpointID>". It returns an empty string when
// reporting is disabled or when the receiver is nil.
func (d *DestinationMetadata) MonitorTag() string {
	// A nil receiver is treated like disabled metadata so that destinations
	// without metadata do not panic.
	if d == nil || !d.ReportingEnabled {
		return ""
	}
	return fmt.Sprintf("destination_%s_%s", d.kind, d.endpointID)
}
3 changes: 3 additions & 0 deletions pkg/logs/client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ replace (
github.com/DataDog/datadog-agent/pkg/util/system => ../../util/system
github.com/DataDog/datadog-agent/pkg/util/system/socket => ../../util/system/socket
github.com/DataDog/datadog-agent/pkg/util/testutil => ../../util/testutil
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker => ../../util/utilizationtracker
github.com/DataDog/datadog-agent/pkg/util/winutil => ../../util/winutil
github.com/DataDog/datadog-agent/pkg/version => ../../version
)
Expand Down Expand Up @@ -87,9 +88,11 @@ require (
github.com/DataDog/datadog-agent/pkg/util/statstracker v0.56.0-rc.3 // indirect
github.com/DataDog/datadog-agent/pkg/util/system v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/system/socket v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/utilizationtracker v0.0.0 // indirect
github.com/DataDog/datadog-agent/pkg/util/winutil v0.57.1 // indirect
github.com/DataDog/viper v1.13.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/logs/client/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit de46c0a

Please sign in to comment.