Skip to content

Commit

Permalink
Stop self monitor output health reporting if output config is not ack… (
Browse files Browse the repository at this point in the history
#3335)

* Stop self monitor output health reporting if output config is not acked by agents

* updated changelog

* added test to error scenario
  • Loading branch information
juliaElastic authored Mar 11, 2024
1 parent 950eb52 commit 59facd6
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 3 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1710157707-fix-self-monitor-output-health.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character description of the change.
summary: Self monitor stops output health reporting if output config is not acked by agents

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present, it is automatically filled by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3335

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
issue: 3334
15 changes: 12 additions & 3 deletions internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type Bulk interface {
GetBulker(outputName string) Bulk
GetBulkerMap() map[string]Bulk
CancelFn() context.CancelFunc
RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool

ReadSecrets(ctx context.Context, secretIds []string) (map[string]string, error)
}
Expand Down Expand Up @@ -247,17 +248,25 @@ func (b *Bulker) Client() *elasticsearch.Client {
return client
}

// check if remote output cfg changed
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
// RemoteOutputConfigChanged reports whether newCfg differs from the config
// stored in b.remoteOutputConfigMap under name. It returns false when no
// config is stored for name yet, and it does not modify the stored config.
func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
	stored := b.remoteOutputConfigMap[name]

	// An output that has no stored config yet is not reported as changed.
	if stored == nil {
		return false
	}
	if reflect.DeepEqual(stored, newCfg) {
		return false
	}

	zlog.Info().Str("name", name).Msg("remote output configuration has changed")
	return true
}

// check if remote output cfg changed
func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
hasChanged := b.RemoteOutputConfigChanged(zlog, name, newCfg)
if hasChanged {
zlog.Debug().Str("name", name).Msg("remote output configuration has changed")
}

newCfgCopy := make(map[string]interface{})
for k, v := range newCfg {
newCfgCopy[k] = v
Expand Down
12 changes: 12 additions & 0 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,22 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
return state, nil
}

// isOutputCfgOutdated reports whether health reporting for outputName should
// be skipped. It returns true when the policy lookup for the output fails or
// yields no policy, or when bulker.RemoteOutputConfigChanged reports a change
// for the output config found in that policy.
func isOutputCfgOutdated(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger, outputName string) bool {
	policy, err := dl.QueryOutputFromPolicy(ctx, bulker, outputName)
	if err != nil {
		// Record the lookup failure instead of dropping it silently. The output
		// is still treated as outdated, so the caller skips it.
		zlog.Debug().Err(err).Str("outputName", outputName).Msg("unable to query policy for output, treating output config as outdated")
		return true
	}
	if policy == nil {
		return true
	}
	return bulker.RemoteOutputConfigChanged(zlog, outputName, policy.Data.Outputs[outputName])
}

func reportOutputHealth(ctx context.Context, bulker bulk.Bulk, zlog zerolog.Logger) {
//pinging logic
bulkerMap := bulker.GetBulkerMap()
for outputName, outputBulker := range bulkerMap {
if isOutputCfgOutdated(ctx, bulker, zlog, outputName) {
continue
}
doc := model.OutputHealth{
Output: outputName,
State: client.UnitStateHealthy.String(),
Expand Down
60 changes: 60 additions & 0 deletions internal/pkg/policy/self_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,14 @@ func TestSelfMonitor_reportOutputHealthyState(t *testing.T) {
}
return doc.Message == "" && doc.State == client.UnitStateHealthy.String()
}), mock.Anything).Return("", nil)
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
&es.ResultT{
HitsT: es.HitsT{
Hits: []es.HitT{
{Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)},
},
},
}, nil)

reportOutputHealth(ctx, bulker, logger)

Expand Down Expand Up @@ -696,6 +704,58 @@ func TestSelfMonitor_reportOutputDegradedState(t *testing.T) {
}
return doc.Message == "remote ES is not reachable due to error: error connecting" && doc.State == client.UnitStateDegraded.String()
}), mock.Anything).Return("", nil)
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
&es.ResultT{
HitsT: es.HitsT{
Hits: []es.HitT{
{Source: []byte(`{"data": {"outputs":{"remote":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`)},
},
},
}, nil)

reportOutputHealth(ctx, bulker, logger)

bulker.AssertExpectations(t)
outputBulker.AssertExpectations(t)
}

// TestSelfMonitor_reportOutputSkipIfOutdated runs reportOutputHealth with a
// single output named "outdated" and then asserts the mock expectations. The
// only expectations registered are GetBulkerMap and the fleet policies Search;
// none are registered on the output's own bulker.
func TestSelfMonitor_reportOutputSkipIfOutdated(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	logger := testlog.SetLogger(t)

	// Policy document returned by the mocked Search call.
	const policyDoc = `{"data": {"outputs":{"outdated":{"type":"remote_elasticsearch","hosts":["http://localhost:9200"]}}}}`

	bulker := ftesting.NewMockBulk()
	outputBulker := ftesting.NewMockBulk()

	bulker.On("GetBulkerMap").Return(map[string]bulk.Bulk{
		"outdated": outputBulker,
	})

	searchResult := &es.ResultT{
		HitsT: es.HitsT{
			Hits: []es.HitT{
				{Source: []byte(policyDoc)},
			},
		},
	}
	bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(searchResult, nil)

	reportOutputHealth(ctx, bulker, logger)

	bulker.AssertExpectations(t)
	outputBulker.AssertExpectations(t)
}

func TestSelfMonitor_reportOutputSkipIfNotFound(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := testlog.SetLogger(t)

bulker := ftesting.NewMockBulk()
bulkerMap := make(map[string]bulk.Bulk)
outputBulker := ftesting.NewMockBulk()
bulkerMap["outdated"] = outputBulker
bulker.On("GetBulkerMap").Return(bulkerMap)
bulker.On("Search", mock.Anything, dl.FleetPolicies, mock.Anything, mock.Anything).Return(
&es.ResultT{}, errors.New("output not found"))

reportOutputHealth(ctx, bulker, logger)

Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/testing/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,8 @@ func (m *MockBulk) StartTransactionOptions(name, transactionType string, opts ap
return nil
}

// RemoteOutputConfigChanged is a fixed stub: it returns true only for the
// output named "outdated" and ignores zlog and newCfg.
func (m *MockBulk) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool {
	const outdatedOutputName = "outdated"
	return name == outdatedOutputName
}

var _ bulk.Bulk = (*MockBulk)(nil)

0 comments on commit 59facd6

Please sign in to comment.