Commit bc7b777

use a hybrid health check for wlm kubeapiserver collector
adel121 committed Dec 9, 2024
1 parent 2491564 commit bc7b777
Showing 31 changed files with 86 additions and 64 deletions.
comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go (2 changes: 1 addition & 1 deletion)

@@ -158,7 +158,7 @@ func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolv
     listenerCandidates: make(map[string]*listenerCandidate),
     listenerRetryStop:  nil, // We'll open it if needed
     listenerStop:       make(chan struct{}),
-    healthListening:    health.RegisterLiveness("ad-servicelistening"),
+    healthListening:    health.RegisterLiveness("ad-servicelistening", false),
     newService:         make(chan listeners.Service),
     delService:         make(chan listeners.Service),
     store:              newStore(),

comp/core/autodiscovery/autodiscoveryimpl/config_poller.go (4 changes: 2 additions & 2 deletions)

@@ -84,7 +84,7 @@ func (cp *configPoller) stream(ch chan struct{}, provider providers.StreamingCon
     var ranOnce bool
     ctx, cancel := context.WithCancel(context.Background())
     changesCh := provider.Stream(ctx)
-    healthHandle := health.RegisterLiveness(fmt.Sprintf("ad-config-provider-%s", cp.provider.String()))
+    healthHandle := health.RegisterLiveness(fmt.Sprintf("ad-config-provider-%s", cp.provider.String()), false)

     cp.isRunning = true

@@ -127,7 +127,7 @@ func (cp *configPoller) stream(ch chan struct{}, provider providers.StreamingCon
 func (cp *configPoller) poll(provider providers.CollectingConfigProvider, ac *AutoConfig) {
     ctx, cancel := context.WithCancel(context.Background())
     ticker := time.NewTicker(cp.pollInterval)
-    healthHandle := health.RegisterLiveness(fmt.Sprintf("ad-config-provider-%s", cp.provider.String()))
+    healthHandle := health.RegisterLiveness(fmt.Sprintf("ad-config-provider-%s", cp.provider.String()), false)

     cp.isRunning = true

comp/core/autodiscovery/listeners/workloadmeta.go (2 changes: 1 addition & 1 deletion)

@@ -137,7 +137,7 @@ func (l *workloadmetaListenerImpl) Listen(newSvc chan<- Service, delSvc chan<- S
     l.delService = delSvc

     ch := l.store.Subscribe(l.name, workloadmeta.NormalPriority, l.workloadFilters)
-    health := health.RegisterLiveness(l.name)
+    health := health.RegisterLiveness(l.name, false)

     log.Infof("%s initialized successfully", l.name)

comp/core/healthprobe/impl/healthprobe_test.go (7 changes: 4 additions & 3 deletions)

@@ -13,11 +13,12 @@ import (
     "net/http/httptest"
     "testing"

+    "github.com/stretchr/testify/assert"
+
     healthprobeComponent "github.com/DataDog/datadog-agent/comp/core/healthprobe/def"
     logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
     compdef "github.com/DataDog/datadog-agent/comp/def"
     "github.com/DataDog/datadog-agent/pkg/status/health"
-    "github.com/stretchr/testify/assert"
 )

 func TestServer(t *testing.T) {
@@ -84,7 +85,7 @@ func TestLiveHandlerUnhealthy(t *testing.T) {
     request := httptest.NewRequest(http.MethodGet, "/live", nil)
     responseRecorder := httptest.NewRecorder()

-    handler := health.RegisterLiveness("fake")
+    handler := health.RegisterLiveness("fake", false)
     defer func() {
         health.Deregister(handler)
     }()
@@ -115,7 +116,7 @@ func TestReadyHandlerUnhealthy(t *testing.T) {
     request := httptest.NewRequest(http.MethodGet, "/ready", nil)
     responseRecorder := httptest.NewRecorder()

-    handler := health.RegisterReadiness("fake")
+    handler := health.RegisterReadiness("fake", false)
     defer func() {
         health.Deregister(handler)
     }()

comp/core/tagger/collectors/workloadmeta_main.go (2 changes: 1 addition & 1 deletion)

@@ -140,7 +140,7 @@ func (c *WorkloadMetaCollector) collectStaticGlobalTags(ctx context.Context, dat
 func (c *WorkloadMetaCollector) stream(ctx context.Context) {
     const name = "tagger-workloadmeta"

-    health := health.RegisterLiveness(name)
+    health := health.RegisterLiveness(name, false)
     defer func() {
         err := health.Deregister()
         if err != nil {

comp/core/tagger/tagstore/tagstore.go (2 changes: 1 addition & 1 deletion)

@@ -68,7 +68,7 @@ func newTagStoreWithClock(clock clock.Clock, telemetryStore *telemetry.Store) *T
 func (s *TagStore) Run(ctx context.Context) {
     pruneTicker := time.NewTicker(1 * time.Minute)
     telemetryTicker := time.NewTicker(1 * time.Minute)
-    health := health.RegisterLiveness("tagger-store")
+    health := health.RegisterLiveness("tagger-store", false)

     for {
         select {

(file path not captured)

@@ -181,7 +181,7 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
 }

 func (c *collector) stream(ctx context.Context) {
-    healthHandle := health.RegisterLiveness(componentName)
+    healthHandle := health.RegisterLiveness(componentName, false)
     ctx, cancel := context.WithCancel(ctx)

     for {

(file path not captured)

@@ -140,7 +140,7 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
 }

 func (c *collector) stream(ctx context.Context) {
-    health := health.RegisterLiveness(componentName)
+    health := health.RegisterLiveness(componentName, false)
     ctx, cancel := context.WithCancel(ctx)

     for {

(file path not captured)

@@ -224,7 +224,7 @@ func runStartupCheck(ctx context.Context, stores []*reflectorStore) {
     // There is no way to ensure liveness correctly as it would need to be plugged inside the
     // inner loop of Reflector.
     // However, we add Startup when we got at least some data.
-    startupHealthCheck := health.RegisterStartup(componentName)
+    startupHealthCheck := health.RegisterReadiness(componentName, true)

     // Checked synced, in its own scope to cleanly un-reference the syncTimer
     {

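This hunk is the hybrid check the commit title refers to: liveness cannot be verified from inside the Reflector's inner loop, so the collector now registers a readiness check with once=true, which gates readiness until the check first passes and is then retired instead of demanding a heartbeat forever. Below is a minimal sketch of the pattern, assuming (as the other hunks suggest) that a component acknowledges health pings by draining the handle's channel; the `synced` channel and component name are illustrative, not the collector's actual code.

```go
package example

import (
	"context"

	"github.com/DataDog/datadog-agent/pkg/status/health"
)

// runHybridCheck acknowledges health pings until the first sync completes,
// then stops. Because the check is registered with once=true, the catalog
// marks it healthy after it passes once and never pings it again, so
// abandoning the channel does not flip the component back to unhealthy.
func runHybridCheck(ctx context.Context, synced <-chan struct{}) {
	check := health.RegisterReadiness("kubeapiserver-reflectors", true) // name is illustrative
	for {
		select {
		case <-check.C:
			// still waiting for data: keep draining pings so we stay healthy
		case <-synced:
			return // first data received: the one-shot check has done its job
		case <-ctx.Done():
			return
		}
	}
}
```
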
(file path not captured)

@@ -125,7 +125,7 @@ func (c *collector) collect(ctx context.Context, containerProvider proccontainer
 func (c *collector) stream(ctx context.Context) {
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
-    health := health.RegisterLiveness(componentName)
+    health := health.RegisterLiveness(componentName, false)
     for {
         select {
         case <-health.C:

comp/core/workloadmeta/impl/store.go (4 changes: 2 additions & 2 deletions)

@@ -41,7 +41,7 @@ type subscriber struct {
 // start starts the workload metadata store.
 func (w *workloadmeta) start(ctx context.Context) {
     go func() {
-        health := health.RegisterLiveness("workloadmeta-store")
+        health := health.RegisterLiveness("workloadmeta-store", false)
         for {
             select {
             case <-health.C:
@@ -61,7 +61,7 @@ func (w *workloadmeta) start(ctx context.Context) {

     go func() {
         pullTicker := time.NewTicker(pullCollectorInterval)
-        health := health.RegisterLiveness("workloadmeta-puller")
+        health := health.RegisterLiveness("workloadmeta-puller", false)

         // Start a pull immediately to fill the store without waiting for the
         // next tick.

comp/dogstatsd/server/server.go (2 changes: 1 addition & 1 deletion)

@@ -438,7 +438,7 @@ func (s *server) start(context.Context) error {
         }
     }

-    s.health = health.RegisterLiveness("dogstatsd-main")
+    s.health = health.RegisterLiveness("dogstatsd-main", false)

     // start the debug loop
     // ----------------------

comp/forwarder/defaultforwarder/forwarder_health.go (2 changes: 1 addition & 1 deletion)

@@ -119,7 +119,7 @@ func (fh *forwarderHealth) Start() {
         return
     }

-    fh.health = health.RegisterReadiness("forwarder")
+    fh.health = health.RegisterReadiness("forwarder", false)
     fh.log.Debug("Starting forwarder health check")
     fh.init()
     go fh.healthCheckLoop()

comp/languagedetection/client/clientimpl/client.go (2 changes: 1 addition & 1 deletion)

@@ -216,7 +216,7 @@ func (c *client) startStreaming() {
     periodicFlushTimer := time.NewTicker(c.periodicalFlushPeriod)
     defer periodicFlushTimer.Stop()

-    health := health.RegisterLiveness("process-language-detection-client-sender")
+    health := health.RegisterLiveness("process-language-detection-client-sender", false)

     ctx, cancel := context.WithCancel(c.ctx)
     for {

comp/logs/agent/agentimpl/agent_core_init.go (2 changes: 1 addition & 1 deletion)

@@ -35,7 +35,7 @@ import (

 // NewAgent returns a new Logs Agent
 func (a *logAgent) SetupPipeline(processingRules []*config.ProcessingRule, wmeta optional.Option[workloadmeta.Component], integrationsLogs integrations.Component) {
-    health := health.RegisterLiveness("logs-agent")
+    health := health.RegisterLiveness("logs-agent", false)

     // setup the auditor
     // We pass the health handle to the auditor because it's the end of the pipeline and the most

comp/logs/agent/agentimpl/agent_serverless_init.go (2 changes: 1 addition & 1 deletion)

@@ -40,7 +40,7 @@ func (a *logAgent) SetupPipeline(
     wmeta optional.Option[workloadmeta.Component],
     _ integrations.Component,
 ) {
-    health := health.RegisterLiveness("logs-agent")
+    health := health.RegisterLiveness("logs-agent", false)

     diagnosticMessageReceiver := diagnostic.NewBufferedMessageReceiver(streamlogs.Formatter{}, nil)

(file path not captured)

@@ -200,7 +200,7 @@ func (a *Agent) GetPipelineProvider() pipeline.Provider {
 func (a *Agent) SetupPipeline(
     processingRules []*config.ProcessingRule,
 ) {
-    health := health.RegisterLiveness("logs-agent")
+    health := health.RegisterLiveness("logs-agent", false)

     // setup the auditor
     // We pass the health handle to the auditor because it's the end of the pipeline and the most

pkg/aggregator/aggregator.go (2 changes: 1 addition & 1 deletion)

@@ -334,7 +334,7 @@ func NewBufferedAggregator(s serializer.MetricSerializer, eventPlatformForwarder
     hostnameUpdateDone:      make(chan struct{}),
     flushChan:               make(chan flushTrigger),
     stopChan:                make(chan struct{}),
-    health:                  health.RegisterLiveness("aggregator"),
+    health:                  health.RegisterLiveness("aggregator", false),
     agentName:               agentName,
     tlmContainerTagsEnabled: pkgconfigsetup.Datadog().GetBool("basic_telemetry_add_container_tags"),
     agentTags:               tagger.AgentTags,

pkg/clusteragent/clusterchecks/dispatcher_main.go (2 changes: 1 addition & 1 deletion)

@@ -202,7 +202,7 @@ func (d *dispatcher) run(ctx context.Context) {
     d.store.active = true
     d.store.Unlock()

-    healthProbe := health.RegisterLiveness("clusterchecks-dispatch")
+    healthProbe := health.RegisterLiveness("clusterchecks-dispatch", false)
     defer health.Deregister(healthProbe) //nolint:errcheck

     cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)

pkg/clusteragent/clusterchecks/handler.go (2 changes: 1 addition & 1 deletion)

@@ -171,7 +171,7 @@ func (h *Handler) leaderWatch(ctx context.Context) {
         log.Warnf("Could not refresh leadership status: %s", err)
     }

-    healthProbe := health.RegisterLiveness("clusterchecks-leadership")
+    healthProbe := health.RegisterLiveness("clusterchecks-leadership", false)
     defer health.Deregister(healthProbe) //nolint:errcheck

     watchTicker := time.NewTicker(h.leaderStatusFreq)

pkg/clusteragent/languagedetection/patcher.go (2 changes: 1 addition & 1 deletion)

@@ -134,7 +134,7 @@ func (lp *languagePatcher) run(ctx context.Context) {
     )
     defer lp.store.Unsubscribe(eventCh)

-    health := health.RegisterLiveness("process-language-detection-patcher")
+    health := health.RegisterLiveness("process-language-detection-patcher", false)

     for {
         select {

pkg/collector/scheduler/job.go (2 changes: 1 addition & 1 deletion)

@@ -79,7 +79,7 @@ func newJobQueue(interval time.Duration) *jobQueue {
     interval:     interval,
     stop:         make(chan bool),
     stopped:      make(chan bool),
-    health:       health.RegisterLiveness(fmt.Sprintf("collector-queue-%vs", interval.Seconds())),
+    health:       health.RegisterLiveness(fmt.Sprintf("collector-queue-%vs", interval.Seconds()), false),
     bucketTicker: time.NewTicker(time.Second),
 }

pkg/gpu/consumer.go (2 changes: 1 addition & 1 deletion)

@@ -67,7 +67,7 @@ func (c *cudaEventConsumer) Start() {
     if c == nil {
         return
     }
-    health := health.RegisterLiveness("gpu-tracer-cuda-events")
+    health := health.RegisterLiveness("gpu-tracer-cuda-events", false)
     processMonitor := monitor.GetProcessMonitor()
     cleanupExit := processMonitor.SubscribeExit(c.handleProcessExit)

pkg/jmxfetch/jmxfetch.go (2 changes: 1 addition & 1 deletion)

@@ -405,7 +405,7 @@ func (j *JMXFetch) Wait() error {
 }

 func (j *JMXFetch) heartbeat(beat *time.Ticker) {
-    health := health.RegisterLiveness("jmxfetch")
+    health := health.RegisterLiveness("jmxfetch", false)
     defer health.Deregister() //nolint:errcheck

     for range beat.C {

pkg/logs/auditor/auditor_test.go (2 changes: 1 addition & 1 deletion)

@@ -36,7 +36,7 @@ func (suite *AuditorTestSuite) SetupTest() {

     suite.testRegistryPath = filepath.Join(suite.testRunPathDir, "registry.json")

-    suite.a = New(suite.testRunPathDir, DefaultRegistryFilename, time.Hour, health.RegisterLiveness("fake"))
+    suite.a = New(suite.testRunPathDir, DefaultRegistryFilename, time.Hour, health.RegisterLiveness("fake", false))
     suite.source = sources.NewLogSource("", &config.LogsConfig{Path: testpath})
 }

pkg/logs/launchers/journald/launcher_test.go (2 changes: 1 addition & 1 deletion)

@@ -66,7 +66,7 @@ func newTestLauncher(t *testing.T) *Launcher {
     fakeTagger := mock.SetupFakeTagger(t)

     launcher := NewLauncherWithFactory(&MockJournalFactory{}, flare.NewFlareController(), fakeTagger)
-    launcher.Start(launchers.NewMockSourceProvider(), pipeline.NewMockProvider(), auditor.New("", "registry.json", time.Hour, health.RegisterLiveness("fake")), tailers.NewTailerTracker())
+    launcher.Start(launchers.NewMockSourceProvider(), pipeline.NewMockProvider(), auditor.New("", "registry.json", time.Hour, health.RegisterLiveness("fake", false)), tailers.NewTailerTracker())
     return launcher
 }

pkg/logs/pipeline/provider_test.go (2 changes: 1 addition & 1 deletion)

@@ -24,7 +24,7 @@ type ProviderTestSuite struct {
 }

 func (suite *ProviderTestSuite) SetupTest() {
-    suite.a = auditor.New("", auditor.DefaultRegistryFilename, time.Hour, health.RegisterLiveness("fake"))
+    suite.a = auditor.New("", auditor.DefaultRegistryFilename, time.Hour, health.RegisterLiveness("fake", false))
     suite.p = &provider{
         numberOfPipelines: 3,
         auditor:           suite.a,

pkg/network/tracer/connection/tcp_close_consumer.go (2 changes: 1 addition & 1 deletion)

@@ -92,7 +92,7 @@ func (c *tcpCloseConsumer) Start(callback func(*network.ConnectionStats)) {
     if c == nil {
         return
     }
-    health := health.RegisterLiveness("network-tracer")
+    health := health.RegisterLiveness("network-tracer", false)

     var (
         then = time.Now()

pkg/status/health/global.go (14 changes: 8 additions & 6 deletions)

@@ -12,21 +12,23 @@ import (

 var readinessAndLivenessCatalog = newCatalog()
 var readinessOnlyCatalog = newCatalog()
-var startupOnlyCatalog = newStartupCatalog()
+var startupOnlyCatalog = newCatalog()

 // RegisterReadiness registers a component for readiness check with the default 30 seconds timeout, returns a token
-func RegisterReadiness(name string) *Handle {
-    return readinessOnlyCatalog.register(name)
+func RegisterReadiness(name string, once bool) *Handle {
+    return readinessOnlyCatalog.register(name, once)
 }

 // RegisterLiveness registers a component for liveness check with the default 30 seconds timeout, returns a token
-func RegisterLiveness(name string) *Handle {
-    return readinessAndLivenessCatalog.register(name)
+func RegisterLiveness(name string, once bool) *Handle {
+    return readinessAndLivenessCatalog.register(name, once)
 }

 // RegisterStartup registers a component for startup check, returns a token
 func RegisterStartup(name string) *Handle {
-    return startupOnlyCatalog.register(name)
+    // Startup health checks are registered with once=true because, by design, they should stop being checked
+    // once they are marked as healthy once
+    return startupOnlyCatalog.register(name, true)
 }

 // Deregister a component from the healthcheck

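After this change, every caller of RegisterLiveness and RegisterReadiness must state whether its check is perpetual (false, the previous behavior, which is why the call sites above all gain a trailing false) or one-shot (true). A rough summary of the three entry points as they stand in this diff; the component names are placeholders:

```go
package example

import "github.com/DataDog/datadog-agent/pkg/status/health"

func registerExamples() {
	// Perpetual liveness: the component must keep draining its channel for
	// the life of the process (the pre-change behavior, now spelled out).
	live := health.RegisterLiveness("my-loop", false)

	// Hybrid readiness: counted against readiness only until it passes once.
	ready := health.RegisterReadiness("my-initial-sync", true)

	// Startup keeps its one-argument signature; once=true is applied internally.
	startup := health.RegisterStartup("my-startup")

	_, _, _ = live, ready, startup
}
```
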
pkg/status/health/health.go (19 changes: 6 additions & 13 deletions)

@@ -29,33 +29,25 @@ type component struct {
     name       string
     healthChan chan time.Time
     healthy    bool
+    // if set to true, once the check is healthy, we mark it as healthy forever and we stop checking it
+    once bool
 }

 type catalog struct {
     sync.RWMutex
     components map[*Handle]*component
     latestRun  time.Time
-    startup    bool
 }

 func newCatalog() *catalog {
     return &catalog{
         components: make(map[*Handle]*component),
         latestRun:  time.Now(), // Start healthy
-        startup:    false,
-    }
-}
-
-func newStartupCatalog() *catalog {
-    return &catalog{
-        components: make(map[*Handle]*component),
-        latestRun:  time.Now(), // Start healthy
-        startup:    true,
     }
 }

 // register a component with the default 30 seconds timeout, returns a token
-func (c *catalog) register(name string) *Handle {
+func (c *catalog) register(name string, once bool) *Handle {
     c.Lock()
     defer c.Unlock()
@@ -67,6 +59,7 @@ func (c *catalog) register(name string) *Handle {
         name:       name,
         healthChan: make(chan time.Time, bufferSize),
         healthy:    false,
+        once:       once,
     }
     h := &Handle{
         C: component.healthChan,
@@ -107,8 +100,8 @@ func (c *catalog) pingComponents(healthDeadline time.Time) bool {
     c.Lock()
     defer c.Unlock()
     for _, component := range c.components {
-        // In startup mode, we skip already healthy components.
-        if c.startup && component.healthy {
+        // We skip components that are registered to be skipped once they pass once
+        if component.healthy && component.once {
             continue
         }
         select {

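With the catalog-wide startup flag gone, one-shot behavior is now a per-component property: pingComponents delivers the deadline on each component's buffered channel and skips any component that is both healthy and once. A sketch of what that buys a caller, assuming a component stays healthy by draining Handle.C as the hunks above suggest:

```go
package example

import (
	"fmt"

	"github.com/DataDog/datadog-agent/pkg/status/health"
)

// A one-shot component only needs to acknowledge pings until it has been
// marked healthy once; with once=false the same pattern would eventually
// fill the buffered channel and the component would be reported unhealthy.
func oneShotExample() {
	h := health.RegisterReadiness("one-shot-demo", true) // illustrative name
	<-h.C // drain the first ping; the catalog has marked us healthy
	fmt.Println("one-shot check passed; no further heartbeat required")
}
```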