Skip to content

Commit

Permalink
Make the 'Collector' an interface so it can more easily be mocked
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Nov 21, 2023
1 parent fa7381f commit 6cf7253
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/agent/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
AC *autodiscovery.AutoConfig

// Coll is the global collector instance
Coll *collector.Collector
Coll collector.Collector

// ExpvarServer is the global expvar server
ExpvarServer *http.Server
Expand Down
43 changes: 27 additions & 16 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,19 @@ const (

const cancelCheckTimeout time.Duration = 500 * time.Millisecond

// Collector abstract common operations about running a Check
type Collector struct {
// Collector manages a collection of checks and provides operations over them
type Collector interface {
Start()
Stop()
RunCheck(inner check.Check) (checkid.ID, error)
StopCheck(id checkid.ID) error
MapOverChecks(cb func([]check.Info))
GetChecks() []check.Check
GetAllInstanceIDs(checkName string) []checkid.ID
ReloadAllCheckInstances(name string, newInstances []check.Check) ([]checkid.ID, error)
}

type collector struct {
senderManager sender.SenderManager
checkInstances int64

Expand All @@ -46,8 +57,8 @@ type Collector struct {
}

// NewCollector create a Collector instance and sets up the Python Environment
func NewCollector(senderManager sender.SenderManager, paths ...string) *Collector {
c := &Collector{
func NewCollector(senderManager sender.SenderManager, paths ...string) Collector {
c := &collector{
senderManager: senderManager,
checks: make(map[checkid.ID]*middleware.CheckWrapper),
state: atomic.NewUint32(stopped),
Expand All @@ -73,7 +84,7 @@ func NewCollector(senderManager sender.SenderManager, paths ...string) *Collecto

// Start begins the collector's operation. The scheduler will not run any
// checks until this has been called.
func (c *Collector) Start() {
func (c *collector) Start() {
c.m.Lock()
defer c.m.Unlock()

Expand All @@ -94,7 +105,7 @@ func (c *Collector) Start() {
}

// Stop halts any component involved in running a Check
func (c *Collector) Stop() {
func (c *collector) Stop() {
c.m.Lock()
defer c.m.Unlock()

Expand All @@ -114,7 +125,7 @@ func (c *Collector) Stop() {
}

// RunCheck sends a Check in the execution queue
func (c *Collector) RunCheck(inner check.Check) (checkid.ID, error) {
func (c *collector) RunCheck(inner check.Check) (checkid.ID, error) {
c.m.Lock()
defer c.m.Unlock()

Expand Down Expand Up @@ -153,7 +164,7 @@ func (c *Collector) RunCheck(inner check.Check) (checkid.ID, error) {
}

// StopCheck halts a check and remove the instance
func (c *Collector) StopCheck(id checkid.ID) error {
func (c *collector) StopCheck(id checkid.ID) error {
if !c.started() {
return fmt.Errorf("the collector is not running")
}
Expand Down Expand Up @@ -192,7 +203,7 @@ func (c *Collector) StopCheck(id checkid.ID) error {
}

// cancelCheck calls Cancel on the passed check, with a timeout
func (c *Collector) cancelCheck(ch check.Check, timeout time.Duration) error {
func (c *collector) cancelCheck(ch check.Check, timeout time.Duration) error {
done := make(chan struct{})

go func() {
Expand All @@ -208,7 +219,7 @@ func (c *Collector) cancelCheck(ch check.Check, timeout time.Duration) error {
}
}

func (c *Collector) get(id checkid.ID) (check.Check, bool) {
func (c *collector) get(id checkid.ID) (check.Check, bool) {
c.m.RLock()
defer c.m.RUnlock()

Expand All @@ -217,20 +228,20 @@ func (c *Collector) get(id checkid.ID) (check.Check, bool) {
}

// remove the check from the list
func (c *Collector) delete(id checkid.ID) {
func (c *collector) delete(id checkid.ID) {
c.m.Lock()
defer c.m.Unlock()

delete(c.checks, id)
}

// lightweight shortcut to see if the collector has started
func (c *Collector) started() bool {
func (c *collector) started() bool {
return c.state.Load() == started
}

// MapOverChecks call the callback with the list of checks locked.
func (c *Collector) MapOverChecks(cb func([]check.Info)) {
func (c *collector) MapOverChecks(cb func([]check.Info)) {
c.m.RLock()
defer c.m.RUnlock()

Expand All @@ -242,7 +253,7 @@ func (c *Collector) MapOverChecks(cb func([]check.Info)) {
}

// GetChecks copies checks
func (c *Collector) GetChecks() []check.Check {
func (c *collector) GetChecks() []check.Check {
c.m.RLock()
defer c.m.RUnlock()

Expand All @@ -255,7 +266,7 @@ func (c *Collector) GetChecks() []check.Check {
}

// GetAllInstanceIDs returns the ID's of all instances of a check
func (c *Collector) GetAllInstanceIDs(checkName string) []checkid.ID {
func (c *collector) GetAllInstanceIDs(checkName string) []checkid.ID {
c.m.RLock()
defer c.m.RUnlock()

Expand All @@ -270,7 +281,7 @@ func (c *Collector) GetAllInstanceIDs(checkName string) []checkid.ID {
}

// ReloadAllCheckInstances completely restarts a check with a new configuration
func (c *Collector) ReloadAllCheckInstances(name string, newInstances []check.Check) ([]checkid.ID, error) {
func (c *collector) ReloadAllCheckInstances(name string, newInstances []check.Check) ([]checkid.ID, error) {
if !c.started() {
return nil, fmt.Errorf("The collector is not running")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/collector_demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ type CollectorDemuxTestSuite struct {
suite.Suite

demux *aggregator.TestAgentDemultiplexer
c *Collector
c *collector
}

func (suite *CollectorDemuxTestSuite) SetupTest() {
log := fxutil.Test[log.Component](suite.T(), log.MockModule)
suite.demux = aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, 100*time.Hour)
suite.c = NewCollector(suite.demux)
suite.c = NewCollector(suite.demux).(*collector)

suite.c.Start()
}
Expand Down
75 changes: 75 additions & 0 deletions pkg/collector/collector_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build test

package collector

import (
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/stretchr/testify/mock"
)

type mockCollector struct {
mock.Mock
checksInfo []check.Info
}

// NewMockCollector returns a mock collector.
//
// 'mockChecksInfo' will be used as argument for the callback when 'MapOverChecks' is called.
func NewMockCollector(mockChecksInfo []check.Info) Collector {
return &mockCollector{
checksInfo: mockChecksInfo,
}
}

// Start begins the collector's operation. The scheduler will not run any
// checks until this has been called.
func (c *mockCollector) Start() {
c.Called()
}

// Stop halts any component involved in running a Check
func (c *mockCollector) Stop() {
c.Called()
}

// RunCheck sends a Check in the execution queue
func (c *mockCollector) RunCheck(inner check.Check) (checkid.ID, error) {
args := c.Called(inner)
return args.Get(0).(checkid.ID), args.Error(1)
}

// StopCheck halts a check and remove the instance
func (c *mockCollector) StopCheck(id checkid.ID) error {
args := c.Called(id)
return args.Error(0)
}

// MapOverChecks call the callback with the list of checks locked.
func (c *mockCollector) MapOverChecks(cb func([]check.Info)) {
c.Called(cb)
cb(c.checksInfo)
}

// GetChecks copies checks
func (c *mockCollector) GetChecks() []check.Check {
args := c.Called()
return args.Get(0).([]check.Check)
}

// GetAllInstanceIDs returns the ID's of all instances of a check
func (c *mockCollector) GetAllInstanceIDs(checkName string) []checkid.ID {
args := c.Called(checkName)
return args.Get(0).([]checkid.ID)
}

// ReloadAllCheckInstances completely restarts a check with a new configuration
func (c *mockCollector) ReloadAllCheckInstances(name string, newInstances []check.Check) ([]checkid.ID, error) {
args := c.Called(name, newInstances)
return args.Get(0).([]checkid.ID), args.Error(1)
}
4 changes: 2 additions & 2 deletions pkg/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ func (p ChecksList) Less(i, j int) bool { return p[i] < p[j] }

type CollectorTestSuite struct {
suite.Suite
c *Collector
c *collector
}

func (suite *CollectorTestSuite) SetupTest() {
suite.c = NewCollector(aggregator.NewNoOpSenderManager())
suite.c = NewCollector(aggregator.NewNoOpSenderManager()).(*collector)
suite.c.Start()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ func init() {
type CheckScheduler struct {
configToChecks map[string][]checkid.ID // cache the ID of checks we load for each config
loaders []check.Loader
collector *Collector
collector Collector
senderManager sender.SenderManager
m sync.RWMutex
}

// InitCheckScheduler creates and returns a check scheduler
func InitCheckScheduler(collector *Collector, senderManager sender.SenderManager) *CheckScheduler {
func InitCheckScheduler(collector Collector, senderManager sender.SenderManager) *CheckScheduler {
checkScheduler = &CheckScheduler{
collector: collector,
senderManager: senderManager,
Expand Down

0 comments on commit 6cf7253

Please sign in to comment.