Skip to content

Commit

Permalink
disconnected clients: ensure servers meet minimum required version (#…
Browse files Browse the repository at this point in the history
…12202)

* planner: expose ServerMeetsMinimumVersion via Planner interface
* filterByTainted: add flag indicating disconnect support
* allocReconciler: accept and pass disconnect support flag
* tests: update dependent tests
  • Loading branch information
DerekStrickland authored and DerekStrickland committed Mar 7, 2022
1 parent 060182e commit 0dd270c
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 106 deletions.
38 changes: 38 additions & 0 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,44 @@ func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
}
}

func TestServersMeetMinimumVersionSuffix(t *testing.T) {
t.Parallel()

cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("1.3.0", serf.StatusAlive),
makeMember("1.2.6", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("1.1.18", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusAlive),
makeMember("1.0.11", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: false,
},
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}

func makeMember(version string, status serf.MemberStatus) serf.Member {
return serf.Member{
Name: "foo",
Expand Down
8 changes: 8 additions & 0 deletions nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -580,6 +581,13 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
return nil
}

// ServersMeetMinimumVersion allows implementations of the Scheduler interface in
// other packages to perform server version checks without direct references to
// the Nomad server.
func (w *Worker) ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool {
return ServersMeetMinimumVersion(w.srv.Members(), minVersion, checkFailedServers)
}

// SubmitPlan is used to submit a plan for consideration. This allows
// the worker to act as the planner for the scheduler.
func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) {
Expand Down
8 changes: 7 additions & 1 deletion scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand Down Expand Up @@ -68,6 +69,9 @@ const (
maxPastRescheduleEvents = 5
)

// minVersionMaxClientDisconnect is the minimum version that supports max_client_disconnect.
var minVersionMaxClientDisconnect = version.Must(version.NewVersion("1.2.6"))

// SetStatusError is used to set the status of the evaluation to the given error
type SetStatusError struct {
Err error
Expand Down Expand Up @@ -369,7 +373,9 @@ func (s *GenericScheduler) computeJobAllocs() error {

reconciler := NewAllocReconciler(s.logger,
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID, s.eval.Priority)
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID,
s.eval.Priority, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))

results := reconciler.Compute()
s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", results))

Expand Down
35 changes: 20 additions & 15 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type allocReconciler struct {
evalID string
evalPriority int

// supportsDisconnectedClients indicates whether all servers meet the required
// minimum version to allow application of max_client_disconnect configuration.
supportsDisconnectedClients bool

// now is the time used when determining rescheduling eligibility
// defaults to time.Now, and overridden in unit tests
now time.Time
Expand Down Expand Up @@ -171,19 +175,20 @@ func (r *reconcileResults) Changes() int {
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int) *allocReconciler {
evalPriority int, supportsDisconnectedClients bool) *allocReconciler {
return &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
batch: batch,
jobID: jobID,
job: job,
deployment: deployment.Copy(),
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
evalPriority: evalPriority,
now: time.Now(),
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
batch: batch,
jobID: jobID,
job: job,
deployment: deployment.Copy(),
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
evalPriority: evalPriority,
supportsDisconnectedClients: supportsDisconnectedClients,
now: time.Now(),
result: &reconcileResults{
attributeUpdates: make(map[string]*structs.Allocation),
disconnectUpdates: make(map[string]*structs.Allocation),
Expand Down Expand Up @@ -339,7 +344,7 @@ func (a *allocReconciler) handleStop(m allocMatrix) {
// filterAndStopAll stops all allocations in an allocSet. This is useful in when
// stopping an entire job or task group.
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
Expand Down Expand Up @@ -401,7 +406,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
canaries, all := a.cancelUnneededCanaries(all, desiredChanges)

// Determine what set of allocations are on tainted nodes
untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
Expand Down Expand Up @@ -604,7 +609,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
}

canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes)
untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)

Expand Down
Loading

0 comments on commit 0dd270c

Please sign in to comment.