Skip to content

Commit

Permalink
moved patch to dispatcher run
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlyF committed Feb 14, 2019
1 parent 7e73994 commit 9cd2cea
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
11 changes: 10 additions & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
}
}

// reschdule sends configurations to dispatching without checking or patching them as Schedule does.
func (d *dispatcher) reschedule(configs []integration.Config) {
for _, c := range configs {
log.Debugf("Rescheduling the check %s from %s", c.Name, c.Entity)
d.add(c)
}
}

// add stores and delegates a given configuration
func (d *dispatcher) add(config integration.Config) {
target := d.getLeastBusyNode()
Expand Down Expand Up @@ -130,7 +138,8 @@ func (d *dispatcher) run(ctx context.Context) {

// Re-dispatch dangling configs
if d.shouldDispatchDanling() {
d.Schedule(d.retrieveAndClearDangling())
danglingConfs := d.retrieveAndClearDangling()
d.reschedule(danglingConfs)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/clusteragent/clusterchecks/dispatcher_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func (d *dispatcher) expireNodes() {
}
for digest, config := range node.digestToConfig {
delete(d.store.digestToNode, digest)
// Dangling configs are meant to be rescheduled, ensure they re-enter the pool of schedulable Cluster Checks.
config.ClusterCheck = true
log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
d.store.danglingConfigs[digest] = config
danglingConfigs.Inc()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/clusterchecks/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
assert.True(t, dispatcher.shouldDispatchDanling())
configs := dispatcher.retrieveAndClearDangling()
// Assert the check is scheduled
dispatcher.Schedule(configs)
dispatcher.reschedule(configs)
danglingConfig, err := dispatcher.getAllConfigs()
assert.NoError(t, err)
assert.Equal(t, 1, len(danglingConfig))
Expand Down

0 comments on commit 9cd2cea

Please sign in to comment.