diff --git a/.changelog/20348.txt b/.changelog/20348.txt new file mode 100644 index 000000000000..f8ac880234c3 --- /dev/null +++ b/.changelog/20348.txt @@ -0,0 +1,3 @@ +```release-note:bug +deployments: Fixed a goroutine leak when jobs are purged +``` diff --git a/nomad/deploymentwatcher/deployments_watcher.go b/nomad/deploymentwatcher/deployments_watcher.go index b32b9a30a9cb..8ad464e710c5 100644 --- a/nomad/deploymentwatcher/deployments_watcher.go +++ b/nomad/deploymentwatcher/deployments_watcher.go @@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) { // Update the latest index dindex = idx - // Ensure we are tracking the things we should and not tracking what we - // shouldn't be + // Ensure we are tracking only active deployments for _, d := range deployments { if d.Active() { if err := w.add(d); err != nil { @@ -191,6 +190,21 @@ func (w *Watcher) watchDeployments(ctx context.Context) { w.remove(d) } } + + // Ensure we're not tracking deployments that have been deleted because + // the job was purged + for _, watcher := range w.watchers { + var found bool + for _, d := range deployments { + if watcher.deploymentID == d.ID { + found = true + break + } + } + if !found { + w.removeByID(watcher.deploymentID) + } + } } } @@ -285,6 +299,10 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) { // remove stops watching a deployment. This can be because the deployment is // complete or being deleted. 
func (w *Watcher) remove(d *structs.Deployment) { + w.removeByID(d.ID) +} + +func (w *Watcher) removeByID(id string) { w.l.Lock() defer w.l.Unlock() @@ -293,9 +311,9 @@ func (w *Watcher) remove(d *structs.Deployment) { return } - if watcher, ok := w.watchers[d.ID]; ok { + if watcher, ok := w.watchers[id]; ok { watcher.StopWatch() - delete(w.watchers, d.ID) + delete(w.watchers, id) } } diff --git a/nomad/deploymentwatcher/deployments_watcher_test.go b/nomad/deploymentwatcher/deployments_watcher_test.go index cde0334bb088..a6b880441f71 100644 --- a/nomad/deploymentwatcher/deployments_watcher_test.go +++ b/nomad/deploymentwatcher/deployments_watcher_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" "github.com/stretchr/testify/assert" mocker "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -2081,3 +2082,53 @@ func watchersCount(w *Watcher) int { return len(w.watchers) } + +// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged +func TestWatcher_PurgeDeployment(t *testing.T) { + ci.Parallel(t) + w, m := defaultTestDeploymentWatcher(t) + + // clear UpdateDeploymentStatus default expectation + m.Mock.ExpectedCalls = nil + + // Create a job and a deployment + j := mock.Job() + d := mock.Deployment() + d.JobID = j.ID + must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j)) + must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d)) + + // expect a call to UpdateDeploymentStatus matching this deployment's ID with the paused status and description + matchConfig := &matchDeploymentStatusUpdateConfig{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusPaused, + StatusDescription: structs.DeploymentStatusDescriptionPaused, + } + matcher := matchDeploymentStatusUpdateRequest(matchConfig) + m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil) + + w.SetEnabled(true, m.state) + must.Wait(t, 
wait.InitialSuccess( + wait.ErrorFunc(func() error { + if watchersCount(w) != 1 { + return fmt.Errorf("expected 1 deployment") + } + return nil + }), + wait.Attempts(100), + wait.Gap(10*time.Millisecond), + )) + + must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID)) + + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if watchersCount(w) != 0 { + return fmt.Errorf("expected deployment watcher to be stopped") + } + return nil + }), + wait.Attempts(500), + wait.Gap(10*time.Millisecond), + )) +}