Skip to content

Commit

Permalink
nsd: always set deregister flag after deregistration of group (#16289) (
Browse files Browse the repository at this point in the history
#16536)

(manual cherry-pick of ed498f8)

* services: always set deregister flag after deregistration of group

This PR fixes a bug where the group service hook's deregister flag was
not set in some cases, causing the hook to attempt deregistrations twice
during job updates (alloc replacement).

In the tests ... we used to assert on the wrong behavior (remove twice) which
has now been corrected to assert we remove only once.

This bug was "silent" in the Consul provider world because the error logs for
double deregistration only show up in Consul logs; with the Nomad provider the
error logs are in the Nomad agent logs.

* services: cleanup group service hook tests

Co-authored-by: Seth Hoenig <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and shoenig authored Mar 17, 2023
1 parent 9a434bf commit 2598e4f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .changelog/16289.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
services: Fixed a bug where a service would be deregistered twice
```
62 changes: 36 additions & 26 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"sync"
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -42,7 +43,7 @@ type groupServiceHook struct {
// and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper

logger log.Logger
logger hclog.Logger

// The following fields may be updated
canary bool
Expand All @@ -59,11 +60,11 @@ type groupServiceHook struct {

type groupServiceHookConfig struct {
alloc *structs.Allocation
restarter agentconsul.WorkloadRestarter
restarter serviceregistration.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context
logger log.Logger
logger hclog.Logger

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
Expand Down Expand Up @@ -120,23 +121,26 @@ func (h *groupServiceHook) Prerun() error {
h.prerun = true
h.mu.Unlock()
}()
return h.prerunLocked()
return h.preRunLocked()
}

func (h *groupServiceHook) prerunLocked() error {
// caller must hold h.lock
func (h *groupServiceHook) preRunLocked() error {
if len(h.services) == 0 {
return nil
}

services := h.getWorkloadServices()
services := h.getWorkloadServicesLocked()
return h.serviceRegWrapper.RegisterWorkload(services)
}

// Update is run when a job submitter modifies service(s) (but not much else -
// otherwise a full alloc replacement would occur).
func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.mu.Lock()
defer h.mu.Unlock()

oldWorkloadServices := h.getWorkloadServices()
oldWorkloadServices := h.getWorkloadServicesLocked()

// Store new updated values out of request
canary := false
Expand Down Expand Up @@ -168,7 +172,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
h.namespace = req.Alloc.ServiceProviderNamespace()

// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()
newWorkloadServices := h.getWorkloadServicesLocked()

if !h.prerun {
// Update called before Prerun. Update alloc and exit to allow
Expand All @@ -188,21 +192,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
}()

h.preKillLocked()
return h.prerunLocked()
return h.preRunLocked()
}

func (h *groupServiceHook) PreKill() {
h.mu.Lock()
defer h.mu.Unlock()
h.preKillLocked()
helper.WithLock(&h.mu, h.preKillLocked)
}

// implements the PreKill hook but requires the caller hold the lock
// implements the PreKill hook
//
// caller must hold h.lock
func (h *groupServiceHook) preKillLocked() {
// If we have a shutdown delay deregister group services and then wait
// before continuing to kill tasks.
h.deregister()
h.deregistered = true
h.deregisterLocked()

if h.delay == 0 {
return
Expand All @@ -219,24 +222,31 @@ func (h *groupServiceHook) preKillLocked() {
}

func (h *groupServiceHook) Postrun() error {
h.mu.Lock()
defer h.mu.Unlock()

if !h.deregistered {
h.deregister()
}
helper.WithLock(&h.mu, h.deregisterLocked)
return nil
}

// deregister services from Consul.
func (h *groupServiceHook) deregister() {
// deregisterLocked will deregister services from Consul/Nomad service provider.
//
// caller must hold h.lock
func (h *groupServiceHook) deregisterLocked() {
if h.deregistered {
return
}

if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()
workloadServices := h.getWorkloadServicesLocked()
h.serviceRegWrapper.RemoveWorkload(workloadServices)
}

h.deregistered = true
}

func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
// getWorkloadServicesLocked returns the set of workload services currently
// on the hook.
//
// caller must hold h.lock
func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)

Expand Down
94 changes: 45 additions & 49 deletions client/allocrunner/groupservice_hook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"github.com/shoenig/test/must"
)

var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
Expand Down Expand Up @@ -50,22 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
must.NoError(t, h.Prerun())

req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
must.NoError(t, h.Update(req))

require.NoError(t, h.Postrun())
must.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())
must.NoError(t, h.PreTaskRestart())

ops := consulMockClient.GetOps()
require.Len(t, ops, 5)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun
require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
require.Equal(t, "add", ops[4].Op) // Restart -> preRun
must.Len(t, 4, ops)
must.Eq(t, "add", ops[0].Op) // Prerun
must.Eq(t, "update", ops[1].Op) // Update
must.Eq(t, "remove", ops[2].Op) // Postrun
must.Eq(t, "add", ops[3].Op) // Restart -> preRun
}

// TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
Expand All @@ -92,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
must.NoError(t, h.Prerun())

// Increase shutdown delay
alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
must.NoError(t, h.Update(req))

// Assert that update updated the delay value
require.Equal(t, h.delay, 15*time.Second)
must.Eq(t, h.delay, 15*time.Second)

// Remove shutdown delay
alloc.Job.TaskGroups[0].ShutdownDelay = nil
req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
must.NoError(t, h.Update(req))

// Assert that update updated the delay value
require.Equal(t, h.delay, 0*time.Second)
must.Eq(t, h.delay, 0*time.Second)
}

// TestGroupServiceHook_GroupServices asserts group service hooks with group
Expand All @@ -133,22 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
must.NoError(t, h.Prerun())

req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
must.NoError(t, h.Update(req))

require.NoError(t, h.Postrun())
must.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())
must.NoError(t, h.PreTaskRestart())

ops := consulMockClient.GetOps()
require.Len(t, ops, 5)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun
require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
require.Equal(t, "add", ops[4].Op) // Restart -> preRun
must.Len(t, 4, ops)
must.Eq(t, "add", ops[0].Op) // Prerun
must.Eq(t, "update", ops[1].Op) // Update
must.Eq(t, "remove", ops[2].Op) // Postrun
must.Eq(t, "add", ops[3].Op) // Restart -> preRun
}

// TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
Expand Down Expand Up @@ -179,25 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
must.NoError(t, h.Prerun())

// Trigger our hook requests.
req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
require.NoError(t, h.Postrun())
require.NoError(t, h.PreTaskRestart())
must.NoError(t, h.Update(req))
must.NoError(t, h.Postrun())
must.NoError(t, h.PreTaskRestart())

// Ensure the Nomad mock provider has the expected operations.
ops := nomadMockClient.GetOps()
require.Len(t, ops, 5)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun
require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
require.Equal(t, "add", ops[4].Op) // Restart -> preRun
must.Len(t, 4, ops)
must.Eq(t, "add", ops[0].Op) // Prerun
must.Eq(t, "update", ops[1].Op) // Update
must.Eq(t, "remove", ops[2].Op) // Postrun
must.Eq(t, "add", ops[3].Op) // Restart -> preRun

// Ensure the Consul mock provider has zero operations.
require.Len(t, consulMockClient.GetOps(), 0)
must.SliceEmpty(t, consulMockClient.GetOps())
}

// TestGroupServiceHook_Error asserts group service hooks with group
Expand Down Expand Up @@ -234,22 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
logger: logger,
})
require.NoError(t, h.Prerun())
must.NoError(t, h.Prerun())

req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
require.NoError(t, h.Update(req))
must.NoError(t, h.Update(req))

require.NoError(t, h.Postrun())
must.NoError(t, h.Postrun())

require.NoError(t, h.PreTaskRestart())
must.NoError(t, h.PreTaskRestart())

ops := consulMockClient.GetOps()
require.Len(t, ops, 5)
require.Equal(t, "add", ops[0].Op) // Prerun
require.Equal(t, "update", ops[1].Op) // Update
require.Equal(t, "remove", ops[2].Op) // Postrun
require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
require.Equal(t, "add", ops[4].Op) // Restart -> preRun
must.Len(t, 4, ops)
must.Eq(t, "add", ops[0].Op) // Prerun
must.Eq(t, "update", ops[1].Op) // Update
must.Eq(t, "remove", ops[2].Op) // Postrun
must.Eq(t, "add", ops[3].Op) // Restart -> preRun
}

func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
Expand Down Expand Up @@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
logger: logger,
})

services := h.getWorkloadServices()
require.Len(t, services.Services, 1)
services := h.getWorkloadServicesLocked()
must.Len(t, 1, services.Services)
}
8 changes: 8 additions & 0 deletions helper/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"regexp"
"strings"
"sync"
"time"

multierror "github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -477,3 +478,10 @@ OUTER:
}
return true
}

// WithLock executes a function while holding a lock.
func WithLock(lock sync.Locker, f func()) {
lock.Lock()
defer lock.Unlock()
f()
}

0 comments on commit 2598e4f

Please sign in to comment.