agent pipeline telemetry first pass #30152

Closed
wants to merge 36 commits into main from logs-agent-obvs-scratch1
Changes from all commits (36 commits)
358ecee
agent pipeline telemetry first pass
gh123man Oct 15, 2024
315050d
Fix size compute
gh123man Oct 16, 2024
5b1a9b5
Count lines not bytes
gh123man Oct 17, 2024
5d34ff0
Merge branch 'main' into logs-agent-obvs-scratch1
gh123man Oct 22, 2024
26c8a6e
utilization metric
gh123man Oct 22, 2024
e9378d8
Fix tests
gh123man Oct 22, 2024
ceef3ea
Fix math
gh123man Oct 22, 2024
0a0d3ac
Fix gauge
gh123man Oct 23, 2024
c1effdb
report later
gh123man Oct 23, 2024
0fd3f97
instrument sender
gh123man Oct 23, 2024
5941534
typo
gh123man Oct 23, 2024
10a1e29
Sample every 5 seconds
gh123man Oct 23, 2024
706d148
Immediate capacity sampling
gh123man Oct 23, 2024
76fdb13
Fix data race
gh123man Oct 23, 2024
492df47
avg sampling
gh123man Oct 23, 2024
8267acc
Lint :(
gh123man Oct 23, 2024
7d1a20c
instrument the tailer
gh123man Oct 24, 2024
8813bf4
Fix race
gh123man Oct 24, 2024
7260f95
Fix another race
gh123man Oct 24, 2024
152e2e9
Hack to fix another race
gh123man Oct 24, 2024
c509898
Cleanup.
gh123man Oct 24, 2024
650efe7
reconfigure utilization metrics
gh123man Oct 24, 2024
3cfb3b7
Formalize most pipeline telemetry
gh123man Oct 29, 2024
e174467
endpoint instance telemetry working
gh123man Oct 29, 2024
14c6696
Fix go mod
gh123man Oct 29, 2024
44a2921
Fix tests
gh123man Oct 30, 2024
f8efcb2
Fix processor reporting
gh123man Oct 30, 2024
e734d05
Fix procesor reporting
gh123man Oct 30, 2024
5707307
Configurable chan size
gh123man Oct 30, 2024
feb2afa
Fix utilization measurement
gh123man Oct 30, 2024
87c9900
tidy
gh123man Oct 30, 2024
cf72691
Cleanup + comments
gh123man Nov 4, 2024
3b6d533
lint
gh123man Nov 4, 2024
2dd42f5
lint
gh123man Nov 4, 2024
120df76
lint
gh123man Nov 4, 2024
d5c7dec
update configs
gh123man Nov 4, 2024
18 changes: 12 additions & 6 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
@@ -9,6 +9,7 @@ package eventplatformimpl
import (
"context"
"fmt"
"strconv"
"strings"
"sync"

@@ -27,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/client"
logshttp "github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/sender"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
@@ -393,15 +395,18 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
if endpoints.InputChanSize <= pkgconfigsetup.DefaultInputChanSize {
endpoints.InputChanSize = desc.defaultInputChanSize
}

pipelineMonitor := metrics.NewNoopPipelineMonitor(strconv.Itoa(pipelineID))

reliable := []client.Destination{}
for i, endpoint := range endpoints.GetReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_reliable_%d", desc.eventType, pipelineID, i)
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "reliable", strconv.Itoa(i))
reliable = append(reliable, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
additionals := []client.Destination{}
for i, endpoint := range endpoints.GetUnReliableEndpoints() {
telemetryName := fmt.Sprintf("%s_%d_unreliable_%d", desc.eventType, pipelineID, i)
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, pkgconfigsetup.Datadog()))
destMeta := client.NewDestinationMetadata(desc.eventType, pipelineMonitor.ID(), "unreliable", strconv.Itoa(i))
additionals = append(additionals, logshttp.NewDestination(endpoint, desc.contentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, destMeta, pkgconfigsetup.Datadog(), pipelineMonitor))
}
destinations := client.NewDestinations(reliable, additionals)
inputChan := make(chan *message.Message, endpoints.InputChanSize)
@@ -426,14 +431,15 @@ func newHTTPPassthroughPipeline(coreConfig model.Reader, eventPlatformReceiver e
endpoints.BatchMaxSize,
endpoints.BatchMaxContentSize,
desc.eventType,
encoder)
encoder,
pipelineMonitor)
}

a := auditor.NewNullAuditor()
log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d",
desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize)
return &passthroughPipeline{
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil),
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10, nil, nil, pipelineMonitor),
strategy: strategy,
in: inputChan,
auditor: a,
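For context on the telemetry-name change in this hunk: the inline `fmt.Sprintf("%s_%d_reliable_%d", ...)` names are replaced by `DestinationMetadata`, which (per `destination_metadata.go` below) renders the same shape of identifier. A minimal sketch of the equivalence, using made-up event type and index values:

```go
package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/logs/client"
)

func main() {
	// Old scheme (removed above): the telemetry name was formatted inline per endpoint.
	oldName := fmt.Sprintf("%s_%d_reliable_%d", "dbm-samples", 4, 0)

	// New scheme: the same identifier is derived from DestinationMetadata,
	// built from event type, pipeline ID, endpoint kind, and endpoint index.
	meta := client.NewDestinationMetadata("dbm-samples", "4", "reliable", "0")

	fmt.Println(oldName)              // dbm-samples_4_reliable_0
	fmt.Println(meta.TelemetryName()) // dbm-samples_4_reliable_0
}
```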
4 changes: 1 addition & 3 deletions comp/logs/agent/config/constants.go
@@ -7,9 +7,7 @@ package config

// Pipeline constraints
const (
ChanSize = 100
DestinationPayloadChanSize = 10
NumberOfPipelines = 4
NumberOfPipelines = 4
)

const (
3 changes: 3 additions & 0 deletions pkg/config/setup/config.go
@@ -1512,6 +1512,9 @@ func logsagent(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("logs_config.dev_mode_use_proto", true)
config.BindEnvAndSetDefault("logs_config.dd_url_443", "agent-443-intake.logs.datadoghq.com")
config.BindEnvAndSetDefault("logs_config.stop_grace_period", 30)
config.BindEnvAndSetDefault("logs_config.message_channel_size", 100)
config.BindEnvAndSetDefault("logs_config.payload_channel_size", 10)

// maximum time that the unix tailer will hold a log file open after it has been rotated
config.BindEnvAndSetDefault("logs_config.close_timeout", 60)
// maximum time that the windows tailer will hold a log file open, while waiting for
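These two defaults take over from the hard-coded `ChanSize = 100` and `DestinationPayloadChanSize = 10` constants deleted from `comp/logs/agent/config/constants.go` above, so the channel sizes become tunable at runtime. A minimal sketch of how a caller reads them (the auditor change below does exactly this for the message channel):

```go
package main

import (
	"fmt"

	pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
)

func main() {
	// Defaults registered above: 100 messages, 10 payloads.
	msgChanSize := pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size")
	payloadChanSize := pkgconfigsetup.Datadog().GetInt("logs_config.payload_channel_size")

	// 100 10, unless overridden in datadog.yaml or via the bound env vars.
	fmt.Println(msgChanSize, payloadChanSize)
}
```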
4 changes: 2 additions & 2 deletions pkg/logs/auditor/auditor.go
@@ -16,7 +16,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/log"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

@@ -104,7 +104,7 @@ func (a *RegistryAuditor) Stop() {
func (a *RegistryAuditor) createChannels() {
a.chansMutex.Lock()
defer a.chansMutex.Unlock()
a.inputChan = make(chan *message.Payload, config.ChanSize)
a.inputChan = make(chan *message.Payload, pkgconfigsetup.Datadog().GetInt("logs_config.message_channel_size"))
a.done = make(chan struct{})
}

2 changes: 1 addition & 1 deletion pkg/logs/auditor/go.mod
@@ -43,6 +43,7 @@ replace (

require (
github.com/DataDog/datadog-agent/comp/logs/agent/config v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0
github.com/DataDog/datadog-agent/pkg/logs/message v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/logs/sources v0.56.0-rc.3
github.com/DataDog/datadog-agent/pkg/status/health v0.56.0-rc.3
@@ -56,7 +57,6 @@ require (
github.com/DataDog/datadog-agent/pkg/config/env v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/model v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/nodetreemodel v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/setup v0.57.0 // indirect
github.com/DataDog/datadog-agent/pkg/config/structure v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/teeconfig v0.0.0-00010101000000-000000000000 // indirect
github.com/DataDog/datadog-agent/pkg/config/utils v0.56.0-rc.3 // indirect
7 changes: 6 additions & 1 deletion pkg/logs/client/destination.go
@@ -6,7 +6,9 @@
//nolint:revive // TODO(AML) Fix revive linter
package client

import "github.com/DataDog/datadog-agent/pkg/logs/message"
import (
"github.com/DataDog/datadog-agent/pkg/logs/message"
)

// Destination sends a payload to a specific endpoint over a given network protocol.
type Destination interface {
@@ -16,6 +18,9 @@ type Destination interface {
// Destination target (e.g. https://agent-intake.logs.datadoghq.com)
Target() string

// Metadata returns the metadata of the destination
Metadata() *DestinationMetadata

// Start starts the destination send loop. close the input to stop listening for payloads. stopChan is
// signaled when the destination has fully shutdown and all buffered payloads have been flushed. isRetrying is
// signaled when the retry state changes. isRetrying can be nil if you don't need to handle retries.
54 changes: 54 additions & 0 deletions pkg/logs/client/destination_metadata.go
@@ -0,0 +1,54 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//nolint:revive // TODO(AML) Fix revive linter
package client

import (
"fmt"
)

// DestinationMetadata contains metadata about a destination
type DestinationMetadata struct {
componentName string
instanceID string
kind string
endpointId string
ReportingEnabled bool
}

// NewDestinationMetadata returns a new DestinationMetadata
func NewDestinationMetadata(componentName, instanceID, kind, endpointId string) *DestinationMetadata {
return &DestinationMetadata{
componentName: componentName,
instanceID: instanceID,
kind: kind,
endpointId: endpointId,
ReportingEnabled: true,
}
}

// NewNoopDestinationMetadata returns a new DestinationMetadata with reporting disabled
func NewNoopDestinationMetadata() *DestinationMetadata {
return &DestinationMetadata{
ReportingEnabled: false,
}
}

// TelemetryName returns the telemetry name for the destination
func (d *DestinationMetadata) TelemetryName() string {
if !d.ReportingEnabled {
return ""
}
return fmt.Sprintf("%s_%s_%s_%s", d.componentName, d.instanceID, d.kind, d.endpointId)
}

// MonitorTag returns the monitor tag for the destination
func (d *DestinationMetadata) MonitorTag() string {
if !d.ReportingEnabled {
return ""
}
return fmt.Sprintf("destination_%s_%s", d.kind, d.endpointId)
}
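A short usage sketch of the new type (values are illustrative; the callers in this PR pass the event type, the pipeline monitor's ID, the endpoint kind, and the endpoint index):

```go
package main

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/logs/client"
)

func main() {
	meta := client.NewDestinationMetadata("logs", "1", "reliable", "0")
	fmt.Println(meta.TelemetryName()) // logs_1_reliable_0
	fmt.Println(meta.MonitorTag())    // destination_reliable_0

	// The no-op variant keeps reporting off and renders empty names,
	// which is what the connectivity check and the tests below rely on.
	noop := client.NewNoopDestinationMetadata()
	fmt.Println(noop.ReportingEnabled) // false
	fmt.Println(noop.TelemetryName())  // ""
}
```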
44 changes: 30 additions & 14 deletions pkg/logs/client/http/destination.go
@@ -81,8 +81,10 @@ type Destination struct {
lastRetryError error

// Telemetry
expVars *expvar.Map
telemetryName string
expVars *expvar.Map
destMeta *client.DestinationMetadata
pipelineMonitor metrics.PipelineMonitor
utilization metrics.UtilizationMonitor
}

// NewDestination returns a new Destination.
@@ -94,17 +96,19 @@ func NewDestination(endpoint config.Endpoint,
destinationsContext *client.DestinationsContext,
maxConcurrentBackgroundSends int,
shouldRetry bool,
telemetryName string,
cfg pkgconfigmodel.Reader) *Destination {
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
pipelineMonitor metrics.PipelineMonitor) *Destination {

return newDestination(endpoint,
contentType,
destinationsContext,
time.Second*10,
maxConcurrentBackgroundSends,
shouldRetry,
telemetryName,
cfg)
destMeta,
cfg,
pipelineMonitor)
}

func newDestination(endpoint config.Endpoint,
@@ -113,8 +117,9 @@ func newDestination(endpoint config.Endpoint,
timeout time.Duration,
maxConcurrentBackgroundSends int,
shouldRetry bool,
telemetryName string,
cfg pkgconfigmodel.Reader) *Destination {
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader,
pipelineMonitor metrics.PipelineMonitor) *Destination {

if maxConcurrentBackgroundSends <= 0 {
maxConcurrentBackgroundSends = 1
@@ -130,8 +135,9 @@ func newDestination(endpoint config.Endpoint,
expVars := &expvar.Map{}
expVars.AddFloat(expVarIdleMsMapKey, 0)
expVars.AddFloat(expVarInUseMsMapKey, 0)
if telemetryName != "" {
metrics.DestinationExpVars.Set(telemetryName, expVars)

if destMeta.ReportingEnabled {
metrics.DestinationExpVars.Set(destMeta.TelemetryName(), expVars)
}

return &Destination{
@@ -150,8 +156,10 @@ func newDestination(endpoint config.Endpoint,
retryLock: sync.Mutex{},
shouldRetry: shouldRetry,
expVars: expVars,
telemetryName: telemetryName,
destMeta: destMeta,
isMRF: endpoint.IsMRF,
pipelineMonitor: pipelineMonitor,
utilization: pipelineMonitor.MakeUtilizationMonitor(destMeta.MonitorTag()),
}
}

@@ -175,6 +183,11 @@ func (d *Destination) Target() string {
return d.url
}

// Metadata returns the metadata of the destination
func (d *Destination) Metadata() *client.DestinationMetadata {
return d.destMeta
}

// Start starts reading the input channel
func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
@@ -186,17 +199,19 @@ func (d *Destination) run(input chan *message.Payload, output chan *message.Payl
var startIdle = time.Now()

for p := range input {
d.utilization.Start()
idle := float64(time.Since(startIdle) / time.Millisecond)
d.expVars.AddFloat(expVarIdleMsMapKey, idle)
tlmIdle.Add(idle, d.telemetryName)
tlmIdle.Add(idle, d.destMeta.TelemetryName())
var startInUse = time.Now()

d.sendConcurrent(p, output, isRetrying)

inUse := float64(time.Since(startInUse) / time.Millisecond)
d.expVars.AddFloat(expVarInUseMsMapKey, inUse)
tlmInUse.Add(inUse, d.telemetryName)
tlmInUse.Add(inUse, d.destMeta.TelemetryName())
startIdle = time.Now()
d.utilization.Stop()
}
// Wait for any pending concurrent sends to finish or terminate
d.wg.Wait()
@@ -348,6 +363,7 @@ func (d *Destination) unconditionalSend(payload *message.Payload) (err error) {
// internal error. We should retry these requests.
return client.NewRetryableError(errServer)
} else {
d.pipelineMonitor.ReportComponentEgress(payload, d.destMeta.MonitorTag())
return nil
}
}
@@ -422,7 +438,7 @@ func getMessageTimestamp(messages []*message.Message) int64 {
func prepareCheckConnectivity(endpoint config.Endpoint, cfg pkgconfigmodel.Reader) (*client.DestinationsContext, *Destination) {
ctx := client.NewDestinationsContext()
// Lower the timeout to 5s because HTTP connectivity test is done synchronously during the agent bootstrap sequence
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, "", cfg)
destination := newDestination(endpoint, JSONContentType, ctx, time.Second*5, 0, false, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor(""))
return ctx, destination
}

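The instrumentation added to `run` and `unconditionalSend` leans on a small monitor surface from `pkg/logs/metrics`. The sketch below is inferred only from the calls visible in this diff (`MakeUtilizationMonitor`, `Start`/`Stop` bracketing each send, `ReportComponentEgress` on success, `NewNoopPipelineMonitor`); the real interfaces may use different parameter names and carry more methods:

```go
// Sketch of the monitor surface used by this file, inferred from the diff;
// not the authoritative definitions from pkg/logs/metrics.
package metricssketch

// UtilizationMonitor tracks how busy a component is: Start when a payload is
// picked up from the input channel, Stop once it has been handed off.
type UtilizationMonitor interface {
	Start()
	Stop()
}

// PipelineMonitor ties per-component monitors and egress reports to a single
// pipeline instance (its ID is also used when building DestinationMetadata).
type PipelineMonitor interface {
	ID() string
	MakeUtilizationMonitor(name string) UtilizationMonitor
	ReportComponentEgress(payload Measurable, name string)
}

// Measurable stands in for *message.Payload here; egress reporting only needs
// something whose size can be measured (assumed method, for illustration).
type Measurable interface {
	Size() int64
}
```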
3 changes: 2 additions & 1 deletion pkg/logs/client/http/destination_test.go
@@ -16,6 +16,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
@@ -360,7 +361,7 @@ func TestDestinationHA(t *testing.T) {
}
isEndpointMRF := endpoint.IsMRF

dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", configmock.New(t))
dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, client.NewNoopDestinationMetadata(), configmock.New(t), metrics.NewNoopPipelineMonitor(""))
isDestMRF := dest.IsMRF()

assert.Equal(t, isEndpointMRF, isDestMRF)
13 changes: 9 additions & 4 deletions pkg/logs/client/http/sync_destination.go
@@ -30,11 +30,11 @@ func NewSyncDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
senderDoneChan chan *sync.WaitGroup,
telemetryName string,
destMeta *client.DestinationMetadata,
cfg pkgconfigmodel.Reader) *SyncDestination {

return &SyncDestination{
destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, telemetryName, cfg),
destination: newDestination(endpoint, contentType, destinationsContext, time.Second*10, 1, false, destMeta, cfg, metrics.NewNoopPipelineMonitor("0")),
senderDoneChan: senderDoneChan,
}
}
@@ -49,6 +49,11 @@ func (d *SyncDestination) Target() string {
return d.destination.url
}

// Metadata returns the metadata of the destination
func (d *SyncDestination) Metadata() *client.DestinationMetadata {
return d.destination.destMeta
}

// Start starts reading the input channel
func (d *SyncDestination) Start(input chan *message.Payload, output chan *message.Payload, _ chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
@@ -62,7 +67,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message.
for p := range input {
idle := float64(time.Since(startIdle) / time.Millisecond)
d.destination.expVars.AddFloat(expVarIdleMsMapKey, idle)
tlmIdle.Add(idle, d.destination.telemetryName)
tlmIdle.Add(idle, d.destination.destMeta.TelemetryName())
var startInUse = time.Now()

err := d.destination.unconditionalSend(p)
Expand All @@ -84,7 +89,7 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message.

inUse := float64(time.Since(startInUse) / time.Millisecond)
d.destination.expVars.AddFloat(expVarInUseMsMapKey, inUse)
tlmInUse.Add(inUse, d.destination.telemetryName)
tlmInUse.Add(inUse, d.destination.destMeta.TelemetryName())
startIdle = time.Now()
}

3 changes: 2 additions & 1 deletion pkg/logs/client/http/test_utils.go
@@ -15,6 +15,7 @@ import (
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
)

// StatusCodeContainer is a lock around the status code to return
@@ -79,7 +80,7 @@ func NewTestServerWithOptions(statusCode int, senders int, retryDestination bool
endpoint.BackoffMax = 10
endpoint.RecoveryInterval = 1

dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, "test", cfg)
dest := NewDestination(endpoint, JSONContentType, destCtx, senders, retryDestination, client.NewNoopDestinationMetadata(), cfg, metrics.NewNoopPipelineMonitor(""))
return &TestServer{
httpServer: ts,
DestCtx: destCtx,