diff --git a/comp/collector/collector/collectorimpl/agent_check_metadata_test.go b/comp/collector/collector/collectorimpl/agent_check_metadata_test.go index 114dd0b8f93dfa..f997bd03325183 100644 --- a/comp/collector/collector/collectorimpl/agent_check_metadata_test.go +++ b/comp/collector/collector/collectorimpl/agent_check_metadata_test.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/collector/externalhost" "github.com/DataDog/datadog-agent/pkg/serializer" "github.com/DataDog/datadog-agent/pkg/util/fxutil" @@ -36,6 +37,7 @@ func TestExternalHostTags(t *testing.T) { c := newCollector(fxutil.Test[dependencies](t, core.MockBundle(), demultiplexerimpl.MockModule(), + haagentmock.Module(), fx.Provide(func() optional.Option[serializer.MetricSerializer] { return optional.NewNoneOption[serializer.MetricSerializer]() }), diff --git a/comp/collector/collector/collectorimpl/collector.go b/comp/collector/collector/collectorimpl/collector.go index f90b0c61cec885..94b10a962d3ede 100644 --- a/comp/collector/collector/collectorimpl/collector.go +++ b/comp/collector/collector/collectorimpl/collector.go @@ -24,6 +24,7 @@ import ( flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types" log "github.com/DataDog/datadog-agent/comp/core/log/def" "github.com/DataDog/datadog-agent/comp/core/status" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" metadata "github.com/DataDog/datadog-agent/comp/metadata/runner/runnerimpl" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" pkgCollector "github.com/DataDog/datadog-agent/pkg/collector" @@ -48,17 +49,19 @@ const ( type dependencies struct { fx.In - Lc fx.Lifecycle - Config config.Component - Log log.Component + Lc fx.Lifecycle + Config config.Component + Log log.Component + HaAgent haagent.Component SenderManager sender.SenderManager MetricSerializer optional.Option[serializer.MetricSerializer] } type collectorImpl struct { - log log.Component - config config.Component + log log.Component + config config.Component + haAgent haagent.Component senderManager sender.SenderManager metricSerializer optional.Option[serializer.MetricSerializer] @@ -119,6 +122,7 @@ func newCollector(deps dependencies) *collectorImpl { c := &collectorImpl{ log: deps.Log, config: deps.Config, + haAgent: deps.HaAgent, senderManager: deps.SenderManager, metricSerializer: deps.MetricSerializer, checks: make(map[checkid.ID]*middleware.CheckWrapper), @@ -186,7 +190,7 @@ func (c *collectorImpl) start(_ context.Context) error { c.m.Lock() defer c.m.Unlock() - run := runner.NewRunner(c.senderManager) + run := runner.NewRunner(c.senderManager, c.haAgent) sched := scheduler.NewScheduler(run.GetChan()) // let the runner some visibility into the scheduler diff --git a/comp/collector/collector/collectorimpl/collector_demux_test.go b/comp/collector/collector/collectorimpl/collector_demux_test.go index d7e3037cb1c755..469bede72bd109 100644 --- a/comp/collector/collector/collectorimpl/collector_demux_test.go +++ b/comp/collector/collector/collectorimpl/collector_demux_test.go @@ -21,6 +21,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer" @@ -85,6 +86,7 @@ func (suite *CollectorDemuxTestSuite) SetupTest() { suite.SenderManagerMock = NewSenderManagerMock(suite.demux) suite.c = newCollector(fxutil.Test[dependencies](suite.T(), core.MockBundle(), + haagentmock.Module(), fx.Provide(func() sender.SenderManager { return suite.SenderManagerMock }), diff --git a/comp/collector/collector/collectorimpl/collector_test.go b/comp/collector/collector/collectorimpl/collector_test.go index 1aea4301bc7912..44ddc7e2f357c5 100644 --- a/comp/collector/collector/collectorimpl/collector_test.go +++ b/comp/collector/collector/collectorimpl/collector_test.go @@ -22,6 +22,7 @@ import ( "github.com/DataDog/datadog-agent/comp/collector/collector/collectorimpl/internal/middleware" "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/comp/core/config" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -97,6 +98,7 @@ func (suite *CollectorTestSuite) SetupTest() { suite.c = newCollector(fxutil.Test[dependencies](suite.T(), core.MockBundle(), demultiplexerimpl.MockModule(), + haagentmock.Module(), fx.Provide(func() optional.Option[serializer.MetricSerializer] { return optional.NewNoneOption[serializer.MetricSerializer]() }), diff --git a/comp/haagent/def/component.go b/comp/haagent/def/component.go index 2472322d9a4005..4d10fb4e160588 100644 --- a/comp/haagent/def/component.go +++ b/comp/haagent/def/component.go @@ -22,4 +22,7 @@ type Component interface { // SetLeader takes the leader agent hostname as input, if it matches the current agent hostname, // the isLeader state is set to true, otherwise false. SetLeader(leaderAgentHostname string) + + // IsHaIntegration returns true if the integration type is an HA Integration + IsHaIntegration(checkType string) bool } diff --git a/comp/haagent/impl/config.go b/comp/haagent/impl/config.go index 2417106455a7da..0420093c5f588d 100644 --- a/comp/haagent/impl/config.go +++ b/comp/haagent/impl/config.go @@ -7,16 +7,42 @@ package haagentimpl import ( "github.com/DataDog/datadog-agent/comp/core/config" + log "github.com/DataDog/datadog-agent/comp/core/log/def" ) +type integrationConfig struct { + Name string `mapstructure:"name"` +} + type haAgentConfigs struct { - enabled bool - group string + enabled bool + group string + isHaIntegrationMap map[string]bool } -func newHaAgentConfigs(agentConfig config.Component) *haAgentConfigs { +func newHaAgentConfigs(agentConfig config.Component, logger log.Component) *haAgentConfigs { + var integrationConfigs []integrationConfig + // TODO: DO NOT EXPOSE integrations CONFIG TO USERS + // TODO: DO NOT EXPOSE integrations CONFIG TO USERS + // TODO: DO NOT EXPOSE integrations CONFIG TO USERS + // TODO: DO NOT EXPOSE integrations CONFIG TO USERS + + err := agentConfig.UnmarshalKey("ha_agent.integrations", &integrationConfigs) + if err != nil { + logger.Errorf("Error while reading 'ha_agent.integrationConfigs' settings: %v", err) + } + logger.Debugf("integration configs: %v", integrationConfigs) + integrationMap := make(map[string]bool) + for _, intg := range integrationConfigs { + if intg.Name == "" { + logger.Warn("Setging 'ha_agent.integrationConfigs[].name' shouldn't be empty") + continue + } + integrationMap[intg.Name] = true + } return &haAgentConfigs{ - enabled: agentConfig.GetBool("ha_agent.enabled"), - group: agentConfig.GetString("ha_agent.group"), + enabled: agentConfig.GetBool("ha_agent.enabled"), + group: agentConfig.GetString("ha_agent.group"), + isHaIntegrationMap: integrationMap, } } diff --git a/comp/haagent/impl/config_test.go b/comp/haagent/impl/config_test.go new file mode 100644 index 00000000000000..0896fc4c6bdc6f --- /dev/null +++ b/comp/haagent/impl/config_test.go @@ -0,0 +1,87 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package haagentimpl + +import ( + "testing" + + "github.com/DataDog/datadog-agent/comp/core/config" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "github.com/stretchr/testify/assert" + "go.uber.org/fx" +) + +func Test_newHaAgentConfigs(t *testing.T) { + tests := []struct { + name string + agentConfigs map[string]interface{} + expectedConfig *haAgentConfigs + }{ + { + name: "ok", + agentConfigs: map[string]interface{}{ + "ha_agent.enabled": true, + "ha_agent.group": "mygroup01", + "ha_agent.integrations": []integrationConfig{ + {Name: "snmp"}, + {Name: "mysql"}, + }, + }, + expectedConfig: &haAgentConfigs{ + enabled: true, + group: "mygroup01", + isHaIntegrationMap: map[string]bool{ + "snmp": true, + "mysql": true, + }, + }, + }, + { + name: "invalid integrations", + agentConfigs: map[string]interface{}{ + "ha_agent.enabled": true, + "ha_agent.group": "mygroup01", + "ha_agent.integrations": "abc", + }, + expectedConfig: &haAgentConfigs{ + enabled: true, + group: "mygroup01", + isHaIntegrationMap: map[string]bool{}, + }, + }, + { + name: "empty integrationConfig name is skipped", + agentConfigs: map[string]interface{}{ + "ha_agent.enabled": true, + "ha_agent.group": "mygroup01", + "ha_agent.integrations": []integrationConfig{ + {Name: "snmp"}, + {Name: "mysql"}, + {Name: ""}, + }, + }, + expectedConfig: &haAgentConfigs{ + enabled: true, + group: "mygroup01", + isHaIntegrationMap: map[string]bool{ + "snmp": true, + "mysql": true, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logComponent := logmock.New(t) + agentConfigComponent := fxutil.Test[config.Component](t, fx.Options( + config.MockModule(), + fx.Replace(config.MockParams{Overrides: tt.agentConfigs}), + )) + assert.Equal(t, tt.expectedConfig, newHaAgentConfigs(agentConfigComponent, logComponent)) + }) + } +} diff --git a/comp/haagent/impl/haagent.go b/comp/haagent/impl/haagent.go index c5f8611502a2a4..02f5519085bf19 100644 --- a/comp/haagent/impl/haagent.go +++ b/comp/haagent/impl/haagent.go @@ -50,6 +50,10 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) { h.isLeader.Store(agentHostname == leaderAgentHostname) } +func (h *haAgentImpl) IsHaIntegration(integrationName string) bool { + return h.haAgentConfigs.isHaIntegrationMap[integrationName] +} + func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) { h.log.Debugf("Updates received: count=%d", len(updates)) diff --git a/comp/haagent/impl/haagent_comp.go b/comp/haagent/impl/haagent_comp.go index f922325a9a440e..6cbd86f4cb5f6d 100644 --- a/comp/haagent/impl/haagent_comp.go +++ b/comp/haagent/impl/haagent_comp.go @@ -28,7 +28,7 @@ type Provides struct { // NewComponent creates a new haagent component func NewComponent(reqs Requires) (Provides, error) { - haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig) + haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig, reqs.Logger) haAgent := newHaAgentImpl(reqs.Logger, haAgentConfigs) var rcListener rctypes.ListenerProvider if haAgent.Enabled() { diff --git a/comp/haagent/impl/haagent_test.go b/comp/haagent/impl/haagent_test.go index b0147e2d5ad2bd..5a70d67c9e200a 100644 --- a/comp/haagent/impl/haagent_test.go +++ b/comp/haagent/impl/haagent_test.go @@ -68,6 +68,20 @@ func Test_IsLeader_SetLeader(t *testing.T) { assert.True(t, haAgent.IsLeader()) } +func Test_IsHaIntegration(t *testing.T) { + agentConfigs := map[string]interface{}{ + "ha_agent.integrations": []integrationConfig{ + {Name: "snmp"}, + {Name: "mysql"}, + }, + } + haAgent := newTestHaAgentComponent(t, agentConfigs) + + assert.True(t, haAgent.IsHaIntegration("snmp")) + assert.True(t, haAgent.IsHaIntegration("mysql")) + assert.False(t, haAgent.IsHaIntegration("cpu")) +} + func Test_RCListener(t *testing.T) { tests := []struct { name string diff --git a/comp/haagent/mock/mock.go b/comp/haagent/mock/mock.go index 52142737704c91..28b3b95f684961 100644 --- a/comp/haagent/mock/mock.go +++ b/comp/haagent/mock/mock.go @@ -44,6 +44,10 @@ func (m *mockHaAgent) SetEnabled(enabled bool) { m.enabled = enabled } +func (m *mockHaAgent) IsHaIntegration(_ string) bool { + return false +} + // Component is the component type. type Component interface { haagent.Component diff --git a/pkg/collector/runner/runner.go b/pkg/collector/runner/runner.go index e6ba495f02f2b5..a388e73dea776d 100644 --- a/pkg/collector/runner/runner.go +++ b/pkg/collector/runner/runner.go @@ -14,6 +14,7 @@ import ( "go.uber.org/atomic" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -40,6 +41,7 @@ var ( // Runner is the object in charge of running all the checks type Runner struct { senderManager sender.SenderManager + haAgent haagent.Component isRunning *atomic.Bool id int // Globally unique identifier for the Runner workers map[int]*worker.Worker // Workers currrently under this Runner's management @@ -52,11 +54,12 @@ type Runner struct { } // NewRunner takes the number of desired goroutines processing incoming checks. -func NewRunner(senderManager sender.SenderManager) *Runner { +func NewRunner(senderManager sender.SenderManager, haAgent haagent.Component) *Runner { numWorkers := pkgconfigsetup.Datadog().GetInt("check_runners") r := &Runner{ senderManager: senderManager, + haAgent: haAgent, id: int(runnerIDGenerator.Inc()), isRunning: atomic.NewBool(true), workers: make(map[int]*worker.Worker), @@ -117,6 +120,7 @@ func (r *Runner) AddWorker() { func (r *Runner) newWorker() (*worker.Worker, error) { worker, err := worker.NewWorker( r.senderManager, + r.haAgent, r.id, int(workerIDGenerator.Inc()), r.pendingChecksChan, diff --git a/pkg/collector/runner/runner_test.go b/pkg/collector/runner/runner_test.go index 6ae3cd335c81b7..b6a02aab0739fa 100644 --- a/pkg/collector/runner/runner_test.go +++ b/pkg/collector/runner/runner_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" "github.com/DataDog/datadog-agent/pkg/collector/check/stub" @@ -152,7 +153,7 @@ func TestNewRunner(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "3") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -166,7 +167,7 @@ func TestRunnerAddWorker(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "1") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -181,7 +182,7 @@ func TestRunnerStaticUpdateNumWorkers(t *testing.T) { testSetUp(t) pkgconfigsetup.Datadog().SetWithoutSource("check_runners", "2") - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer func() { r.Stop() @@ -212,7 +213,7 @@ func TestRunnerDynamicUpdateNumWorkers(t *testing.T) { assertAsyncWorkerCount(t, 0) min, max, expectedWorkers := testCase[0], testCase[1], testCase[2] - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) for checks := min; checks <= max; checks++ { @@ -234,7 +235,7 @@ func TestRunner(t *testing.T) { checks[idx] = newCheck(t, fmt.Sprintf("mycheck_%d:123", idx), false, nil) } - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -262,7 +263,7 @@ func TestRunnerStop(t *testing.T) { checks[idx].RunLock.Lock() } - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -320,7 +321,7 @@ func TestRunnerStopWithStuckCheck(t *testing.T) { blockedCheck.RunLock.Lock() blockedCheck.StopLock.Lock() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -369,7 +370,7 @@ func TestRunnerStopCheck(t *testing.T) { blockedCheck.RunLock.Lock() blockedCheck.StopLock.Lock() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer func() { r.Stop() @@ -413,7 +414,7 @@ func TestRunnerScheduler(t *testing.T) { sched1 := newScheduler() sched2 := newScheduler() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() @@ -433,7 +434,7 @@ func TestRunnerShouldAddCheckStats(t *testing.T) { testCheck := newCheck(t, "test", false, nil) sched := newScheduler() - r := NewRunner(aggregator.NewNoOpSenderManager()) + r := NewRunner(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent()) require.NotNil(t, r) defer r.Stop() diff --git a/pkg/collector/worker/worker.go b/pkg/collector/worker/worker.go index 00a0b406681355..876eab974cabcf 100644 --- a/pkg/collector/worker/worker.go +++ b/pkg/collector/worker/worker.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" "github.com/DataDog/datadog-agent/pkg/collector/check" checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" @@ -53,11 +54,13 @@ type Worker struct { runnerID int shouldAddCheckStatsFunc func(id checkid.ID) bool utilizationTickInterval time.Duration + haAgent haagent.Component } // NewWorker returns an instance of a `Worker` after parameter sanity checks are passed func NewWorker( senderManager sender.SenderManager, + haAgent haagent.Component, runnerID int, ID int, pendingChecksChan chan check.Check, @@ -84,6 +87,7 @@ func NewWorker( checksTracker, shouldAddCheckStatsFunc, senderManager.GetDefaultSender, + haAgent, pollingInterval, ) } @@ -98,6 +102,7 @@ func newWorkerWithOptions( checksTracker *tracker.RunningChecksTracker, shouldAddCheckStatsFunc func(id checkid.ID) bool, getDefaultSenderFunc func() (sender.Sender, error), + haAgent haagent.Component, utilizationTickInterval time.Duration, ) (*Worker, error) { @@ -115,6 +120,7 @@ func newWorkerWithOptions( runnerID: runnerID, shouldAddCheckStatsFunc: shouldAddCheckStatsFunc, getDefaultSenderFunc: getDefaultSenderFunc, + haAgent: haAgent, utilizationTickInterval: utilizationTickInterval, }, nil } @@ -141,6 +147,14 @@ func (w *Worker) Run() { continue } + if !w.shouldRunIntegrationInstance(check) { + // TODO: TEST ME + checkLogger.Debug("HA Integration skipped") + // Remove the check from the running list + w.checksTracker.DeleteCheck(check.ID()) + continue + } + checkStartTime := time.Now() checkLogger.CheckStarted() @@ -212,6 +226,13 @@ func (w *Worker) Run() { log.Debugf("Runner %d, worker %d: Finished processing checks.", w.runnerID, w.ID) } +func (w *Worker) shouldRunIntegrationInstance(check check.Check) bool { + if w.haAgent.Enabled() && w.haAgent.IsHaIntegration(check.String()) { + return w.haAgent.IsLeader() + } + return true +} + func startUtilizationUpdater(name string, ut *utilizationtracker.UtilizationTracker) { expvars.SetWorkerStats(name, &expvars.WorkerStats{ Utilization: 0.0, diff --git a/pkg/collector/worker/worker_test.go b/pkg/collector/worker/worker_test.go index 129ed30a499e8b..89d69fbddb5a15 100644 --- a/pkg/collector/worker/worker_test.go +++ b/pkg/collector/worker/worker_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + haagentmock "github.com/DataDog/datadog-agent/comp/haagent/mock" "github.com/DataDog/datadog-agent/pkg/aggregator" "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" "github.com/DataDog/datadog-agent/pkg/aggregator/sender" @@ -122,16 +123,16 @@ func TestWorkerInit(t *testing.T) { mockShouldAddStatsFunc := func(checkid.ID) bool { return true } senderManager := aggregator.NewNoOpSenderManager() - _, err := NewWorker(senderManager, 1, 2, nil, checksTracker, mockShouldAddStatsFunc) + _, err := NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, nil, checksTracker, mockShouldAddStatsFunc) require.NotNil(t, err) - _, err = NewWorker(senderManager, 1, 2, pendingChecksChan, nil, mockShouldAddStatsFunc) + _, err = NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, nil, mockShouldAddStatsFunc) require.NotNil(t, err) - _, err = NewWorker(senderManager, 1, 2, pendingChecksChan, checksTracker, nil) + _, err = NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, checksTracker, nil) require.NotNil(t, err) - worker, err := NewWorker(senderManager, 1, 2, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(senderManager, haagentmock.NewMockHaAgent(), 1, 2, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) assert.NotNil(t, worker) } @@ -150,7 +151,7 @@ func TestWorkerInitExpvarStats(t *testing.T) { go func(idx int) { defer wg.Done() - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 1, idx, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) worker.Run() @@ -172,7 +173,7 @@ func TestWorkerName(t *testing.T) { for _, id := range []int{1, 100, 500} { expectedName := fmt.Sprintf("worker_%d", id) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 1, id, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 1, id, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) assert.Nil(t, err) assert.NotNil(t, worker) @@ -224,7 +225,7 @@ func TestWorker(t *testing.T) { pendingChecksChan <- testCheck1 close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) wg.Add(1) @@ -284,6 +285,7 @@ func TestWorkerUtilizationExpvars(t *testing.T) { checksTracker, mockShouldAddStatsFunc, func() (sender.Sender, error) { return nil, nil }, + haagentmock.NewMockHaAgent(), 100*time.Millisecond, ) require.Nil(t, err) @@ -354,7 +356,7 @@ func TestWorkerErrorAndWarningHandling(t *testing.T) { } close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) AssertAsyncWorkerCount(t, 0) @@ -399,7 +401,7 @@ func TestWorkerConcurrentCheckScheduling(t *testing.T) { pendingChecksChan <- testCheck close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, mockShouldAddStatsFunc) require.Nil(t, err) worker.Run() @@ -453,7 +455,7 @@ func TestWorkerStatsAddition(t *testing.T) { pendingChecksChan <- squelchedStatsCheck close(pendingChecksChan) - worker, err := NewWorker(aggregator.NewNoOpSenderManager(), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc) + worker, err := NewWorker(aggregator.NewNoOpSenderManager(), haagentmock.NewMockHaAgent(), 100, 200, pendingChecksChan, checksTracker, shouldAddStatsFunc) require.Nil(t, err) worker.Run() @@ -505,6 +507,7 @@ func TestWorkerServiceCheckSending(t *testing.T) { func() (sender.Sender, error) { return mockSender, nil }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err) @@ -575,6 +578,7 @@ func TestWorkerSenderNil(t *testing.T) { func() (sender.Sender, error) { return nil, fmt.Errorf("testerr") }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err) @@ -615,6 +619,7 @@ func TestWorkerServiceCheckSendingLongRunningTasks(t *testing.T) { func() (sender.Sender, error) { return mockSender, nil }, + haagentmock.NewMockHaAgent(), pollingInterval, ) require.Nil(t, err)