diff --git a/cmd/agent.go b/cmd/agent.go index 3e2a3a234..ccb2ae5dc 100644 --- a/cmd/agent.go +++ b/cmd/agent.go @@ -20,7 +20,7 @@ var agent *dkron.Agent const ( // gracefulTimeout controls how long we wait before forcefully terminating - gracefulTimeout = 3 * time.Second + gracefulTimeout = 3 * time.Hour ) // agentCmd represents the agent command @@ -110,18 +110,30 @@ WAIT: } // Attempt a graceful leave - gracefulCh := make(chan struct{}) log.Info("agent: Gracefully shutting down agent...") go func() { - plugin.CleanupClients() if err := agent.Stop(); err != nil { fmt.Printf("Error: %s", err) log.Error(fmt.Sprintf("Error: %s", err)) return } - close(gracefulCh) }() + gracefulCh := make(chan struct{}) + + for { + log.Info("Waiting for jobs to finish...") + if agent.GetRunningJobs() < 1 { + log.Info("No jobs left. Exiting.") + break + } + time.Sleep(1 * time.Second) + } + + plugin.CleanupClients() + + close(gracefulCh) + // Wait for leave or another signal select { case <-signalCh: diff --git a/dkron/agent.go b/dkron/agent.go index 05c94b432..6958774ff 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -881,3 +881,13 @@ func (a *Agent) applySetJob(job *proto.Job) error { func (a *Agent) RaftApply(cmd []byte) raft.ApplyFuture { return a.raft.Apply(cmd, raftTimeout) } + +// GetRunningJobs returns amount of active jobs +func (a *Agent) GetRunningJobs() int { + job := 0 + runningExecutions.Range(func(k, v interface{}) bool { + job = job + 1 + return true + }) + return job +}