Skip to content

Commit

Permalink
fix a boatload of PR comments and lints
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Mar 11, 2024
1 parent 3da7625 commit af641f1
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (sender *diagnoseSenderManager) LazyGetSenderManager() (sender.SenderManage
config := sender.deps.Config
forwarder := defaultforwarder.NewDefaultForwarder(config, log, defaultforwarder.NewOptions(config, log, nil))
orchestratorForwarder := optional.NewOptionPtr[defaultforwarder.Forwarder](defaultforwarder.NoopForwarder{})
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(config, sender.deps.Hostname))
eventPlatformForwarder := optional.NewOptionPtr[eventplatform.Forwarder](eventplatformimpl.NewNoopEventPlatformForwarder(sender.deps.Hostname))
senderManager = aggregator.InitAndStartAgentDemultiplexer(
log,
forwarder,
Expand Down
12 changes: 6 additions & 6 deletions comp/forwarder/eventplatform/eventplatformimpl/epforwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ type passthroughPipelineDesc struct {

// newHTTPPassthroughPipeline creates a new HTTP-only event platform pipeline that sends messages directly to intake
// without any of the processing that exists in regular logs pipelines.
func newHTTPPassthroughPipeline(coreConfig configcomp.Component, eventPlatformReceiver eventplatformreceiver.Component, desc passthroughPipelineDesc, destinationsContext *client.DestinationsContext, pipelineID int) (p *passthroughPipeline, err error) {
func newHTTPPassthroughPipeline(coreConfig pkgconfig.Reader, eventPlatformReceiver eventplatformreceiver.Component, desc passthroughPipelineDesc, destinationsContext *client.DestinationsContext, pipelineID int) (p *passthroughPipeline, err error) {
configKeys := config.NewLogsConfigKeys(desc.endpointsConfigPrefix, pkgconfig.Datadog)
endpoints, err := config.BuildHTTPEndpointsWithConfig(pkgconfig.Datadog, configKeys, desc.hostnameEndpointPrefix, desc.intakeTrackType, config.DefaultIntakeProtocol, config.DefaultIntakeOrigin)
if err != nil {
Expand Down Expand Up @@ -437,12 +437,12 @@ func joinHosts(endpoints []config.Endpoint) string {
return strings.Join(additionalHosts, ",")
}

func newDefaultEventPlatformForwarder(coreConfig configcomp.Component, eventPlatformReceiver eventplatformreceiver.Component) *defaultEventPlatformForwarder {
func newDefaultEventPlatformForwarder(config pkgconfig.Reader, eventPlatformReceiver eventplatformreceiver.Component) *defaultEventPlatformForwarder {
destinationsCtx := client.NewDestinationsContext()
destinationsCtx.Start()
pipelines := make(map[string]*passthroughPipeline)
for i, desc := range passthroughPipelineDescs {
p, err := newHTTPPassthroughPipeline(coreConfig, eventPlatformReceiver, desc, destinationsCtx, i)
p, err := newHTTPPassthroughPipeline(config, eventPlatformReceiver, desc, destinationsCtx, i)
if err != nil {
log.Errorf("Failed to initialize event platform forwarder pipeline. eventType=%s, error=%s", desc.eventType, err.Error())
continue
Expand All @@ -468,7 +468,7 @@ func newEventPlatformForwarder(deps dependencies) eventplatform.Component {
var forwarder eventplatform.Forwarder

if deps.Params.UseNoopEventPlatformForwarder {
forwarder = NewNoopEventPlatformForwarder(deps.Config, deps.Hostname)
forwarder = NewNoopEventPlatformForwarder(deps.Hostname)
} else if deps.Params.UseEventPlatformForwarder {
forwarder = newDefaultEventPlatformForwarder(deps.Config, deps.EventPlatformReceiver)
}
Expand All @@ -480,8 +480,8 @@ func newEventPlatformForwarder(deps dependencies) eventplatform.Component {

// NewNoopEventPlatformForwarder returns the standard event platform forwarder with sending disabled, meaning events
// will build up in each pipeline channel without being forwarded to the intake
func NewNoopEventPlatformForwarder(coreConfig configcomp.Component, hostname hostnameinterface.Component) eventplatform.Forwarder {
f := newDefaultEventPlatformForwarder(coreConfig, eventplatformreceiverimpl.NewReceiver(hostname))
func NewNoopEventPlatformForwarder(hostname hostnameinterface.Component) eventplatform.Forwarder {
f := newDefaultEventPlatformForwarder(pkgconfig.Datadog, eventplatformreceiverimpl.NewReceiver(hostname))
// remove the senders
for _, p := range f.pipelines {
p.strategy = nil
Expand Down
11 changes: 8 additions & 3 deletions comp/logs/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,21 @@ func BuildHTTPEndpointsWithConfig(coreConfig pkgconfigmodel.Reader, logsConfig *
}

// Add in the HAMR endpoint if HA is enabled.
if coreConfig.GetBool("ha.enabled") && (coreConfig.IsSet("ha.site") || coreConfig.IsSet("ha.dd_url")) {
haURL := pkgconfigutils.GetHAEndpoint(coreConfig, endpointPrefix, "ha.dd_url")
if coreConfig.GetBool("ha.enabled") {
haURL, err := pkgconfigutils.GetHAEndpoint(coreConfig, endpointPrefix, "ha.dd_url")
if err != nil {
return nil, fmt.Errorf("cannot construct HA endpoint: %s", err)
}

haHost, haPort, haUseSSL, err := parseAddressWithScheme(haURL, defaultNoSSL, parseAddressAsHost)
if err != nil {
return nil, fmt.Errorf("could not parse %s: %v", haURL, err)
}

// HA endpoint is always reliable
additionals = append(additionals, Endpoint{
IsHA: pointer.Ptr(true),
IsHA: true,
IsReliable: pointer.Ptr(true),
APIKey: coreConfig.GetString("ha.api_key"),
Host: haHost,
Port: haPort,
Expand Down
7 changes: 1 addition & 6 deletions comp/logs/agent/config/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Endpoint struct {
CompressionLevel int `mapstructure:"compression_level" json:"compression_level"`
ProxyAddress string
IsReliable *bool `mapstructure:"is_reliable" json:"is_reliable"`
IsHA *bool
IsHA bool `mapstructure:"-" json:"-"`
ConnectionResetInterval time.Duration

BackoffFactor float64
Expand Down Expand Up @@ -104,11 +104,6 @@ func (e *Endpoint) GetIsReliable() bool {
return e.IsReliable == nil || *e.IsReliable
}

// GetIsReliable returns true if the endpoint is an HA endpoint. Endpoints are not HA by default.
func (e *Endpoint) GetIsHA() bool {
return e.IsHA != nil && *e.IsHA
}

// Endpoints holds the main endpoint and additional ones to dualship logs.
type Endpoints struct {
Main Endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package rcservicehaimpl
import (
"context"
"fmt"

"github.com/DataDog/datadog-agent/comp/core/log"

cfgcomp "github.com/DataDog/datadog-agent/comp/core/config"
Expand Down Expand Up @@ -62,7 +63,10 @@ func newHaRemoteConfigServiceOptional(deps dependencies) optional.Option[rcservi
// newHaRemoteConfigServiceOptional creates and configures a new service that receives remote config updates from the configured DD failover DC
func newHaRemoteConfigService(deps dependencies) (rcserviceha.Component, error) {
apiKey := configUtils.SanitizeAPIKey(deps.Cfg.GetString("ha.api_key"))
baseRawURL := configUtils.GetHAEndpoint(deps.Cfg, "https://config.", "ha.rc_dd_url")
baseRawURL, err := configUtils.GetHAEndpoint(deps.Cfg, "https://config.", "ha.rc_dd_url")
if err != nil {
return nil, fmt.Errorf("unable to get HA remote config endpoint: %s", err)
}
traceAgentEnv := configUtils.GetTraceAgentDefaultEnv(deps.Cfg)
configuredTags := configUtils.GetConfiguredTags(deps.Cfg, false)
options := []remoteconfig.Option{
Expand Down
21 changes: 13 additions & 8 deletions pkg/config/utils/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ func GetMultipleEndpoints(c pkgconfigmodel.Reader) (map[string][]string, error)

// populate with HA endpoints too
if c.GetBool("ha.enabled") {
site := c.GetString("ha.site")
if site != "" {
url := BuildURLWithPrefix(InfraURLPrefix, site)
additionalEndpoints[url] = []string{c.GetString("ha.api_key")}
haURL, err := GetHAInfraEndpoint(c)
if err != nil {
return nil, fmt.Errorf("could not parse HA endpoint: %s", err)
}
additionalEndpoints[haURL] = []string{c.GetString("ha.api_key")}
}
return mergeAdditionalEndpoints(keysPerDomain, additionalEndpoints)
}
Expand All @@ -127,14 +127,14 @@ func GetMainEndpoint(c pkgconfigmodel.Reader, prefix string, ddURLKey string) st
}

// GetHAEndpoint returns the HA DD URL defined in the config, based on `ha.site` and the prefix, or ddHaURLKey
func GetHAEndpoint(c pkgconfigmodel.Reader, prefix, ddHaURLKey string) string {
func GetHAEndpoint(c pkgconfigmodel.Reader, prefix, ddHaURLKey string) (string, error) {
// value under ddURLKey takes precedence over 'ha.site'
if c.IsSet(ddHaURLKey) && c.GetString(ddHaURLKey) != "" {
return getResolvedHaDdURL(c, ddHaURLKey)
return getResolvedHaDdURL(c, ddHaURLKey), nil
} else if c.GetString("ha.site") != "" {
return prefix + strings.TrimSpace(c.GetString("ha.site"))
return BuildURLWithPrefix(prefix, c.GetString("ha.site")), nil
}
return prefix + pkgconfigsetup.DefaultSite
return "", fmt.Errorf("`ha.site` or `ha.dd_url` must be set when High Availability is enabled")
}

func getResolvedHaDdURL(c pkgconfigmodel.Reader, haURLKey string) string {
Expand All @@ -150,6 +150,11 @@ func GetInfraEndpoint(c pkgconfigmodel.Reader) string {
return GetMainEndpoint(c, InfraURLPrefix, "dd_url")
}

// GetHAInfraEndpoint returns the HA DD Infra URL defined in config, based on the value of `ha.site` and `ha.dd_url`
func GetHAInfraEndpoint(c pkgconfigmodel.Reader) (string, error) {
return GetHAEndpoint(c, InfraURLPrefix, "ha.dd_url")
}

// ddURLRegexp determines if an URL belongs to Datadog or not. If the URL belongs to Datadog it's prefixed with the Agent
// version (see AddAgentVersionToDomain).
var ddURLRegexp = regexp.MustCompile(`^app(\.[a-z]{2}\d)?\.(datad(oghq|0g)\.(com|eu)|ddog-gov\.com)$`)
Expand Down
4 changes: 3 additions & 1 deletion pkg/logs/client/http/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func newDestination(endpoint config.Endpoint,
shouldRetry: shouldRetry,
expVars: expVars,
telemetryName: telemetryName,
isHA: endpoint.GetIsHA(),
isHA: endpoint.IsHA,
}
}

Expand All @@ -165,10 +165,12 @@ func errorToTag(err error) string {
}
}

// IsHA indicates that this destination is a High Availability destination.
func (d *Destination) IsHA() bool {
return d.isHA
}

// Target is the address of the destination.
func (d *Destination) Target() string {
return d.url
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/logs/client/http/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,12 @@ func TestBackoffDelayDisabled(t *testing.T) {
}

func TestDestinationHA(t *testing.T) {
variants := []*bool{nil, pointer.Ptr(true), pointer.Ptr(false)}
variants := []bool{true, false}
for _, variant := range variants {
endpoint := config.Endpoint{
IsHA: variant,
}
isEndpointHA := endpoint.GetIsHA()
isEndpointHA := endpoint.IsHA

dest := NewDestination(endpoint, JSONContentType, client.NewDestinationsContext(), 1, false, "test", getNewConfig())
isDestHA := dest.IsHA()
Expand Down
4 changes: 3 additions & 1 deletion pkg/logs/client/tcp/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ func NewDestination(endpoint config.Endpoint, useProto bool, destinationsContext
retryLock: sync.Mutex{},
shouldRetry: shouldRetry,
lastRetryError: nil,
isHA: endpoint.GetIsHA(),
isHA: endpoint.IsHA,
}
}

// IsHA indicates that this destination is a High Availability destination.
func (d *Destination) IsHA() bool {
return d.isHA
}

// Target is the address of the destination.
func (d *Destination) Target() string {
return d.connManager.address()
}
Expand Down
11 changes: 2 additions & 9 deletions pkg/logs/client/tcp/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package tcp

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -15,21 +14,15 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/util/pointer"
)

func getNewConfig() pkgconfigmodel.ReaderWriter {
return pkgconfigmodel.NewConfig("test", "DD", strings.NewReplacer(".", "_"))
}

func TestDestinationHA(t *testing.T) {
variants := []*bool{nil, pointer.Ptr(true), pointer.Ptr(false)}
variants := []bool{true, false}
for _, variant := range variants {
endpoint := config.Endpoint{
IsHA: variant,
}
isEndpointHA := endpoint.GetIsHA()
isEndpointHA := endpoint.IsHA

dest := NewDestination(endpoint, false, client.NewDestinationsContext(), false, statusinterface.NewStatusProviderMock())
isDestHA := dest.IsHA()
Expand Down
11 changes: 4 additions & 7 deletions pkg/logs/sender/destination_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ func newDestinationSenderWithConfigAndBufferSize(cfg pkgconfigmodel.Reader, buff
return dest, d
}

//nolint:revive // TODO(AML) Fix revive linter
func TestDestinationSender(t *testing.T) {
func TestDestinationSender(_ *testing.T) {
dest, destSender := newDestinationSenderWithBufferSize(1)

destSender.Send(&message.Payload{})
Expand Down Expand Up @@ -108,19 +107,17 @@ func TestDestinationSenderStopsRetrying(t *testing.T) {
}()

// retry the send until it succeeds
//nolint:revive // TODO(AML) Fix revive linter
for !destSender.Send(&message.Payload{}) {
for !destSender.Send(&message.Payload{}) { //revive:disable-line:empty-block
}

<-gotPayload
}

//nolint:revive // TODO(AML) Fix revive linter
func TestDestinationSenderDeadlock(t *testing.T) {
func TestDestinationSenderDeadlock(_ *testing.T) {
dest, destSender := newDestinationSenderWithBufferSize(100)

go func() {
for range dest.input {
for range dest.input { //revive:disable-line:empty-block
}
}()

Expand Down

0 comments on commit af641f1

Please sign in to comment.