Ensure dangling cluster checks can be re-scheduled (#3035)
* ensure dangling cluster checks can be re-scheduled
Charly Fontaine committed Feb 14, 2019
1 parent c877f9e commit 52053af
Showing 3 changed files with 61 additions and 1 deletion.
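In short: when expireNodes drops a stale node, that node's configs are parked in the store's danglingConfigs pool, and the dispatcher's run loop later re-dispatches them. Previously the loop pushed them back through Schedule; this commit adds a reschedule helper that hands them straight to add instead, since — as the new function's comment notes — Schedule checks and patches incoming configurations, steps a dangling config has presumably already been through once.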
11 changes: 10 additions & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -76,6 +76,14 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
 	}
 }
 
+// reschedule sends configurations to dispatching without checking or patching them as Schedule does.
+func (d *dispatcher) reschedule(configs []integration.Config) {
+	for _, c := range configs {
+		log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
+		d.add(c)
+	}
+}
+
 // add stores and delegates a given configuration
 func (d *dispatcher) add(config integration.Config) {
 	target := d.getLeastBusyNode()
@@ -130,7 +138,8 @@ func (d *dispatcher) run(ctx context.Context) {
 
 			// Re-dispatch dangling configs
 			if d.shouldDispatchDanling() {
-				d.Schedule(d.retrieveAndClearDangling())
+				danglingConfs := d.retrieveAndClearDangling()
+				d.reschedule(danglingConfs)
 			}
 		}
 	}
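To make the lifecycle concrete, here is a minimal, self-contained sketch of the mechanism under assumed, simplified names (miniDispatcher, expireAfter, and so on — none of this is the real dispatcher implementation, and the 30-second timeout is a guess based on the test below): a node expires, its configs become dangling, and a later pass hands them straight back to add, bypassing Schedule.

package main

import (
	"fmt"
	"time"
)

// config and node are simplified stand-ins for integration.Config and the
// dispatcher's node tracking; all names here are hypothetical.
type config struct{ name, digest string }

type node struct {
	heartbeat time.Time
	configs   map[string]config // digest -> config
}

type miniDispatcher struct {
	nodes    map[string]*node
	dangling map[string]config // digest -> config, the "dangling" pool
}

// expireAfter is an assumed node-expiration timeout; the real value is
// configured outside this diff.
const expireAfter = 30 * time.Second

// expireNodes drops stale nodes and parks their configs in the dangling
// pool, mirroring the dispatcher_nodes.go change in this commit.
func (d *miniDispatcher) expireNodes(now time.Time) {
	for name, n := range d.nodes {
		if now.Sub(n.heartbeat) <= expireAfter {
			continue
		}
		for digest, c := range n.configs {
			fmt.Printf("Adding %s:%s as a dangling config\n", c.name, digest)
			d.dangling[digest] = c
		}
		delete(d.nodes, name)
	}
}

// reschedule hands dangling configs straight back to add, with none of the
// checking/patching Schedule performs on fresh configs.
func (d *miniDispatcher) reschedule() {
	for digest, c := range d.dangling {
		delete(d.dangling, digest)
		d.add(c)
	}
}

// add places a config on any live node (least-busy selection elided);
// with no node available it stays dangling.
func (d *miniDispatcher) add(c config) {
	for name, n := range d.nodes {
		n.configs[c.digest] = c
		fmt.Printf("Dispatched %s to %s\n", c.name, name)
		return
	}
	d.dangling[c.digest] = c
}

func main() {
	d := &miniDispatcher{nodes: map[string]*node{}, dangling: map[string]config{}}
	now := time.Now()

	// nodeA holds check A but last checked in 35 seconds ago...
	d.nodes["nodeA"] = &node{
		heartbeat: now.Add(-35 * time.Second),
		configs:   map[string]config{"d1": {name: "A", digest: "d1"}},
	}
	d.expireNodes(now) // ...so it expires and A becomes dangling

	// A fresh node registers; the dangling check lands on it.
	d.nodes["nodeB"] = &node{heartbeat: now, configs: map[string]config{}}
	d.reschedule()
}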
1 change: 1 addition & 0 deletions pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -105,6 +105,7 @@ func (d *dispatcher) expireNodes() {
 		}
 		for digest, config := range node.digestToConfig {
 			delete(d.store.digestToNode, digest)
+			log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
 			d.store.danglingConfigs[digest] = config
 			danglingConfigs.Inc()
 		}
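A note on timing: the new test below ages nodeA's heartbeat by 35 seconds before calling expireNodes, which suggests a node-expiration threshold of roughly 30 seconds; the actual timeout is configured elsewhere in the package and is not part of this diff.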
50 changes: 50 additions & 0 deletions pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -228,6 +228,56 @@ func TestExpireNodes(t *testing.T) {
 	requireNotLocked(t, dispatcher.store)
 }
 
+func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
+	// This test case simulates a rollout of the cluster check workers
+	dispatcher := newDispatcher()
+
+	// Register a node with a correct status & schedule a check
+	dispatcher.processNodeStatus("nodeA", types.NodeStatus{})
+	dispatcher.Schedule([]integration.Config{
+		generateIntegration("A")})
+
+	// Ensure it's dispatched correctly
+	allConfigs, err := dispatcher.getAllConfigs()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(allConfigs))
+	assert.Equal(t, []string{"A"}, extractCheckNames(allConfigs))
+
+	// Ensure it's running correctly
+	configsA, _, err := dispatcher.getNodeConfigs("nodeA")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(configsA))
+
+	// Expire nodeA
+	dispatcher.store.nodes["nodeA"].heartbeat = timestampNow() - 35
+
+	// Assert the config becomes dangling when expireNodes is triggered
+	assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
+	dispatcher.expireNodes()
+	assert.Equal(t, 0, len(dispatcher.store.nodes))
+	assert.Equal(t, 1, len(dispatcher.store.danglingConfigs))
+
+	requireNotLocked(t, dispatcher.store)
+
+	// Register a new node as healthy
+	dispatcher.processNodeStatus("nodeB", types.NodeStatus{})
+
+	// Ensure we have 1 dangling config to schedule, now that an available node is registered
+	assert.True(t, dispatcher.shouldDispatchDanling())
+	configs := dispatcher.retrieveAndClearDangling()
+	// Assert the check is scheduled
+	dispatcher.reschedule(configs)
+	danglingConfig, err := dispatcher.getAllConfigs()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(danglingConfig))
+	assert.Equal(t, []string{"A"}, extractCheckNames(danglingConfig))
+
+	// Make sure the dangling check is rescheduled on the new node
+	configsB, _, err := dispatcher.getNodeConfigs("nodeB")
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(configsB))
+}
+
 func TestDispatchFourConfigsTwoNodes(t *testing.T) {
 	dispatcher := newDispatcher()
 
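To run just the new test with the standard Go tooling, from the repository root: go test -run TestRescheduleDanglingFromExpiredNodes ./pkg/clusteragent/clusterchecks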
