diff --git a/comp/collector/collector/collectorimpl/agent_check_metadata_test.go b/comp/collector/collector/collectorimpl/agent_check_metadata_test.go index 114dd0b8f93df..f997bd0332518 100644 --- a/comp/collector/collector/collectorimpl/agent_check_metadata_test.go +++ b/comp/collector/collector/collectorimpl/agent_check_metadata_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/collector/externalhost" "github.com/DataDog/datadog-agent/pkg/serializer" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -36,6 +37,7 @@ func TestExternalHostTags(t *testing.T) { c := newCollector(fxutil.Test[dependencies](t, core.MockBundle(), demultiplexerimpl.MockModule(), + haagentmock.Module(), fx.Provide(func() optional.Option[serializer.MetricSerializer] { return optional.NewNoneOption[serializer.MetricSerializer]() }), diff --git a/comp/collector/collector/collectorimpl/collector.go b/comp/collector/collector/collectorimpl/collector.go index f90b0c61cec88..94b10a962d3ed 100644 --- a/comp/collector/collector/collectorimpl/collector.go +++ b/comp/collector/collector/collectorimpl/collector.go @@ -24,6 +24,7 @@ import ( flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types" log "github.com/DataDog/datadog-agent/comp/core/log/def" "github.com/DataDog/datadog-agent/comp/core/status" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" metadata "github.com/DataDog/datadog-agent/comp/metadata/runner/runnerimpl" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" pkgCollector "github.com/DataDog/datadog-agent/pkg/collector" @@ -48,17 +49,19 @@ const ( type dependencies struct { fx.In - Lc fx.Lifecycle - Config config.Component - Log log.Component + Lc fx.Lifecycle + Config config.Component + Log log.Component + HaAgent haagent.Component SenderManager sender.SenderManager MetricSerializer optional.Option[serializer.MetricSerializer] } type collectorImpl struct { - log log.Component - config config.Component + log log.Component + config config.Component + haAgent haagent.Component senderManager sender.SenderManager metricSerializer optional.Option[serializer.MetricSerializer] @@ -119,6 +122,7 @@ func newCollector(deps dependencies) *collectorImpl { c := &collectorImpl{ log: deps.Log, config: deps.Config, + haAgent: deps.HaAgent, senderManager: deps.SenderManager, metricSerializer: deps.MetricSerializer, checks: make(map[checkid.ID]*middleware.CheckWrapper), @@ -186,7 +190,7 @@ func (c *collectorImpl) start(_ context.Context) error { c.m.Lock() defer c.m.Unlock() - run := runner.NewRunner(c.senderManager) + run := runner.NewRunner(c.senderManager, c.haAgent) sched := scheduler.NewScheduler(run.GetChan()) // let the runner some visibility into the scheduler diff --git a/comp/collector/collector/collectorimpl/collector_demux_test.go b/comp/collector/collector/collectorimpl/collector_demux_test.go index e142221841f90..b68f7a91e546a 100644 --- a/comp/collector/collector/collectorimpl/collector_demux_test.go +++ b/comp/collector/collector/collectorimpl/collector_demux_test.go @@ -21,6 +21,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" compressionmock "github.com/DataDog/datadog-agent/comp/serializer/compression/fx-mock" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -84,6 +85,7 @@ func (suite *CollectorDemuxTestSuite) SetupTest() { suite.SenderManagerMock = NewSenderManagerMock(suite.demux) suite.c = newCollector(fxutil.Test[dependencies](suite.T(), core.MockBundle(), + haagentmock.Module(), fx.Provide(func() sender.SenderManager { return suite.SenderManagerMock }), diff --git a/comp/collector/collector/collectorimpl/collector_test.go b/comp/collector/collector/collectorimpl/collector_test.go index 1aea4301bc791..44ddc7e2f357c 100644 --- a/comp/collector/collector/collectorimpl/collector_test.go +++ b/comp/collector/collector/collectorimpl/collector_test.go @@ -22,6 +22,7 @@ import ( "github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl/internal/middleware" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -97,6 +98,7 @@ func (suite *CollectorTestSuite) SetupTest() { suite.c = newCollector(fxutil.Test[dependencies](suite.T(), core.MockBundle(), demultiplexerimpl.MockModule(), + haagentmock.Module(), fx.Provide(func() optional.Option[serializer.MetricSerializer] { return optional.NewNoneOption[serializer.MetricSerializer]() }), diff --git a/comp/haagent/def/component.go b/comp/haagent/def/component.go index 2472322d9a400..f1d3f53ce3fa5 100644 --- a/comp/haagent/def/component.go +++ b/comp/haagent/def/component.go @@ -22,4 +22,7 @@ type Component interface { // SetLeader takes the leader agent hostname as input, if it matches the current agent hostname, // the isLeader state is set to true, otherwise false. SetLeader(leaderAgentHostname string) + + // ShouldRunIntegration returns true if the integration should be run + ShouldRunIntegration(integrationName string) bool } diff --git a/comp/haagent/impl/config.go b/comp/haagent/impl/config.go index 2417106455a7d..4fea0af76d0fd 100644 --- a/comp/haagent/impl/config.go +++ b/comp/haagent/impl/config.go @@ -9,6 +9,15 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" ) +// validHaIntegrations represent the list of integrations that will be considered as +// an "HA Integration", meaning it will only run on the leader Agent. +// At the moment, the list of HA Integrations is hardcoded here, but we might provide +// more dynamic way to configure which integration should be considered HA Integration. +var validHaIntegrations = map[string]bool{ + "snmp": true, + "network_path": true, +} + type haAgentConfigs struct { enabled bool group string diff --git a/comp/haagent/impl/haagent.go b/comp/haagent/impl/haagent.go index 5328d039da2d3..050596256e16a 100644 --- a/comp/haagent/impl/haagent.go +++ b/comp/haagent/impl/haagent.go @@ -50,6 +50,15 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) { h.isLeader.Store(agentHostname == leaderAgentHostname) } +// ShouldRunIntegration return true if the agent integrations should to run. +// When ha-agent is disabled, the agent behave as standalone agent (non HA) and will always run all integrations. +func (h *haAgentImpl) ShouldRunIntegration(integrationName string) bool { + if h.Enabled() && validHaIntegrations[integrationName] { + return h.isLeader.Load() + } + return true +} + func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) { h.log.Debugf("Updates received: count=%d", len(updates)) diff --git a/comp/haagent/impl/haagent_comp.go b/comp/haagent/impl/haagent_comp.go index f922325a9a440..19697bba7d464 100644 --- a/comp/haagent/impl/haagent_comp.go +++ b/comp/haagent/impl/haagent_comp.go @@ -28,8 +28,8 @@ type Provides struct { // NewComponent creates a new haagent component func NewComponent(reqs Requires) (Provides, error) { - haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig) - haAgent := newHaAgentImpl(reqs.Logger, haAgentConfigs) + haAgentConf := newHaAgentConfigs(reqs.AgentConfig) + haAgent := newHaAgentImpl(reqs.Logger, haAgentConf) var rcListener rctypes.ListenerProvider if haAgent.Enabled() { reqs.Logger.Debug("Add onHaAgentUpdate RCListener") diff --git a/comp/haagent/impl/haagent_test.go b/comp/haagent/impl/haagent_test.go index b91e33f27c4b6..c427220404696 100644 --- a/comp/haagent/impl/haagent_test.go +++ b/comp/haagent/impl/haagent_test.go @@ -169,3 +169,74 @@ func Test_haAgentImpl_onHaAgentUpdate(t *testing.T) { }) } } + +func Test_haAgentImpl_ShouldRunIntegration(t *testing.T) { + testAgentHostname := "my-agent-hostname" + tests := []struct { + name string + leader string + agentConfigs map[string]interface{} + expectShouldRunIntegration map[string]bool + }{ + { + name: "ha agent enabled and agent is leader", + // should run HA-integrations + // should run "non HA integrations" + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": true, + "ha_agent.group": testGroup, + }, + leader: testAgentHostname, + expectShouldRunIntegration: map[string]bool{ + "snmp": true, + "network_path": true, + "unknown_integration": true, + "cpu": true, + }, + }, + { + name: "ha agent enabled and agent is not leader", + // should skip HA-integrations + // should run "non HA integrations" + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": true, + "ha_agent.group": testGroup, + }, + leader: "another-agent-is-leader", + expectShouldRunIntegration: map[string]bool{ + "snmp": false, + "network_path": false, + "unknown_integration": true, + "cpu": true, + }, + }, + { + name: "ha agent not enabled", + // should run all integrations + agentConfigs: map[string]interface{}{ + "hostname": testAgentHostname, + "ha_agent.enabled": false, + "ha_agent.group": testGroup, + }, + leader: testAgentHostname, + expectShouldRunIntegration: map[string]bool{ + "snmp": true, + "network_path": true, + "unknown_integration": true, + "cpu": true, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + haAgent := newTestHaAgentComponent(t, tt.agentConfigs) + haAgent.Comp.SetLeader(tt.leader) + + for integrationName, shouldRun := range tt.expectShouldRunIntegration { + assert.Equalf(t, shouldRun, haAgent.Comp.ShouldRunIntegration(integrationName), "fail for integration: "+integrationName) + } + }) + } +} diff --git a/comp/haagent/mock/mock.go b/comp/haagent/mock/mock.go index 52142737704c9..37c5cf4aa9916 100644 --- a/comp/haagent/mock/mock.go +++ b/comp/haagent/mock/mock.go @@ -44,6 +44,10 @@ func (m *mockHaAgent) SetEnabled(enabled bool) { m.enabled = enabled } +func (m *mockHaAgent) ShouldRunIntegration(_ string) bool { + return true +} + // Component is the component type. type Component interface { haagent.Component diff --git a/pkg/collector/runner/runner.go b/pkg/collector/runner/runner.go index e6ba495f02f2b..a388e73dea776 100644 --- a/pkg/collector/runner/runner.go +++ b/pkg/collector/runner/runner.go @@ -14,6 +14,7 @@ import ( "go.uber.org/atomic" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -40,6 +41,7 @@ var ( // Runner is the object in charge of running all the checks type Runner struct { senderManager sender.SenderManager + haAgent haagent.Component isRunning *atomic.Bool id int // Globally unique identifier for the Runner workers map[int]*worker.Worker // Workers currrently under this Runner's management @@ -52,11 +54,12 @@ type Runner struct { } // NewRunner takes the number of desired goroutines processing incoming checks. -func NewRunner(senderManager sender.SenderManager) *Runner { +func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component) *Runner { numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners") r := &Runner{ senderManager: senderManager, + haAgent: haAgent, id: int(runnerIDGenerator.Inc()), isRunning: atomic.NewBool(true), workers: make(map[int]*worker.Worker), @@ -117,6 +120,7 @@ func (r *Runner) AddWorker() { func (r *Runner) newWorker() (*worker.Worker, error) { worker, err := worker.NewWorker( r.senderManager, + r.haAgent, r.id, int(workerIDGenerator.Inc()), r.pendingChecksChan, diff --git a/pkg/collector/runner/runner_test.go b/pkg/collector/runner/runner_test.go index 6ae3cd335c81b..b6a02aab0739f 100644 --- a/pkg/collector/runner/runner_test.go +++ b/pkg/collector/runner/runner_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/collector/check/stub" @@ -152,7 +153,7 @@ func TestNewRunner(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "3") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -166,7 +167,7 @@ func TestRunnerAddWorker(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "1") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -181,7 +182,7 @@ func TestRunnerStaticUpdateNumWorkers(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "2") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer func() { r.Stop() @@ -212,7 +213,7 @@ func TestRunnerDynamicUpdateNumWorkers(t *testing.T) { assertAsyncWorkerCount(t, 0) min, max, expectedWorkers := testCase[0], testCase[1], testCase[2] - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) for checks := min; checks <= max; checks++ { @@ -234,7 +235,7 @@ func TestRunner(t *testing.T) { checks[idx] = newCheck(t, fmt.Sprintf("mycheck_%d:123", idx), false, nil) } - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -262,7 +263,7 @@ func TestRunnerStop(t *testing.T) { checks[idx].RunLock.Lock() } - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -320,7 +321,7 @@ func TestRunnerStopWithStuckCheck(t *testing.T) { blockedCheck.RunLock.Lock() blockedCheck.StopLock.Lock() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -369,7 +370,7 @@ func TestRunnerStopCheck(t *testing.T) { blockedCheck.RunLock.Lock() blockedCheck.StopLock.Lock() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer func() { r.Stop() @@ -413,7 +414,7 @@ func TestRunnerScheduler(t *testing.T) { sched1 := newScheduler() sched2 := newScheduler() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -433,7 +434,7 @@ func TestRunnerShouldAddCheckStats(t *testing.T) { testCheck := newCheck(t, "test", false, nil) sched := newScheduler() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() diff --git a/pkg/collector/worker/worker.go b/pkg/collector/worker/worker.go index 00a0b40668135..144ecf58240f2 100644 --- a/pkg/collector/worker/worker.go +++ b/pkg/collector/worker/worker.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -53,11 +54,13 @@ type Worker struct { runnerID int shouldAddCheckStatsFunc func(id checkid.ID) bool utilizationTickInterval time.Duration + haAgent haagent.Component } // NewWorker returns an instance of a `Worker` after parameter sanity checks are passed func NewWorker( senderManager sender.SenderManager, + haAgent haagent.Component, runnerID int, ID int, pendingChecksChan chan check.Check, @@ -84,6 +87,7 @@ func NewWorker( checksTracker, shouldAddCheckStatsFunc, senderManager.GetDefaultSender, + haAgent, pollingInterval, ) } @@ -98,6 +102,7 @@ func newWorkerWithOptions( checksTracker *tracker.RunningChecksTracker, shouldAddCheckStatsFunc func(id checkid.ID) bool, getDefaultSenderFunc func() (sender.Sender, error), + haAgent haagent.Component, utilizationTickInterval time.Duration, ) (*Worker, error) { @@ -115,6 +120,7 @@ func newWorkerWithOptions( runnerID: runnerID, shouldAddCheckStatsFunc: shouldAddCheckStatsFunc, getDefaultSenderFunc: getDefaultSenderFunc, + haAgent: haAgent, utilizationTickInterval: utilizationTickInterval, }, nil } @@ -135,6 +141,11 @@ func (w *Worker) Run() { checkLogger := CheckLogger{Check: check} longRunning := check.Interval() == 0 + if !w.haAgent.ShouldRunIntegration(check.String()) { + checkLogger.Debug("Check is an HA integration and current agent is not leader, skipping execution...") + continue + } + // Add check to tracker if it's not already running if !w.checksTracker.AddCheck(check) { checkLogger.Debug("Check is already running, skipping execution...") diff --git a/pkg/collector/worker/worker_test.go b/pkg/collector/worker/worker_test.go index 129ed30a499e8..3bbd9e659e224 100644 --- a/pkg/collector/worker/worker_test.go +++ b/pkg/collector/worker/worker_test.go @@ -15,7 +15,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/fx" + "github.com/DataDog/datadog-agent/comp/core/config" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + haagentimpl "github.com/DataDog/datadog-agent/comp/haagent/impl" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -26,6 +31,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/collector/runner/tracker" pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" "github.com/DataDog/datadog-agent/pkg/metrics/servicecheck" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" ) type testCheck struct { @@ -122,16 +128,16 @@ func TestWorkerInit(t *testing.T) { mockShouldAddStatsFunc := func(checkid.ID) bool { return true } senderManager := aggregator.NewNoOpSenderManager() - _, err := NewWorker(senderManager, 1, 2, nil, checksTracker, mockShouldAddStatsFunc) + _, err := NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, nil, checksTracker, mockShouldAddStatsFunc) require.NotNil(t, err) - _, err = NewWorker(senderManager, 1, 2, pendingChecksChan, nil, mockShouldAddStatsFunc) + _, err = NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, nil, mockShouldAddStatsFunc) require.NotNil(t, err) - _, err = NewWorker(senderManager, 1, 2, pendingChecksChan, checksTracker, nil) + _, err = NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, checksTracker, nil) require.NotNil(t, err) - worker, err := NewWorker(senderManager, 1, 2, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) assert.NotNil(t, worker) } @@ -150,7 +156,7 @@ func TestWorkerInitExpvarStats(t *testing.T) { go func(idx int) { defer wg.Done() - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) worker.Run() @@ -172,7 +178,7 @@ func TestWorkerName(t *testing.T) { for _, id := range []int{1, 100, 500} { expectedName := fmt.Sprintf("worker_%d", id) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 1, id, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 1, id, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) assert.NotNil(t, worker) @@ -224,7 +230,7 @@ func TestWorker(t *testing.T) { pendingChecksChan <- testCheck1 close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) wg.Add(1) @@ -284,6 +290,7 @@ func TestWorkerUtilizationExpvars(t *testing.T) { checksTracker, mockShouldAddStatsFunc, func() (sender.Sender, error) { return nil, nil }, + haagentmock.NewMockHaAgent(), 100*time.Millisecond, ) require.Nil(t, err) @@ -354,7 +361,7 @@ func TestWorkerErrorAndWarningHandling(t *testing.T) { } close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) AssertAsyncWorkerCount(t, 0) @@ -399,7 +406,7 @@ func TestWorkerConcurrentCheckScheduling(t *testing.T) { pendingChecksChan <- testCheck close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) worker.Run() @@ -453,7 +460,7 @@ func TestWorkerStatsAddition(t *testing.T) { pendingChecksChan <- squelchedStatsCheck close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc) require.Nil(t, err) worker.Run() @@ -505,6 +512,7 @@ func TestWorkerServiceCheckSending(t *testing.T) { func() (sender.Sender, error) { return mockSender, nil }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err) @@ -575,6 +583,7 @@ func TestWorkerSenderNil(t *testing.T) { func() (sender.Sender, error) { return nil, fmt.Errorf("testerr") }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err) @@ -615,6 +624,7 @@ func TestWorkerServiceCheckSendingLongRunningTasks(t *testing.T) { func() (sender.Sender, error) { return mockSender, nil }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err) @@ -628,6 +638,98 @@ func TestWorkerServiceCheckSendingLongRunningTasks(t *testing.T) { mockSender.AssertNumberOfCalls(t, "ServiceCheck", 0) } +func TestWorker_HaIntegration(t *testing.T) { + testHostname := "myhost" + + tests := []struct { + name string + haAgentEnabled bool + setLeaderValue string + expectedSnmpCheckRunCount int + expectedUnknownCheckRunCount int + }{ + { + name: "ha-agent enabled and is leader", + // should run HA-integrations + // should run "non HA integrations" + haAgentEnabled: true, + setLeaderValue: testHostname, + expectedSnmpCheckRunCount: 1, + expectedUnknownCheckRunCount: 1, + }, + { + name: "ha-agent enabled and not leader", + // should skip HA-integrations + // should run "non HA integrations" + haAgentEnabled: true, + setLeaderValue: "leader-is-another-agent", + expectedSnmpCheckRunCount: 0, + expectedUnknownCheckRunCount: 1, + }, + { + name: "ha-agent disabled", + // When ha-agent is disabled, the agent behave as standalone agent (non HA) and will always run all integrations. + // should run all integrations + haAgentEnabled: false, + setLeaderValue: "", + expectedSnmpCheckRunCount: 1, + expectedUnknownCheckRunCount: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + expvars.Reset() + + var wg sync.WaitGroup + + checksTracker := tracker.NewRunningChecksTracker() + pendingChecksChan := make(chan check.Check, 10) + mockShouldAddStatsFunc := func(checkid.ID) bool { return true } + + snmpCheck := newCheck(t, "snmp:123", false, nil) + unknownCheck := newCheck(t, "unknown-check:123", false, nil) + + pendingChecksChan <- snmpCheck + pendingChecksChan <- unknownCheck + close(pendingChecksChan) + + agentConfigs := map[string]interface{}{ + "hostname": testHostname, + "ha_agent.enabled": tt.haAgentEnabled, + "ha_agent.group": "my-group-01", + } + logComponent := logmock.New(t) + agentConfigComponent := fxutil.Test[config.Component](t, fx.Options( + config.MockModule(), + fx.Replace(config.MockParams{Overrides: agentConfigs}), + )) + requires := haagentimpl.Requires{ + Logger: logComponent, + AgentConfig: agentConfigComponent, + } + haagentcomp, _ := haagentimpl.NewComponent(requires) + haagentcomp.Comp.SetLeader(tt.setLeaderValue) + + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentcomp.Comp, 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + require.Nil(t, err) + + wg.Add(1) + go func() { + defer wg.Done() + worker.Run() + }() + + wg.Wait() + + assert.Equal(t, tt.expectedSnmpCheckRunCount, snmpCheck.RunCount()) + assert.Equal(t, tt.expectedUnknownCheckRunCount, unknownCheck.RunCount()) + + // make sure the check is deleted from checksTracker + assert.Equal(t, 0, len(checksTracker.RunningChecks())) + }) + } +} + // getWorkerUtilizationExpvar returns the utilization as presented by expvars // for a named worker. func getWorkerUtilizationExpvar(t *testing.T, name string) float64 { diff --git a/releasenotes/notes/NDMII-3154-ha-agent-collector-worker-22f3972469c669c3.yaml b/releasenotes/notes/NDMII-3154-ha-agent-collector-worker-22f3972469c669c3.yaml new file mode 100644 index 0000000000000..fa04e9cd53682 --- /dev/null +++ b/releasenotes/notes/NDMII-3154-ha-agent-collector-worker-22f3972469c669c3.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + [ha-agent] Run HA enabled integrations only on leader Agent