Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-55] Add support for failing over logs in High Availability mode. #23502

Merged
merged 10 commits into from
Mar 13, 2024
14 changes: 8 additions & 6 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.uber.org/fx"

configcomp "github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver"
Expand Down Expand Up @@ -344,7 +345,7 @@ type passthroughPipelineDesc struct {

// newHTTPPassthroughPipeline creates a new HTTP-only event platform pipeline that sends messages directly to intake
// without any of the processing that exists in regular logs pipelines.
func newHTTPPassthroughPipeline(eventPlatformReceiver eventplatformreceiver.Component, desc passthroughPipelineDesc, destinationsContext *client.DestinationsContext, pipelineID int) (p *passthroughPipeline, err error) {
func newHTTPPassthroughPipeline(coreConfig pkgconfig.Reader, eventPlatformReceiver eventplatformreceiver.Component, desc passthroughPipelineDesc, destinationsContext *client.DestinationsContext, pipelineID int) (p *passthroughPipeline, err error) {
configKeys := config.NewLogsConfigKeys(desc.endpointsConfigPrefix, pkgconfig.Datadog)
endpoints, err := config.BuildHTTPEndpointsWithConfig(pkgconfig.Datadog, configKeys, desc.hostnameEndpointPrefix, desc.intakeTrackType, config.DefaultIntakeProtocol, config.DefaultIntakeOrigin)
if err != nil {
Expand Down Expand Up @@ -404,7 +405,7 @@ func newHTTPPassthroughPipeline(eventPlatformReceiver eventplatformreceiver.Comp
log.Debugf("Initialized event platform forwarder pipeline. eventType=%s mainHosts=%s additionalHosts=%s batch_max_concurrent_send=%d batch_max_content_size=%d batch_max_size=%d, input_chan_size=%d",
desc.eventType, joinHosts(endpoints.GetReliableEndpoints()), joinHosts(endpoints.GetUnReliableEndpoints()), endpoints.BatchMaxConcurrentSend, endpoints.BatchMaxContentSize, endpoints.BatchMaxSize, endpoints.InputChanSize)
return &passthroughPipeline{
sender: sender.NewSender(senderInput, a.Channel(), destinations, 10),
sender: sender.NewSender(coreConfig, senderInput, a.Channel(), destinations, 10),
strategy: strategy,
in: inputChan,
auditor: a,
Expand Down Expand Up @@ -436,12 +437,12 @@ func joinHosts(endpoints []config.Endpoint) string {
return strings.Join(additionalHosts, ",")
}

func newDefaultEventPlatformForwarder(eventPlatformReceiver eventplatformreceiver.Component) *defaultEventPlatformForwarder {
func newDefaultEventPlatformForwarder(config pkgconfig.Reader, eventPlatformReceiver eventplatformreceiver.Component) *defaultEventPlatformForwarder {
destinationsCtx := client.NewDestinationsContext()
destinationsCtx.Start()
pipelines := make(map[string]*passthroughPipeline)
for i, desc := range passthroughPipelineDescs {
p, err := newHTTPPassthroughPipeline(eventPlatformReceiver, desc, destinationsCtx, i)
p, err := newHTTPPassthroughPipeline(config, eventPlatformReceiver, desc, destinationsCtx, i)
if err != nil {
log.Errorf("Failed to initialize event platform forwarder pipeline. eventType=%s, error=%s", desc.eventType, err.Error())
continue
Expand All @@ -457,6 +458,7 @@ func newDefaultEventPlatformForwarder(eventPlatformReceiver eventplatformreceive
// dependencies lists the components newEventPlatformForwarder receives through fx injection.
type dependencies struct {
// fx.In marks this struct as an fx parameter object, so each field below is supplied by the fx container.
fx.In
// Params carries the UseNoopEventPlatformForwarder / UseEventPlatformForwarder switches that select the forwarder.
Params Params
// Config is the core agent configuration; it is handed to the default forwarder when that one is selected.
Config configcomp.Component
// EventPlatformReceiver is passed to the default forwarder when that one is selected.
EventPlatformReceiver eventplatformreceiver.Component
// Hostname is only used to build the no-op forwarder.
Hostname hostnameinterface.Component
}
Expand All @@ -468,7 +470,7 @@ func newEventPlatformForwarder(deps dependencies) eventplatform.Component {
if deps.Params.UseNoopEventPlatformForwarder {
forwarder = NewNoopEventPlatformForwarder(deps.Hostname)
} else if deps.Params.UseEventPlatformForwarder {
forwarder = newDefaultEventPlatformForwarder(deps.EventPlatformReceiver)
forwarder = newDefaultEventPlatformForwarder(deps.Config, deps.EventPlatformReceiver)
}
if forwarder == nil {
return optional.NewNoneOptionPtr[eventplatform.Forwarder]()
Expand All @@ -479,7 +481,7 @@ func newEventPlatformForwarder(deps dependencies) eventplatform.Component {
// NewNoopEventPlatformForwarder returns the standard event platform forwarder with sending disabled, meaning events
// will build up in each pipeline channel without being forwarded to the intake
func NewNoopEventPlatformForwarder(hostname hostnameinterface.Component) eventplatform.Forwarder {
f := newDefaultEventPlatformForwarder(eventplatformreceiverimpl.NewReceiver(hostname))
f := newDefaultEventPlatformForwarder(pkgconfig.Datadog, eventplatformreceiverimpl.NewReceiver(hostname))
// remove the senders
for _, p := range f.pipelines {
p.strategy = nil
Expand Down
38 changes: 37 additions & 1 deletion comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func BuildEndpointsWithConfig(coreConfig pkgconfigmodel.Reader, logsConfig *Logs
log.Warnf("Use of illegal configuration parameter, if you need to send your logs to a proxy, "+
"please use '%s' and '%s' instead", logsConfig.getConfigKey("logs_dd_url"), logsConfig.getConfigKey("logs_no_ssl"))
}
if logsConfig.isForceHTTPUse() || logsConfig.obsPipelineWorkerEnabled() || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {

haEnabled := coreConfig.GetBool("ha.enabled")
if logsConfig.isForceHTTPUse() || logsConfig.obsPipelineWorkerEnabled() || haEnabled || (bool(httpConnectivity) && !(logsConfig.isForceTCPUse() || logsConfig.isSocks5ProxySet() || logsConfig.hasAdditionalEndpoints())) {
return BuildHTTPEndpointsWithConfig(coreConfig, logsConfig, endpointPrefix, intakeTrackType, intakeProtocol, intakeOrigin)
}
log.Warnf("You are currently sending Logs to Datadog through TCP (either because %s or %s is set or the HTTP connectivity test has failed) "+
Expand Down Expand Up @@ -275,6 +277,40 @@ func BuildHTTPEndpointsWithConfig(coreConfig pkgconfigmodel.Reader, logsConfig *
}
}

// Add in the HAMR endpoint if HA is enabled.
if coreConfig.GetBool("ha.enabled") {
haURL, err := pkgconfigutils.GetHAEndpoint(coreConfig, endpointPrefix, "ha.dd_url")
if err != nil {
return nil, fmt.Errorf("cannot construct HA endpoint: %s", err)
}

haHost, haPort, haUseSSL, err := parseAddressWithScheme(haURL, defaultNoSSL, parseAddressAsHost)
if err != nil {
return nil, fmt.Errorf("could not parse %s: %v", haURL, err)
}

// HA endpoint is always reliable
additionals = append(additionals, Endpoint{
IsHA: true,
IsReliable: pointer.Ptr(true),
APIKey: coreConfig.GetString("ha.api_key"),
Host: haHost,
Port: haPort,
UseSSL: pointer.Ptr(haUseSSL),
UseCompression: main.UseCompression,
CompressionLevel: main.CompressionLevel,
BackoffBase: main.BackoffBase,
BackoffMax: main.BackoffMax,
BackoffFactor: main.BackoffFactor,
RecoveryInterval: main.RecoveryInterval,
RecoveryReset: main.RecoveryReset,
Version: main.Version,
TrackType: intakeTrackType,
Protocol: intakeProtocol,
Origin: intakeOrigin,
tobz marked this conversation as resolved.
Show resolved Hide resolved
})
}

batchWait := logsConfig.batchWait()
batchMaxConcurrentSend := logsConfig.batchMaxConcurrentSend()
batchMaxSize := logsConfig.batchMaxSize()
Expand Down
1 change: 1 addition & 0 deletions comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Endpoint struct {
CompressionLevel int `mapstructure:"compression_level" json:"compression_level"`
ProxyAddress string
IsReliable *bool `mapstructure:"is_reliable" json:"is_reliable"`
IsHA bool `mapstructure:"-" json:"-"`
ConnectionResetInterval time.Duration

BackoffFactor float64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package rcservicehaimpl
import (
"context"
"fmt"

"github.com/DataDog/datadog-agent/comp/core/log"

cfgcomp "github.com/DataDog/datadog-agent/comp/core/config"
Expand Down Expand Up @@ -62,7 +63,10 @@ func newHaRemoteConfigServiceOptional(deps dependencies) optional.Option[rcservi
// newHaRemoteConfigService creates and configures a new service that receives remote config updates from the configured DD failover DC
func newHaRemoteConfigService(deps dependencies) (rcserviceha.Component, error) {
apiKey := configUtils.SanitizeAPIKey(deps.Cfg.GetString("ha.api_key"))
baseRawURL := configUtils.GetHAEndpoint(deps.Cfg, "https://config.", "ha.rc_dd_url")
baseRawURL, err := configUtils.GetHAEndpoint(deps.Cfg, "https://config.", "ha.rc_dd_url")
if err != nil {
return nil, fmt.Errorf("unable to get HA remote config endpoint: %s", err)
}
traceAgentEnv := configUtils.GetTraceAgentDefaultEnv(deps.Cfg)
configuredTags := configUtils.GetConfiguredTags(deps.Cfg, false)
options := []remoteconfig.Option{
Expand Down
21 changes: 13 additions & 8 deletions pkg/config/utils/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func GetMultipleEndpoints(c pkgconfigmodel.Reader) (map[string][]string, error)

// populate with HA endpoints too
if c.GetBool("ha.enabled") {
site := c.GetString("ha.site")
if site != "" {
url := BuildURLWithPrefix(InfraURLPrefix, site)
additionalEndpoints[url] = []string{c.GetString("ha.api_key")}
haURL, err := GetHAInfraEndpoint(c)
if err != nil {
return nil, fmt.Errorf("could not parse HA endpoint: %s", err)
}
additionalEndpoints[haURL] = []string{c.GetString("ha.api_key")}
}
return mergeAdditionalEndpoints(keysPerDomain, additionalEndpoints)
}
Expand All @@ -127,14 +127,14 @@ func GetMainEndpoint(c pkgconfigmodel.Reader, prefix string, ddURLKey string) st
}

// GetHAEndpoint returns the HA DD URL defined in the config, based on `ha.site` and the prefix, or ddHaURLKey
func GetHAEndpoint(c pkgconfigmodel.Reader, prefix, ddHaURLKey string) string {
func GetHAEndpoint(c pkgconfigmodel.Reader, prefix, ddHaURLKey string) (string, error) {
// value under ddHaURLKey takes precedence over 'ha.site'
if c.IsSet(ddHaURLKey) && c.GetString(ddHaURLKey) != "" {
return getResolvedHaDdURL(c, ddHaURLKey)
return getResolvedHaDdURL(c, ddHaURLKey), nil
} else if c.GetString("ha.site") != "" {
return prefix + strings.TrimSpace(c.GetString("ha.site"))
return BuildURLWithPrefix(prefix, c.GetString("ha.site")), nil
}
return prefix + pkgconfigsetup.DefaultSite
return "", fmt.Errorf("`ha.site` or `ha.dd_url` must be set when High Availability is enabled")
}

func getResolvedHaDdURL(c pkgconfigmodel.Reader, haURLKey string) string {
Expand All @@ -150,6 +150,11 @@ func GetInfraEndpoint(c pkgconfigmodel.Reader) string {
return GetMainEndpoint(c, InfraURLPrefix, "dd_url")
}

// GetHAInfraEndpoint returns the HA DD Infra URL defined in config, based on the value of `ha.site` and `ha.dd_url`.
//
// It is a thin wrapper around GetHAEndpoint called with InfraURLPrefix and the `ha.dd_url` key, so the result and the
// error are exactly what GetHAEndpoint produces for those arguments.
func GetHAInfraEndpoint(c pkgconfigmodel.Reader) (string, error) {
return GetHAEndpoint(c, InfraURLPrefix, "ha.dd_url")
}

// ddURLRegexp determines if a URL belongs to Datadog or not. If the URL belongs to Datadog it's prefixed with the Agent
// version (see AddAgentVersionToDomain).
//
// The pattern is anchored at both ends. It accepts "app", an optional label of two lowercase letters followed by one
// digit (e.g. ".us3"), and then one of datadoghq.com, datadoghq.eu, datad0g.com, datad0g.eu or ddog-gov.com.
var ddURLRegexp = regexp.MustCompile(`^app(\.[a-z]{2}\d)?\.(datad(oghq|0g)\.(com|eu)|ddog-gov\.com)$`)
Expand Down
5 changes: 5 additions & 0 deletions pkg/logs/client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import "github.com/DataDog/datadog-agent/pkg/logs/message"

// Destination sends a payload to a specific endpoint over a given network protocol.
type Destination interface {
// IsHA reports whether the destination is used for High Availability mode.
IsHA() bool

// Target returns the destination target (e.g. https://agent-intake.logs.datadoghq.com).
Target() string

// Start starts the destination send loop. Close the input to stop listening for payloads. stopChan is
// signaled when the destination has fully shutdown and all buffered payloads have been flushed. isRetrying is
Expand Down
12 changes: 12 additions & 0 deletions pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type Destination struct {
destinationsContext *client.DestinationsContext
protocol config.IntakeProtocol
origin config.IntakeOrigin
isHA bool

// Concurrency
climit chan struct{} // semaphore for limiting concurrent background sends
Expand Down Expand Up @@ -150,6 +151,7 @@ func newDestination(endpoint config.Endpoint,
shouldRetry: shouldRetry,
expVars: expVars,
telemetryName: telemetryName,
isHA: endpoint.IsHA,
}
}

Expand All @@ -163,6 +165,16 @@ func errorToTag(err error) string {
}
}

// IsHA reports whether this destination is a High Availability destination.
// It returns the isHA field stored on the destination.
func (d *Destination) IsHA() bool {
return d.isHA
}

// Target returns the address of the destination, which for the HTTP destination is its url field.
func (d *Destination) Target() string {
return d.url
}

// Start starts reading the input channel
func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
Expand Down
17 changes: 16 additions & 1 deletion pkg/logs/client/http/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/pointer"
)

func getNewConfig() pkgconfigmodel.Reader {
func getNewConfig() pkgconfigmodel.ReaderWriter {
return pkgconfigmodel.NewConfig("test", "DD", strings.NewReplacer(".", "_"))
}

Expand Down Expand Up @@ -362,3 +362,18 @@ func TestBackoffDelayDisabled(t *testing.T) {
assert.Equal(t, 0, server.Destination.nbErrors)
server.Stop()
}

// TestDestinationHA checks that an HTTP destination reports the same High Availability flag as the endpoint it was
// built from, for both possible values of the flag.
func TestDestinationHA(t *testing.T) {
	for _, isHA := range []bool{true, false} {
		endpoint := config.Endpoint{IsHA: isHA}

		dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", getNewConfig())

		assert.Equal(t, endpoint.IsHA, dest.IsHA())
	}
}
12 changes: 12 additions & 0 deletions pkg/logs/client/tcp/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Destination struct {
shouldRetry bool
retryLock sync.Mutex
lastRetryError error
isHA bool
}

// NewDestination returns a new destination.
Expand All @@ -44,9 +45,20 @@ func NewDestination(endpoint config.Endpoint, useProto bool, destinationsContext
retryLock: sync.Mutex{},
shouldRetry: shouldRetry,
lastRetryError: nil,
isHA: endpoint.IsHA,
}
}

// IsHA reports whether this destination is a High Availability destination.
// It returns the isHA field stored on the destination.
func (d *Destination) IsHA() bool {
return d.isHA
}

// Target returns the address of the destination, as reported by the destination's connection manager.
func (d *Destination) Target() string {
return d.connManager.address()
}

// Start reads from the input, transforms a message into a frame and sends it to a remote server.
func (d *Destination) Start(input chan *message.Payload, output chan *message.Payload, isRetrying chan bool) (stopChan <-chan struct{}) {
stop := make(chan struct{})
Expand Down
32 changes: 32 additions & 0 deletions pkg/logs/client/tcp/destination_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package tcp

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
)

// TestDestinationHA checks that a TCP destination reports the same High Availability flag as the endpoint it was
// built from, for both possible values of the flag.
func TestDestinationHA(t *testing.T) {
	for _, isHA := range []bool{true, false} {
		endpoint := config.Endpoint{IsHA: isHA}

		dest := NewDestination(endpoint, false, client.NewDestinationsContext(), false, statusinterface.NewStatusProviderMock())

		assert.Equal(t, endpoint.IsHA, dest.IsHA())
	}
}
2 changes: 1 addition & 1 deletion pkg/logs/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewPipeline(outputChan chan *message.Payload,
}

strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, pipelineID)
logsSender = sender.NewSender(senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize)

inputChan := make(chan *message.Message, config.ChanSize)
processor := processor.New(inputChan, strategyInput, processingRules, encoder, diagnosticMessageReceiver, hostname)
Expand Down
Loading
Loading