Skip to content

Commit

Permalink
use a hybrid health check for wlm kubeapiserver collector
Browse files Browse the repository at this point in the history
  • Loading branch information
adel121 committed Dec 9, 2024
1 parent 2491564 commit 68efa9b
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 23 deletions.
5 changes: 3 additions & 2 deletions comp/core/healthprobe/impl/healthprobe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ import (
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

healthprobeComponent "github.com/DataDog/datadog-agent/comp/core/healthprobe/def"
logmock "github.com/DataDog/datadog-agent/comp/core/log/mock"
compdef "github.com/DataDog/datadog-agent/comp/def"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/stretchr/testify/assert"
)

func TestServer(t *testing.T) {
Expand Down Expand Up @@ -115,7 +116,7 @@ func TestReadyHandlerUnhealthy(t *testing.T) {
request := httptest.NewRequest(http.MethodGet, "/ready", nil)
responseRecorder := httptest.NewRecorder()

handler := health.RegisterReadiness("fake")
handler := health.RegisterReadiness("fake", false)
defer func() {
health.Deregister(handler)
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func runStartupCheck(ctx context.Context, stores []*reflectorStore) {
// There is no way to ensure liveness correctly as it would need to be plugged inside the
// inner loop of Reflector.
// However, we add Startup when we got at least some data.
startupHealthCheck := health.RegisterStartup(componentName)
startupHealthCheck := health.RegisterReadiness(componentName, true)

// Checked synced, in its own scope to cleanly un-reference the syncTimer
{
Expand Down
2 changes: 1 addition & 1 deletion comp/forwarder/defaultforwarder/forwarder_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (fh *forwarderHealth) Start() {
return
}

fh.health = health.RegisterReadiness("forwarder")
fh.health = health.RegisterReadiness("forwarder", false)
fh.log.Debug("Starting forwarder health check")
fh.init()
go fh.healthCheckLoop()
Expand Down
12 changes: 7 additions & 5 deletions pkg/status/health/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@ var readinessOnlyCatalog = newCatalog()
var startupOnlyCatalog = newStartupCatalog()

// RegisterReadiness registers a component for readiness check with the default 30 seconds timeout, returns a token
func RegisterReadiness(name string) *Handle {
return readinessOnlyCatalog.register(name)
func RegisterReadiness(name string, once bool) *Handle {
return readinessOnlyCatalog.register(name, once)
}

// RegisterLiveness registers a component for liveness check with the default 30 seconds timeout, returns a token
func RegisterLiveness(name string) *Handle {
return readinessAndLivenessCatalog.register(name)
func RegisterLiveness(name string, once bool) *Handle {
return readinessAndLivenessCatalog.register(name, once)
}

// RegisterStartup registers a component for startup check, returns a token
func RegisterStartup(name string) *Handle {
return startupOnlyCatalog.register(name)
// we register the startup probe with once=false because once a startup is healthy
// it is marked as unhealthy
return startupOnlyCatalog.register(name, false)
}

// Deregister a component from the healthcheck
Expand Down
16 changes: 13 additions & 3 deletions pkg/status/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"errors"
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

var pingFrequency = 15 * time.Second
Expand All @@ -29,6 +31,8 @@ type component struct {
name string
healthChan chan time.Time
healthy bool
// if set to true, once the check is healthy, we mark it as healthy forever and we stop checking it
once bool
}

type catalog struct {
Expand All @@ -55,7 +59,7 @@ func newStartupCatalog() *catalog {
}

// register a component with the default 30 seconds timeout, returns a token
func (c *catalog) register(name string) *Handle {
func (c *catalog) register(name string, once bool) *Handle {
c.Lock()
defer c.Unlock()

Expand All @@ -67,6 +71,7 @@ func (c *catalog) register(name string) *Handle {
name: name,
healthChan: make(chan time.Time, bufferSize),
healthy: false,
once: once,
}
h := &Handle{
C: component.healthChan,
Expand Down Expand Up @@ -107,8 +112,13 @@ func (c *catalog) pingComponents(healthDeadline time.Time) bool {
c.Lock()
defer c.Unlock()
for _, component := range c.components {
// In startup mode, we skip already healthy components.
if c.startup && component.healthy {
// We skip components that are:
// - in startup mode
// - registered to be skipped once they pass once
if component.healthy && (c.startup || component.once) {
if component.once {
log.Debugf("skipping component %q: component already ")
}
continue
}
select {
Expand Down
48 changes: 37 additions & 11 deletions pkg/status/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestCatalogStartsHealthy(t *testing.T) {
cat := newCatalog()
// Register a fake compoment
// because without any registered component, the `healthcheck` component would be disabled
_ = cat.register("test1")
_ = cat.register("test1", false)

status := cat.getStatus()
assert.Len(t, status.Healthy, 1)
Expand All @@ -38,7 +38,7 @@ func TestCatalogGetsUnhealthyAndBack(t *testing.T) {
cat := newCatalog()
// Register a fake compoment
// because without any registered component, the `healthcheck` component would be disabled
_ = cat.register("test1")
_ = cat.register("test1", false)

status := cat.getStatus()
assert.Contains(t, status.Healthy, "healthcheck")
Expand All @@ -54,7 +54,7 @@ func TestCatalogGetsUnhealthyAndBack(t *testing.T) {

func TestRegisterAndUnhealthy(t *testing.T) {
cat := newCatalog()
token := cat.register("test1")
token := cat.register("test1", false)

_, found := cat.components[token]
require.True(t, found)
Expand All @@ -67,16 +67,16 @@ func TestRegisterAndUnhealthy(t *testing.T) {

func TestRegisterTriplets(t *testing.T) {
cat := newCatalog()
cat.register("triplet")
cat.register("triplet")
cat.register("triplet")
cat.register("triplet", false)
cat.register("triplet", false)
cat.register("triplet", false)
assert.Len(t, cat.components, 3)
}

func TestDeregister(t *testing.T) {
cat := newCatalog()
token1 := cat.register("test1")
token2 := cat.register("test2")
token1 := cat.register("test1", false)
token2 := cat.register("test2", false)

assert.Len(t, cat.components, 2)

Expand All @@ -88,7 +88,7 @@ func TestDeregister(t *testing.T) {

func TestDeregisterBadToken(t *testing.T) {
cat := newCatalog()
token1 := cat.register("test1")
token1 := cat.register("test1", false)

assert.Len(t, cat.components, 1)

Expand All @@ -100,7 +100,7 @@ func TestDeregisterBadToken(t *testing.T) {

func TestGetHealthy(t *testing.T) {
cat := newCatalog()
token := cat.register("test1")
token := cat.register("test1", false)

// Start unhealthy
status := cat.getStatus()
Expand All @@ -126,7 +126,33 @@ func TestGetHealthy(t *testing.T) {

func TestStartupCatalog(t *testing.T) {
cat := newStartupCatalog()
token := cat.register("test1")
token := cat.register("test1", false)

// Start unhealthy
status := cat.getStatus()
assert.Len(t, status.Healthy, 1)
assert.Contains(t, status.Healthy, "healthcheck")
assert.Len(t, status.Unhealthy, 1)
assert.Contains(t, status.Unhealthy, "test1")

// Get healthy
<-token.C
cat.pingComponents(time.Time{}) // First ping will make component healthy and fill the channel again.
status = cat.getStatus()
assert.Len(t, status.Healthy, 2)
assert.Contains(t, status.Healthy, "test1")

// Make sure that we stay health even if we don't ping.
cat.pingComponents(time.Time{})
status = cat.getStatus()
assert.Len(t, status.Healthy, 2)
assert.Contains(t, status.Healthy, "test1")
}

func TestCatalogWithOnceComponent(t *testing.T) {
cat := newCatalog()
once := true
token := cat.register("test1", once)

// Start unhealthy
status := cat.getStatus()
Expand Down

0 comments on commit 68efa9b

Please sign in to comment.