diff --git a/comp/haagent/impl/haagent.go b/comp/haagent/impl/haagent.go index 7974867596547..5328d039da2d3 100644 --- a/comp/haagent/impl/haagent.go +++ b/comp/haagent/impl/haagent.go @@ -7,8 +7,10 @@ package haagentimpl import ( "context" + "encoding/json" log "github.com/DataDog/datadog-agent/comp/core/log/def" + "github.com/DataDog/datadog-agent/pkg/remoteconfig/state" "github.com/DataDog/datadog-agent/pkg/util/hostname" "go.uber.org/atomic" ) @@ -47,3 +49,38 @@ func (h *haAgentImpl) SetLeader(leaderAgentHostname string) { } h.isLeader.Store(agentHostname == leaderAgentHostname) } + +func (h *haAgentImpl) onHaAgentUpdate(updates map[string]state.RawConfig, applyStateCallback func(string, state.ApplyStatus)) { + h.log.Debugf("Updates received: count=%d", len(updates)) + + for configPath, rawConfig := range updates { + h.log.Debugf("Received config %s: %s", configPath, string(rawConfig.Config)) + haAgentMsg := haAgentConfig{} + err := json.Unmarshal(rawConfig.Config, &haAgentMsg) + if err != nil { + h.log.Warnf("Skipping invalid HA_AGENT update %s: %v", configPath, err) + applyStateCallback(configPath, state.ApplyStatus{ + State: state.ApplyStateError, + Error: "error unmarshalling payload", + }) + continue + } + if haAgentMsg.Group != h.GetGroup() { + h.log.Warnf("Skipping invalid HA_AGENT update %s: expected group %s, got %s", + configPath, h.GetGroup(), haAgentMsg.Group) + applyStateCallback(configPath, state.ApplyStatus{ + State: state.ApplyStateError, + Error: "group does not match", + }) + continue + } + + h.SetLeader(haAgentMsg.Leader) + + h.log.Debugf("Processed config %s: %v", configPath, haAgentMsg) + + applyStateCallback(configPath, state.ApplyStatus{ + State: state.ApplyStateAcknowledged, + }) + } +} diff --git a/comp/haagent/impl/haagent_comp.go b/comp/haagent/impl/haagent_comp.go index 24f591741fa0d..f922325a9a440 100644 --- a/comp/haagent/impl/haagent_comp.go +++ b/comp/haagent/impl/haagent_comp.go @@ -10,6 +10,8 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" log "github.com/DataDog/datadog-agent/comp/core/log/def" haagent "github.com/DataDog/datadog-agent/comp/haagent/def" + rctypes "github.com/DataDog/datadog-agent/comp/remote-config/rcclient/types" + "github.com/DataDog/datadog-agent/pkg/remoteconfig/state" ) // Requires defines the dependencies for the haagent component @@ -20,14 +22,25 @@ type Requires struct { // Provides defines the output of the haagent component type Provides struct { - Comp haagent.Component + Comp haagent.Component + RCListener rctypes.ListenerProvider } // NewComponent creates a new haagent component func NewComponent(reqs Requires) (Provides, error) { haAgentConfigs := newHaAgentConfigs(reqs.AgentConfig) + haAgent := newHaAgentImpl(reqs.Logger, haAgentConfigs) + var rcListener rctypes.ListenerProvider + if haAgent.Enabled() { + reqs.Logger.Debug("Add onHaAgentUpdate RCListener") + rcListener.ListenerProvider = rctypes.RCListener{ + state.ProductHaAgent: haAgent.onHaAgentUpdate, + } + } + provides := Provides{ - Comp: newHaAgentImpl(reqs.Logger, haAgentConfigs), + Comp: haAgent, + RCListener: rcListener, } return provides, nil } diff --git a/comp/haagent/impl/haagent_test.go b/comp/haagent/impl/haagent_test.go index 3be3d2d6341ee..b91e33f27c4b6 100644 --- a/comp/haagent/impl/haagent_test.go +++ b/comp/haagent/impl/haagent_test.go @@ -8,9 +8,17 @@ package haagentimpl import ( "testing" + "github.com/DataDog/datadog-agent/comp/core/config" + logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" + "github.com/DataDog/datadog-agent/pkg/remoteconfig/state" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/stretchr/testify/assert" + "go.uber.org/fx" ) +var testConfigID = "datadog/2/HA_AGENT/group-62345762794c0c0b/65f17d667fb50f8ae28a3c858bdb1be9ea994f20249c119e007c520ac115c807" +var testGroup = "testGroup01" + func Test_Enabled(t *testing.T) { tests := []struct { name string @@ -34,7 +42,7 @@ func Test_Enabled(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - haAgent := newTestHaAgentComponent(t, tt.configs) + haAgent := newTestHaAgentComponent(t, tt.configs).Comp assert.Equal(t, tt.expectedEnabled, haAgent.Enabled()) }) } @@ -44,7 +52,7 @@ func Test_GetGroup(t *testing.T) { agentConfigs := map[string]interface{}{ "ha_agent.group": "my-group-01", } - haAgent := newTestHaAgentComponent(t, agentConfigs) + haAgent := newTestHaAgentComponent(t, agentConfigs).Comp assert.Equal(t, "my-group-01", haAgent.GetGroup()) } @@ -52,7 +60,7 @@ func Test_IsLeader_SetLeader(t *testing.T) { agentConfigs := map[string]interface{}{ "hostname": "my-agent-hostname", } - haAgent := newTestHaAgentComponent(t, agentConfigs) + haAgent := newTestHaAgentComponent(t, agentConfigs).Comp haAgent.SetLeader("another-agent") assert.False(t, haAgent.IsLeader()) @@ -60,3 +68,104 @@ func Test_IsLeader_SetLeader(t *testing.T) { haAgent.SetLeader("my-agent-hostname") assert.True(t, haAgent.IsLeader()) } + +func Test_RCListener(t *testing.T) { + tests := []struct { + name string + configs map[string]interface{} + expectRCListener bool + }{ + { + name: "enabled", + configs: map[string]interface{}{ + "ha_agent.enabled": true, + }, + expectRCListener: true, + }, + { + name: "disabled", + configs: map[string]interface{}{ + "ha_agent.enabled": false, + }, + expectRCListener: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provides := newTestHaAgentComponent(t, tt.configs) + if tt.expectRCListener { + assert.NotNil(t, provides.RCListener.ListenerProvider) + } else { + assert.Nil(t, provides.RCListener.ListenerProvider) + } + }) + } +} + +func Test_haAgentImpl_onHaAgentUpdate(t *testing.T) { + + tests := []struct { + name string + updates map[string]state.RawConfig + expectedApplyID string + expectedApplyStatus state.ApplyStatus + }{ + { + name: "successful update", + updates: map[string]state.RawConfig{ + testConfigID: {Config: []byte(`{"group":"testGroup01","leader":"ha-agent1"}`)}, + }, + expectedApplyID: testConfigID, + expectedApplyStatus: state.ApplyStatus{ + State: state.ApplyStateAcknowledged, + }, + }, + { + name: "invalid payload", + updates: map[string]state.RawConfig{ + testConfigID: {Config: []byte(`invalid-json`)}, + }, + expectedApplyID: testConfigID, + expectedApplyStatus: state.ApplyStatus{ + State: state.ApplyStateError, + Error: "error unmarshalling payload", + }, + }, + { + name: "invalid group", + updates: map[string]state.RawConfig{ + testConfigID: {Config: []byte(`{"group":"invalidGroup","leader":"ha-agent1"}`)}, + }, + expectedApplyID: testConfigID, + expectedApplyStatus: state.ApplyStatus{ + State: state.ApplyStateError, + Error: "group does not match", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + agentConfigs := map[string]interface{}{ + "hostname": "my-agent-hostname", + "ha_agent.enabled": true, + "ha_agent.group": testGroup, + } + agentConfigComponent := fxutil.Test[config.Component](t, fx.Options( + config.MockModule(), + fx.Replace(config.MockParams{Overrides: agentConfigs}), + )) + + h := newHaAgentImpl(logmock.New(t), newHaAgentConfigs(agentConfigComponent)) + + var applyID string + var applyStatus state.ApplyStatus + applyFunc := func(id string, status state.ApplyStatus) { + applyID = id + applyStatus = status + } + h.onHaAgentUpdate(tt.updates, applyFunc) + assert.Equal(t, tt.expectedApplyID, applyID) + assert.Equal(t, tt.expectedApplyStatus, applyStatus) + }) + } +} diff --git a/comp/haagent/impl/haagent_testutils_test.go b/comp/haagent/impl/haagent_testutils_test.go index a401d9bdd8a61..ec09c03d99c3f 100644 --- a/comp/haagent/impl/haagent_testutils_test.go +++ b/comp/haagent/impl/haagent_testutils_test.go @@ -10,13 +10,12 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" logmock "github.com/DataDog/datadog-agent/comp/core/log/mock" - haagent "github.com/DataDog/datadog-agent/comp/haagent/def" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/stretchr/testify/require" "go.uber.org/fx" ) -func newTestHaAgentComponent(t *testing.T, agentConfigs map[string]interface{}) haagent.Component { +func newTestHaAgentComponent(t *testing.T, agentConfigs map[string]interface{}) Provides { logComponent := logmock.New(t) agentConfigComponent := fxutil.Test[config.Component](t, fx.Options( config.MockModule(), @@ -30,8 +29,5 @@ func newTestHaAgentComponent(t *testing.T, agentConfigs map[string]interface{}) provides, err := NewComponent(requires) require.NoError(t, err) - - comp := provides.Comp - require.NotNil(t, comp) - return comp + return provides } diff --git a/comp/haagent/impl/rcpayload.go b/comp/haagent/impl/rcpayload.go new file mode 100644 index 0000000000000..10784dc801d35 --- /dev/null +++ b/comp/haagent/impl/rcpayload.go @@ -0,0 +1,11 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +package haagentimpl + +type haAgentConfig struct { + Group string `json:"group"` + Leader string `json:"leader"` +} diff --git a/pkg/remoteconfig/state/products.go b/pkg/remoteconfig/state/products.go index ddb429cf565bb..be3133064b998 100644 --- a/pkg/remoteconfig/state/products.go +++ b/pkg/remoteconfig/state/products.go @@ -32,6 +32,7 @@ var validProducts = map[string]struct{}{ ProductTesting1: {}, ProductTesting2: {}, ProductOrchestratorK8sCRDs: {}, + ProductHaAgent: {}, } const ( @@ -87,4 +88,6 @@ const ( ProductTesting2 = "TESTING2" // ProductOrchestratorK8sCRDs receives values for k8s crds ProductOrchestratorK8sCRDs = "ORCHESTRATOR_K8S_CRDS" + // ProductHaAgent is the HA Agent product + ProductHaAgent = "HA_AGENT" )