Skip to content

Commit

Permalink
make version checks specific to region
Browse files Browse the repository at this point in the history
* One-time tokens are not replicated between regions, so we don't want to enforce
  that the version check across all of serf, just members in the same region.
* Cleans up a bunch of legacy checks.

This changeset is specific to 1.2.x and the changes for other versions of
Nomad will be manually backported in separate PRs
  • Loading branch information
tgross committed Oct 17, 2022
1 parent ef13918 commit 77e9a4e
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .changelog/14910.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
acl: Fixed a bug where Nomad version checking for one-time tokens was enforced across regions
```
6 changes: 3 additions & 3 deletions nomad/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (a *ACL) UpsertOneTimeToken(args *structs.OneTimeTokenUpsertRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "upsert_one_time_token"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down Expand Up @@ -903,7 +903,7 @@ func (a *ACL) ExchangeOneTimeToken(args *structs.OneTimeTokenExchangeRequest, re
defer metrics.MeasureSince(
[]string{"nomad", "acl", "exchange_one_time_token"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down Expand Up @@ -960,7 +960,7 @@ func (a *ACL) ExpireOneTimeTokens(args *structs.OneTimeTokenExpireRequest, reply
defer metrics.MeasureSince(
[]string{"nomad", "acl", "expire_one_time_tokens"}, time.Now())

if !ServersMeetMinimumVersion(a.srv.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(a.srv.Members(), a.srv.Region(), minOneTimeAuthenticationTokenVersion, false) {
return fmt.Errorf("All servers should be running version %v or later to use one-time authentication tokens", minAutopilotVersion)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ OUTER:
func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) error {
// For old clusters, send single deregistration messages COMPAT(0.11)
minVersionBatchNodeDeregister := version.Must(version.NewVersion("0.9.4"))
if !ServersMeetMinimumVersion(c.srv.Members(), minVersionBatchNodeDeregister, true) {
if !ServersMeetMinimumVersion(c.srv.Members(), c.srv.Region(), minVersionBatchNodeDeregister, true) {
for _, id := range nodeIDs {
req := structs.NodeDeregisterRequest{
NodeID: id,
Expand Down
4 changes: 2 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis

// COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check to always set args.Eval
// 0.12.1 introduced atomic eval job registration
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if eval != nil && ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
submittedEval = true
}
Expand Down Expand Up @@ -881,7 +881,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
}

// COMPAT(1.1.0): remove conditional and always set args.Eval
if ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) {
if ServersMeetMinimumVersion(j.srv.Members(), j.srv.Region(), minJobRegisterAtomicEvalVersion, false) {
args.Eval = eval
}

Expand Down
8 changes: 4 additions & 4 deletions nomad/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobCSIVolumeClaimGC, index))
}
case <-oneTimeTokenGC.C:
if !ServersMeetMinimumVersion(s.Members(), minOneTimeAuthenticationTokenVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minOneTimeAuthenticationTokenVersion, false) {
continue
}

Expand Down Expand Up @@ -1578,7 +1578,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}

if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
Expand All @@ -1605,7 +1605,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}
Expand All @@ -1620,7 +1620,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
}

func (s *Server) generateClusterID() (string, error) {
if !ServersMeetMinimumVersion(s.Members(), minClusterIDVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), AllRegions, minClusterIDVersion, false) {
s.logger.Named("core").Warn("cannot initialize cluster ID until all servers are above minimum version", "min_version", minClusterIDVersion)
return "", errors.Errorf("cluster ID cannot be created until all servers are above minimum version %s", minClusterIDVersion)
}
Expand Down
4 changes: 2 additions & 2 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}

// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}

Expand Down Expand Up @@ -316,7 +316,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// All servers should be at or above 0.9.0 to apply this operation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
if !ServersMeetMinimumVersion(op.srv.Members(), op.srv.Region(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}

Expand Down
2 changes: 1 addition & 1 deletion nomad/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
eval.ModifyIndex = index

// COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration
if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) {
if !ServersMeetMinimumVersion(s.Members(), s.Region(), minJobRegisterAtomicEvalVersion, false) {
// Create a new evaluation
eval.JobModifyIndex = index
update := &structs.EvalUpdateRequest{
Expand Down
2 changes: 1 addition & 1 deletion nomad/plan_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
now := time.Now().UTC().UnixNano()

if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
if ServersMeetMinimumVersion(p.Members(), p.Region(), MinVersionPlanNormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
Expand Down
34 changes: 9 additions & 25 deletions nomad/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
"golang.org/x/exp/slices"
)

// MinVersionPlanNormalization is the minimum version to support the
Expand Down Expand Up @@ -149,15 +150,20 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
return true, parts
}

const AllRegions = ""

// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether version
// for the failed servers should be verified.
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
func ServersMeetMinimumVersion(members []serf.Member, region string, minVersion *version.Version, checkFailedServers bool) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
valid, parts := isNomadServer(member)
if valid &&
(parts.Region == region || region == AllRegions) &&
(parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
// Check if the versions match - version.LessThan will return true for
// 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
versionsMatch := slices.Equal(minVersion.Segments(), parts.Build.Segments())
if parts.Build.LessThan(minVersion) && !versionsMatch {
return false
}
Expand All @@ -167,28 +173,6 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
return true
}

func slicesMatch(a, b []int) bool {
if a == nil && b == nil {
return true
}

if a == nil || b == nil {
return false
}

if len(a) != len(b) {
return false
}

for i := range a {
if a[i] != b[i] {
return false
}
}

return true
}

// shuffleStrings randomly shuffles the list of strings
func shuffleStrings(list []string) {
for i := range list {
Expand Down
42 changes: 40 additions & 2 deletions nomad/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestServersMeetMinimumVersionExcludingFailed(t *testing.T) {
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, false)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
Expand Down Expand Up @@ -188,7 +188,45 @@ func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}

func TestServersMeetMinimumVersionSuffix(t *testing.T) {
t.Parallel()

cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("1.3.0", serf.StatusAlive),
makeMember("1.2.6", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("1.1.18", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusAlive),
makeMember("1.0.11", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: false,
},
}

for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, AllRegions, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
Expand Down
2 changes: 1 addition & 1 deletion nomad/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
plan.SnapshotIndex = w.snapshotIndex

// Normalize stopped and preempted allocs before RPC
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanNormalization, true)
normalizePlan := ServersMeetMinimumVersion(w.srv.Members(), w.srv.Region(), MinVersionPlanNormalization, true)
if normalizePlan {
plan.NormalizeAllocations()
}
Expand Down

0 comments on commit 77e9a4e

Please sign in to comment.