From 7e73994dfa737be28f1013a29b51785fcb94e0bc Mon Sep 17 00:00:00 2001
From: CharlyF
Date: Wed, 13 Feb 2019 18:53:29 -0500
Subject: [PATCH 1/3] ensure dangling cluster checks can be re-scheduled

---
 .../clusterchecks/dispatcher_nodes.go         |  2 +
 .../clusterchecks/dispatcher_test.go          | 50 +++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index ed9f9f3fe44a0..733e163f32f58 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -105,6 +105,8 @@ func (d *dispatcher) expireNodes() {
 		}
 		for digest, config := range node.digestToConfig {
 			delete(d.store.digestToNode, digest)
+			// Dangling configs are meant to be rescheduled; ensure they re-enter the pool of schedulable Cluster Checks.
+			config.ClusterCheck = true
 			d.store.danglingConfigs[digest] = config
 			danglingConfigs.Inc()
 		}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_test.go b/pkg/clusteragent/clusterchecks/dispatcher_test.go
index 8acec1ce95983..bfb90b3f75752 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_test.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -228,6 +228,56 @@ func TestExpireNodes(t *testing.T) {
 	requireNotLocked(t, dispatcher.store)
 }
 
+func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
+	// This test simulates a rollout of the cluster check workers
+	dispatcher := newDispatcher()
+
+	// Register a node with a correct status and schedule a check
+	dispatcher.processNodeStatus("nodeA", types.NodeStatus{})
+	dispatcher.Schedule([]integration.Config{
+		generateIntegration("A")})
+
+	// Ensure it's dispatched correctly
+	allConfigs, err := dispatcher.getAllConfigs()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(allConfigs))
+	assert.Equal(t, []string{"A"}, extractCheckNames(allConfigs))
+
+	// Ensure it's running correctly
+	configsA, _, err := dispatcher.getNodeConfigs("nodeA")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(configsA))
+
+	// Expire nodeA
+	dispatcher.store.nodes["nodeA"].heartbeat = timestampNow() - 35
+
+	// Assert the config becomes dangling when we trigger expireNodes.
+	assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
+	dispatcher.expireNodes()
+	assert.Equal(t, 0, len(dispatcher.store.nodes))
+	assert.Equal(t, 1, len(dispatcher.store.danglingConfigs))
+
+	requireNotLocked(t, dispatcher.store)
+
+	// Register new node as healthy
+	dispatcher.processNodeStatus("nodeB", types.NodeStatus{})
+
+	// Ensure we have 1 dangling config to schedule, as a new available node is registered
+	assert.True(t, dispatcher.shouldDispatchDanling())
+	configs := dispatcher.retrieveAndClearDangling()
+	// Assert the check is scheduled
+	dispatcher.Schedule(configs)
+	danglingConfig, err := dispatcher.getAllConfigs()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(danglingConfig))
+	assert.Equal(t, []string{"A"}, extractCheckNames(danglingConfig))
+
+	// Make sure the dangling check is rescheduled on the new node
+	configsB, _, err := dispatcher.getNodeConfigs("nodeB")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(configsB))
+}
+
 func TestDispatchFourConfigsTwoNodes(t *testing.T) {
 	dispatcher := newDispatcher()

From 9cd2cea1a36046ef936570a837d31deb35643795 Mon Sep 17 00:00:00 2001
From: CharlyF
Date: Thu, 14 Feb 2019 08:33:17 -0500
Subject: [PATCH 2/3] moved patch to dispatcher run

---
 pkg/clusteragent/clusterchecks/dispatcher_main.go  | 11 ++++++++++-
 pkg/clusteragent/clusterchecks/dispatcher_nodes.go |  3 +--
 pkg/clusteragent/clusterchecks/dispatcher_test.go  |  2 +-
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index 0c94f3415cbbb..c12ca4ffb2cac 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -76,6 +76,14 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
 	}
 }
 
+// reschedule dispatches configurations without checking or patching them as Schedule does.
+func (d *dispatcher) reschedule(configs []integration.Config) {
+	for _, c := range configs {
+		log.Debugf("Rescheduling the check %s from %s", c.Name, c.Entity)
+		d.add(c)
+	}
+}
+
 // add stores and delegates a given configuration
 func (d *dispatcher) add(config integration.Config) {
 	target := d.getLeastBusyNode()
@@ -130,7 +138,8 @@ func (d *dispatcher) run(ctx context.Context) {
 
 			// Re-dispatch dangling configs
 			if d.shouldDispatchDanling() {
-				d.Schedule(d.retrieveAndClearDangling())
+				danglingConfs := d.retrieveAndClearDangling()
+				d.reschedule(danglingConfs)
 			}
 		}
 	}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index 733e163f32f58..31fea7bc1fbeb 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -105,8 +105,7 @@ func (d *dispatcher) expireNodes() {
 		}
 		for digest, config := range node.digestToConfig {
 			delete(d.store.digestToNode, digest)
-			// Dangling configs are meant to be rescheduled; ensure they re-enter the pool of schedulable Cluster Checks.
-			config.ClusterCheck = true
+			log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
 			d.store.danglingConfigs[digest] = config
 			danglingConfigs.Inc()
 		}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_test.go b/pkg/clusteragent/clusterchecks/dispatcher_test.go
index bfb90b3f75752..09e99aa4b9311 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_test.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -266,7 +266,7 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
 	assert.True(t, dispatcher.shouldDispatchDanling())
 	configs := dispatcher.retrieveAndClearDangling()
 	// Assert the check is scheduled
-	dispatcher.Schedule(configs)
+	dispatcher.reschedule(configs)
 	danglingConfig, err := dispatcher.getAllConfigs()
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(danglingConfig))

From 2e2b68715bfad228999e1a10d5859743c9742e09 Mon Sep 17 00:00:00 2001
From: CharlyF
Date: Thu, 14 Feb 2019 08:48:06 -0500
Subject: [PATCH 3/3] use Digest to be consistent

---
 pkg/clusteragent/clusterchecks/dispatcher_main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index c12ca4ffb2cac..a65c7f96d78d1 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -79,7 +79,7 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
 // reschedule dispatches configurations without checking or patching them as Schedule does.
 func (d *dispatcher) reschedule(configs []integration.Config) {
 	for _, c := range configs {
-		log.Debugf("Rescheduling the check %s from %s", c.Name, c.Entity)
+		log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
 		d.add(c)
 	}
 }
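
As an illustration of the behavior the three patches converge on, below is a minimal, self-contained Go sketch of the dangling-config lifecycle: expiring a node parks its configs in a dangling pool, and once a healthy node is available they are re-dispatched through reschedule, which skips the checking and patching that Schedule already performed on first submission. The config and dispatcher types, the digest computation, and the activeNode field are simplified stand-ins assumed for this sketch, not the real clusterchecks types.

package main

import "fmt"

// config is a stand-in for integration.Config; only the fields relevant
// to the dangling flow are kept.
type config struct {
	Name         string
	ClusterCheck bool
}

// dispatcher is a stand-in for the real dispatcher, reduced to the
// dangling-config bookkeeping these patches touch.
type dispatcher struct {
	danglingConfigs map[string]config   // digest -> config awaiting a node
	nodeConfigs     map[string][]config // node name -> scheduled configs
	activeNode      string              // stand-in for getLeastBusyNode()
}

// expireNode mirrors what expireNodes does for one stale node: its
// configs move to the dangling pool so they can be re-dispatched later.
func (d *dispatcher) expireNode(node string) {
	for i, c := range d.nodeConfigs[node] {
		digest := fmt.Sprintf("%s-%d", c.Name, i) // hypothetical digest
		d.danglingConfigs[digest] = c
	}
	delete(d.nodeConfigs, node)
}

// retrieveAndClearDangling drains and returns the dangling pool.
func (d *dispatcher) retrieveAndClearDangling() []config {
	out := make([]config, 0, len(d.danglingConfigs))
	for _, c := range d.danglingConfigs {
		out = append(out, c)
	}
	d.danglingConfigs = make(map[string]config)
	return out
}

// reschedule re-dispatches configs directly: they already went through
// Schedule once, so no further checking or patching is needed.
func (d *dispatcher) reschedule(configs []config) {
	for _, c := range configs {
		fmt.Printf("Rescheduling the check %s\n", c.Name)
		d.nodeConfigs[d.activeNode] = append(d.nodeConfigs[d.activeNode], c)
	}
}

func main() {
	d := &dispatcher{
		danglingConfigs: map[string]config{},
		nodeConfigs: map[string][]config{
			"nodeA": {{Name: "A", ClusterCheck: true}},
		},
	}

	d.expireNode("nodeA")  // nodeA times out; check "A" becomes dangling
	d.activeNode = "nodeB" // a healthy node registers
	d.reschedule(d.retrieveAndClearDangling())

	fmt.Println(d.nodeConfigs["nodeB"]) // prints [{A true}]
}

Run as-is, this prints "Rescheduling the check A" followed by [{A true}], mirroring what TestRescheduleDanglingFromExpiredNodes asserts against the real dispatcher.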