diff --git a/.changelog/16401.txt b/.changelog/16401.txt new file mode 100644 index 00000000000..a737f43e2b9 --- /dev/null +++ b/.changelog/16401.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fixed a bug where collisions in dynamic port offerings would result in spurious plan-for-node-rejected errors +``` diff --git a/nomad/structs/bitmap.go b/nomad/structs/bitmap.go index 63758a0be67..5763ce3a964 100644 --- a/nomad/structs/bitmap.go +++ b/nomad/structs/bitmap.go @@ -1,6 +1,10 @@ package structs -import "fmt" +import ( + "fmt" + + "golang.org/x/exp/slices" +) // Bitmap is a simple uncompressed bitmap type Bitmap []byte @@ -76,3 +80,25 @@ func (b Bitmap) IndexesInRange(set bool, from, to uint) []int { return indexes } + +// IndexesInRangeFiltered returns the indexes in which the values are either set +// or unset based on the passed parameter in the passed range, and do not appear +// in the filter slice +func (b Bitmap) IndexesInRangeFiltered(set bool, from, to uint, filter []int) []int { + var indexes []int + for i := from; i <= to && i < b.Size(); i++ { + c := b.Check(i) + if c == set { + if len(filter) < 1 || !slices.Contains(filter, int(i)) { + indexes = append(indexes, int(i)) + } + } + } + + return indexes +} + +// String represents the Bitmap the same as slice of the Bitmap's set values +func (b Bitmap) String() string { + return fmt.Sprintf("%v", b.IndexesInRange(true, 0, b.Size())) +} diff --git a/nomad/structs/network.go b/nomad/structs/network.go index 761c1559a5f..36169303c1c 100644 --- a/nomad/structs/network.go +++ b/nomad/structs/network.go @@ -7,6 +7,7 @@ import ( "sync" "golang.org/x/exp/maps" + "golang.org/x/exp/slices" ) const ( @@ -502,6 +503,7 @@ func incIP(ip net.IP) { // AssignTaskNetwork supports the deprecated task.resources.network block. func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, error) { var offer AllocatedPorts + var portsInOffer []int // index of host network name to slice of reserved ports, used during dynamic port assignment reservedIdx := map[string][]Port{} @@ -543,6 +545,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro } offer = append(offer, *allocPort) + portsInOffer = append(portsInOffer, allocPort.Value) } for _, port := range ask.DynamicPorts { @@ -554,10 +557,14 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro // lower memory usage. var dynPorts []int // TODO: its more efficient to find multiple dynamic ports at once - dynPorts, addrErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1) + dynPorts, addrErr = getDynamicPortsStochastic( + used, portsInOffer, idx.MinDynamicPort, idx.MaxDynamicPort, + reservedIdx[port.HostNetwork], 1) if addrErr != nil { // Fall back to the precise method if the random sampling failed. - dynPorts, addrErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, reservedIdx[port.HostNetwork], 1) + dynPorts, addrErr = getDynamicPortsPrecise(used, portsInOffer, + idx.MinDynamicPort, idx.MaxDynamicPort, + reservedIdx[port.HostNetwork], 1) if addrErr != nil { continue } @@ -583,6 +590,7 @@ func (idx *NetworkIndex) AssignPorts(ask *NetworkResource) (AllocatedPorts, erro return nil, fmt.Errorf("no addresses available for %s network", port.HostNetwork) } offer = append(offer, *allocPort) + portsInOffer = append(portsInOffer, allocPort.Value) } return offer, nil @@ -641,13 +649,15 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe // lower memory usage. var dynPorts []int var dynErr error - dynPorts, dynErr = getDynamicPortsStochastic(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts)) + dynPorts, dynErr = getDynamicPortsStochastic(used, nil, + idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts)) if dynErr == nil { goto BUILD_OFFER } // Fall back to the precise method if the random sampling failed. - dynPorts, dynErr = getDynamicPortsPrecise(used, idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts)) + dynPorts, dynErr = getDynamicPortsPrecise(used, nil, + idx.MinDynamicPort, idx.MaxDynamicPort, ask.ReservedPorts, len(ask.DynamicPorts)) if dynErr != nil { err = dynErr return @@ -673,10 +683,11 @@ func (idx *NetworkIndex) AssignTaskNetwork(ask *NetworkResource) (out *NetworkRe } // getDynamicPortsPrecise takes the nodes used port bitmap which may be nil if -// no ports have been allocated yet, the network ask and returns a set of unused -// ports to fulfil the ask's DynamicPorts or an error if it failed. An error -// means the ask can not be satisfied as the method does a precise search. -func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) { +// no ports have been allocated yet, any ports already offered in the caller, +// and the network ask. It returns a set of unused ports to fulfil the ask's +// DynamicPorts or an error if it failed. An error means the ask can not be +// satisfied as the method does a precise search. +func getDynamicPortsPrecise(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reserved []Port, numDyn int) ([]int, error) { // Create a copy of the used ports and apply the new reserves var usedSet Bitmap var err error @@ -696,8 +707,10 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, usedSet.Set(uint(port.Value)) } - // Get the indexes of the unset - availablePorts := usedSet.IndexesInRange(false, uint(minDynamicPort), uint(maxDynamicPort)) + // Get the indexes of the unset ports, less those which have already been + // picked as part of this offer + availablePorts := usedSet.IndexesInRangeFiltered( + false, uint(minDynamicPort), uint(maxDynamicPort), portsInOffer) // Randomize the amount we need if len(availablePorts) < numDyn { @@ -713,12 +726,13 @@ func getDynamicPortsPrecise(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, return availablePorts[:numDyn], nil } -// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil if -// no ports have been allocated yet, the network ask and returns a set of unused -// ports to fulfil the ask's DynamicPorts or an error if it failed. An error -// does not mean the ask can not be satisfied as the method has a fixed amount -// of random probes and if these fail, the search is aborted. -func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) { +// getDynamicPortsStochastic takes the nodes used port bitmap which may be nil +// if no ports have been allocated yet, any ports already offered in the caller, +// and the network ask. It returns a set of unused ports to fulfil the ask's +// DynamicPorts or an error if it failed. An error does not mean the ask can not +// be satisfied as the method has a fixed amount of random probes and if these +// fail, the search is aborted. +func getDynamicPortsStochastic(nodeUsed Bitmap, portsInOffer []int, minDynamicPort, maxDynamicPort int, reservedPorts []Port, count int) ([]int, error) { var reserved, dynamic []int for _, port := range reservedPorts { reserved = append(reserved, port.Value) @@ -742,6 +756,12 @@ func getDynamicPortsStochastic(nodeUsed Bitmap, minDynamicPort, maxDynamicPort i goto PICK } } + // the pick conflicted with a previous pick that hasn't been saved to + // the index yet + if slices.Contains(portsInOffer, randPort) { + goto PICK + } + dynamic = append(dynamic, randPort) } diff --git a/nomad/structs/network_test.go b/nomad/structs/network_test.go index 9dcfd91d747..ba3fdbc63ce 100644 --- a/nomad/structs/network_test.go +++ b/nomad/structs/network_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -317,6 +318,72 @@ func TestNetworkIndex_yieldIP(t *testing.T) { } } +// TestNetworkIndex_AssignPorts exercises assigning ports on group networks. +func TestNetworkIndex_AssignPorts(t *testing.T) { + ci.Parallel(t) + + // Create a node that only has one free port + idx := NewNetworkIndex() + n := &Node{ + NodeResources: &NodeResources{ + Networks: []*NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + IP: "192.168.0.100", + MBits: 1000, + }, + }, + NodeNetworks: []*NodeNetworkResource{ + { + Mode: "host", + Device: "eth0", + Speed: 1000, + Addresses: []NodeNetworkAddress{ + { + Alias: "default", + Address: "192.168.0.100", + Family: NodeNetworkAF_IPv4, + }, + }, + }, + }, + }, + ReservedResources: &NodeReservedResources{ + Networks: NodeReservedNetworkResources{ + ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2), + }, + }, + } + + idx.SetNode(n) + + // Ask for 2 dynamic ports + ask := &NetworkResource{ + ReservedPorts: []Port{{"static", 443, 443, "default"}}, + DynamicPorts: []Port{{"http", 0, 80, "default"}, {"admin", 0, 8080, "default"}}, + } + offer, err := idx.AssignPorts(ask) + must.NoError(t, err) + must.NotNil(t, offer, must.Sprint("did not get an offer")) + + staticPortMapping, ok := offer.Get("static") + must.True(t, ok) + + httpPortMapping, ok := offer.Get("http") + must.True(t, ok) + + adminPortMapping, ok := offer.Get("admin") + must.True(t, ok) + + must.NotEq(t, httpPortMapping.Value, adminPortMapping.Value, + must.Sprint("assigned dynamic ports must not conflict")) + + must.Eq(t, 443, staticPortMapping.Value) + must.Between(t, idx.MaxDynamicPort-1, httpPortMapping.Value, idx.MaxDynamicPort) + must.Between(t, idx.MaxDynamicPort-1, adminPortMapping.Value, idx.MaxDynamicPort) +} + func TestNetworkIndex_AssignTaskNetwork(t *testing.T) { ci.Parallel(t) idx := NewNetworkIndex() @@ -439,32 +506,27 @@ func TestNetworkIndex_AssignTaskNetwork_Dynamic_Contention(t *testing.T) { }, ReservedResources: &NodeReservedResources{ Networks: NodeReservedNetworkResources{ - ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-1), + ReservedHostPorts: fmt.Sprintf("%d-%d", idx.MinDynamicPort, idx.MaxDynamicPort-2), }, }, } + idx.SetNode(n) - // Ask for dynamic ports + // Ask for 2 dynamic ports ask := &NetworkResource{ - DynamicPorts: []Port{{"http", 0, 80, ""}}, + DynamicPorts: []Port{{"http", 0, 80, ""}, {"admin", 0, 443, ""}}, } offer, err := idx.AssignTaskNetwork(ask) - if err != nil { - t.Fatalf("err: %v", err) - } - if offer == nil { - t.Fatalf("bad") - } - if offer.IP != "192.168.0.100" { - t.Fatalf("bad: %#v", offer) - } - if len(offer.DynamicPorts) != 1 { - t.Fatalf("There should be one dynamic ports") - } - if p := offer.DynamicPorts[0].Value; p != idx.MaxDynamicPort { - t.Fatalf("Dynamic Port: should have been assigned %d; got %d", p, idx.MaxDynamicPort) - } + must.NoError(t, err) + must.NotNil(t, offer, must.Sprint("did not get an offer")) + must.Eq(t, "192.168.0.100", offer.IP) + must.Len(t, 2, offer.DynamicPorts, must.Sprint("There should be one dynamic ports")) + + must.NotEq(t, offer.DynamicPorts[0].Value, offer.DynamicPorts[1].Value, + must.Sprint("assigned dynamic ports must not conflict")) + must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[0].Value, idx.MaxDynamicPort) + must.Between(t, idx.MaxDynamicPort-1, offer.DynamicPorts[1].Value, idx.MaxDynamicPort) } // COMPAT(0.11): Remove in 0.11 diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 13e32303810..4b2dc87c12b 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -792,6 +792,10 @@ func TestServiceSched_Spread(t *testing.T) { if i%2 == 0 { node.Datacenter = "dc2" } + // setting a narrow range makes it more likely for this test to + // hit bugs in NetworkIndex + node.NodeResources.MinDynamicPort = 20000 + node.NodeResources.MaxDynamicPort = 20005 nodes = append(nodes, node) assert.Nil(h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node), "UpsertNode") nodeMap[node.ID] = node diff --git a/scheduler/rank.go b/scheduler/rank.go index 6aa32afcfd6..eadb465d86b 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -199,7 +199,8 @@ OUTER: return nil } - // Get the proposed allocations + // Get the allocations that already exist on the node + those allocs + // that have been placed as part of this same evaluation proposed, err := option.ProposedAllocs(iter.ctx) if err != nil { iter.ctx.Logger().Named("binpack").Error("failed retrieving proposed allocations", "error", err) @@ -400,7 +401,6 @@ OUTER: continue OUTER } } - // Reserve this to prevent another task from colliding netIdx.AddReserved(offer)