Skip to content

Commit

Permalink
[ha-agent] Run agent integration only on leader agent
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreYang committed Nov 26, 2024
1 parent f0b4956 commit 5f3f483
Show file tree
Hide file tree
Showing 15 changed files with 212 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/collector/externalhost"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
Expand All @@ -36,6 +37,7 @@ func TestExternalHostTags(t *testing.T) {
c := newCollector(fxutil.Test[dependencies](t,
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
16 changes: 10 additions & 6 deletions comp/collector/collector/collectorimpl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/status"
haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
metadata "github.com/DataDog/datadog-agent/comp/metadata/runner/runnerimpl"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
pkgCollector "github.com/DataDog/datadog-agent/pkg/collector"
Expand All @@ -48,17 +49,19 @@ const (
type dependencies struct {
fx.In

Lc fx.Lifecycle
Config config.Component
Log log.Component
Lc fx.Lifecycle
Config config.Component
Log log.Component
HaAgent haagent.Component

SenderManager sender.SenderManager
MetricSerializer optional.Option[serializer.MetricSerializer]
}

type collectorImpl struct {
log log.Component
config config.Component
log log.Component
config config.Component
haAgent haagent.Component

senderManager sender.SenderManager
metricSerializer optional.Option[serializer.MetricSerializer]
Expand Down Expand Up @@ -119,6 +122,7 @@ func newCollector(deps dependencies) *collectorImpl {
c := &collectorImpl{
log: deps.Log,
config: deps.Config,
haAgent: deps.HaAgent,
senderManager: deps.SenderManager,
metricSerializer: deps.MetricSerializer,
checks: make(map[checkid.ID]*middleware.CheckWrapper),
Expand Down Expand Up @@ -186,7 +190,7 @@ func (c *collectorImpl) start(_ context.Context) error {
c.m.Lock()
defer c.m.Unlock()

run := runner.NewRunner(c.senderManager)
run := runner.NewRunner(c.senderManager, c.haAgent)
sched := scheduler.NewScheduler(run.GetChan())

// let the runner some visibility into the scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
Expand Down Expand Up @@ -85,6 +86,7 @@ func (suite *CollectorDemuxTestSuite) SetupTest() {
suite.SenderManagerMock = NewSenderManagerMock(suite.demux)
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
haagentmock.Module(),
fx.Provide(func() sender.SenderManager {
return suite.SenderManagerMock
}),
Expand Down
2 changes: 2 additions & 0 deletions comp/collector/collector/collectorimpl/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl/internal/middleware"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand Down Expand Up @@ -97,6 +98,7 @@ func (suite *CollectorTestSuite) SetupTest() {
suite.c = newCollector(fxutil.Test[dependencies](suite.T(),
core.MockBundle(),
demultiplexerimpl.MockModule(),
haagentmock.Module(),
fx.Provide(func() optional.Option[serializer.MetricSerializer] {
return optional.NewNoneOption[serializer.MetricSerializer]()
}),
Expand Down
3 changes: 3 additions & 0 deletions comp/haagent/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ type Component interface {
// SetLeader takes the leader agent hostname as input, if it matches the current agent hostname,
// the isLeader state is set to true, otherwise false.
SetLeader(leaderAgentHostname string)

// IsHaIntegration returns true if the integration type is an HA Integration
IsHaIntegration(checkType string) bool
}
36 changes: 31 additions & 5 deletions comp/haagent/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,42 @@ package haagentimpl

import (
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
)

type integrationConfig struct {
Name string `mapstructure:"name"`
}

type haAgentConfigs struct {
enabled bool
group string
enabled bool
group string
isHaIntegrationMap map[string]bool
}

func newHaAgentConfigs(agentConfig config.Component) *haAgentConfigs {
func newHaAgentConfigs(agentConfig config.Component, logger log.Component) *haAgentConfigs {
var integrationConfigs []integrationConfig
// TODO: DO NOT EXPOSE integrations CONFIG TO USERS
// TODO: DO NOT EXPOSE integrations CONFIG TO USERS
// TODO: DO NOT EXPOSE integrations CONFIG TO USERS
// TODO: DO NOT EXPOSE integrations CONFIG TO USERS

err := agentConfig.UnmarshalKey("ha_agent.integrations", &integrationConfigs)
if err != nil {
logger.Errorf("Error while reading 'ha_agent.integrationConfigs' settings: %v", err)
}
logger.Debugf("integration configs: %v", integrationConfigs)
integrationMap := make(map[string]bool)
for _, intg := range integrationConfigs {
if intg.Name == "" {
logger.Warn("Setging 'ha_agent.integrationConfigs[].name' shouldn't be empty")
continue
}
integrationMap[intg.Name] = true
}
return &haAgentConfigs{
enabled: agentConfig.GetBool("ha_agent.enabled"),
group: agentConfig.GetString("ha_agent.group"),
enabled: agentConfig.GetBool("ha_agent.enabled"),
group: agentConfig.GetString("ha_agent.group"),
isHaIntegrationMap: integrationMap,
}
}
87 changes: 87 additions & 0 deletions comp/haagent/impl/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.

package haagentimpl

import (
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/stretchr/testify/assert"
"go.uber.org/fx"
)

func Test_newHaAgentConfigs(t *testing.T) {
tests := []struct {
name string
agentConfigs map[string]interface{}
expectedConfig *haAgentConfigs
}{
{
name: "ok",
agentConfigs: map[string]interface{}{
"ha_agent.enabled": true,
"ha_agent.group": "mygroup01",
"ha_agent.integrations": []integrationConfig{
{Name: "snmp"},
{Name: "mysql"},
},
},
expectedConfig: &haAgentConfigs{
enabled: true,
group: "mygroup01",
isHaIntegrationMap: map[string]bool{
"snmp": true,
"mysql": true,
},
},
},
{
name: "invalid integrations",
agentConfigs: map[string]interface{}{
"ha_agent.enabled": true,
"ha_agent.group": "mygroup01",
"ha_agent.integrations": "abc",
},
expectedConfig: &haAgentConfigs{
enabled: true,
group: "mygroup01",
isHaIntegrationMap: map[string]bool{},
},
},
{
name: "empty integrationConfig name is skipped",
agentConfigs: map[string]interface{}{
"ha_agent.enabled": true,
"ha_agent.group": "mygroup01",
"ha_agent.integrations": []integrationConfig{
{Name: "snmp"},
{Name: "mysql"},
{Name: ""},
},
},
expectedConfig: &haAgentConfigs{
enabled: true,
group: "mygroup01",
isHaIntegrationMap: map[string]bool{
"snmp": true,
"mysql": true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logComponent := logmock.New(t)
agentConfigComponent := fxutil.Test[config.Component](t, fx.Options(
config.MockModule(),
fx.Replace(config.MockParams{Overrides: tt.agentConfigs}),
))
assert.Equal(t, tt.expectedConfig, newHaAgentConfigs(agentConfigComponent, logComponent))
})
}
}
4 changes: 4 additions & 0 deletions comp/haagent/impl/haagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) {
h.isLeader.Store(agentHostname == leaderAgentHostname)
}

func (h *haAgentImpl) IsHaIntegration(integrationName string) bool {
return h.haAgentConfigs.isHaIntegrationMap[integrationName]
}

func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) {
h.log.Debugf("Updates received: count=%d", len(updates))

Expand Down
2 changes: 1 addition & 1 deletion comp/haagent/impl/haagent_comp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Provides struct {

// NewComponent creates a new haagent component
func NewComponent(reqs Requires) (Provides, error) {
haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig)
haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig, reqs.Logger)
haAgent := newHaAgentImpl(reqs.Logger, haAgentConfigs)
var rcListener rctypes.ListenerProvider
if haAgent.Enabled() {
Expand Down
14 changes: 14 additions & 0 deletions comp/haagent/impl/haagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ func Test_IsLeader_SetLeader(t *testing.T) {
assert.True(t, haAgent.IsLeader())
}

func Test_IsHaIntegration(t *testing.T) {
agentConfigs := map[string]interface{}{
"ha_agent.integrations": []integrationConfig{
{Name: "snmp"},
{Name: "mysql"},
},
}
haAgent := newTestHaAgentComponent(t, agentConfigs)

assert.True(t, haAgent.IsHaIntegration("snmp"))
assert.True(t, haAgent.IsHaIntegration("mysql"))
assert.False(t, haAgent.IsHaIntegration("cpu"))
}

func Test_RCListener(t *testing.T) {
tests := []struct {
name string
Expand Down
4 changes: 4 additions & 0 deletions comp/haagent/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (m *mockHaAgent) SetEnabled(enabled bool) {
m.enabled = enabled
}

func (m *mockHaAgent) IsHaIntegration(_ string) bool {
return false
}

// Component is the component type.
type Component interface {
haagent.Component
Expand Down
6 changes: 5 additions & 1 deletion pkg/collector/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.uber.org/atomic"

haagent "github.com/DataDog/datadog-agent/comp/haagent/def"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand All @@ -40,6 +41,7 @@ var (
// Runner is the object in charge of running all the checks
type Runner struct {
senderManager sender.SenderManager
haAgent haagent.Component
isRunning *atomic.Bool
id int // Globally unique identifier for the Runner
workers map[int]*worker.Worker // Workers currrently under this Runner's management
Expand All @@ -52,11 +54,12 @@ type Runner struct {
}

// NewRunner takes the number of desired goroutines processing incoming checks.
func NewRunner(senderManager sender.SenderManager) *Runner {
func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component) *Runner {
numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners")

r := &Runner{
senderManager: senderManager,
haAgent: haAgent,
id: int(runnerIDGenerator.Inc()),
isRunning: atomic.NewBool(true),
workers: make(map[int]*worker.Worker),
Expand Down Expand Up @@ -117,6 +120,7 @@ func (r *Runner) AddWorker() {
func (r *Runner) newWorker() (*worker.Worker, error) {
worker, err := worker.NewWorker(
r.senderManager,
r.haAgent,
r.id,
int(workerIDGenerator.Inc()),
r.pendingChecksChan,
Expand Down
Loading

0 comments on commit 5f3f483

Please sign in to comment.