Skip to content

Commit

Permalink
Merge pull request #2363 from pradipd/windows_routingmesh
Browse files Browse the repository at this point in the history
Enabling ILB/ELB on windows using per-node, per-network LB endpoint.
  • Loading branch information
nishanttotla authored Sep 14, 2017
2 parents 99d44a9 + 9fa9ce1 commit bd7bafb
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 225 deletions.
256 changes: 160 additions & 96 deletions api/objects.pb.go

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions api/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ message Node {
// component, if the node is a manager.
ManagerStatus manager_status = 6;

// DEPRECATED: Use lb_attachments to find the ingress network
// The node attachment to the ingress network.
NetworkAttachment attachment = 7;
NetworkAttachment attachment = 7 [deprecated=true];

// Certificate is the TLS certificate issued for the node, if any.
Certificate certificate = 8 [(gogoproto.nullable) = false];
Expand All @@ -75,6 +76,11 @@ message Node {
// shows the privilege level that the CA would currently grant when
// issuing or renewing the node's certificate.
NodeRole role = 9;

// Each node uses the network attachment to set up an endpoint on the
// node to be used for load balancing. Each overlay network, including
// ingress network, will have an NetworkAttachment.
repeated NetworkAttachment lb_attachments = 10;
}

message Service {
Expand Down Expand Up @@ -257,7 +263,7 @@ message NetworkAttachment {

// List of aliases by which a task is resolved in a network
repeated string aliases = 3;

// Map of all the driver attachment options for this network
map<string,string> driver_attachment_opts = 4;
}
Expand Down
188 changes: 188 additions & 0 deletions manager/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,157 @@ func TestNoDuplicateIPs(t *testing.T) {
}
}

func TestNodeAllocator(t *testing.T) {
s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()

a, err := New(s, nil)
assert.NoError(t, err)
assert.NotNil(t, a)

var node1FromStore *api.Node
node1 := &api.Node{
ID: "nodeID1",
}

// Try adding some objects to store before allocator is started
assert.NoError(t, s.Update(func(tx store.Tx) error {
// populate ingress network
in := &api.Network{
ID: "ingress",
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: "ingress",
},
Ingress: true,
},
}
assert.NoError(t, store.CreateNetwork(tx, in))

n1 := &api.Network{
ID: "overlayID1",
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: "overlayID1",
},
},
}
assert.NoError(t, store.CreateNetwork(tx, n1))

assert.NoError(t, store.CreateNode(tx, node1))
return nil
}))

nodeWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateNode{}, api.EventDeleteNode{})
defer cancel()
netWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateNetwork{}, api.EventDeleteNetwork{})
defer cancel()

// Start allocator
go func() {
assert.NoError(t, a.Run(context.Background()))
}()
defer a.Stop()

// Validate node has 2 LB IP address (1 for each network).
watchNetwork(t, netWatch, false, isValidNetwork) // ingress
watchNetwork(t, netWatch, false, isValidNetwork) // overlayID1
watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1

// Add a node and validate it gets a LB ip on each network.
node2 := &api.Node{
ID: "nodeID2",
}
assert.NoError(t, s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNode(tx, node2))
return nil
}))
watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2

// Add a network and validate each node has 3 LB IP addresses
n2 := &api.Network{
ID: "overlayID2",
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: "overlayID2",
},
},
}
assert.NoError(t, s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNetwork(tx, n2))
return nil
}))
watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2
watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1", "overlayID3"}) // node1
watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1", "overlayID3"}) // node2

// Remove a network and validate each node has 2 LB IP addresses
assert.NoError(t, s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteNetwork(tx, n2.ID))
return nil
}))
watchNetwork(t, netWatch, false, isValidNetwork) // overlayID2
watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "overlayID1"}) // node1
watchNode(t, nodeWatch, false, isValidNode, node2, []string{"ingress", "overlayID1"}) // node2

// Remove a node and validate remaining node has 2 LB IP addresses
assert.NoError(t, s.Update(func(tx store.Tx) error {
assert.NoError(t, store.DeleteNode(tx, node2.ID))
return nil
}))
watchNode(t, nodeWatch, false, nil, nil, nil) // node2
s.View(func(tx store.ReadTx) {
node1FromStore = store.GetNode(tx, node1.ID)
})

isValidNode(t, node1, node1FromStore, []string{"ingress", "overlayID1"})

// Validate that a LB IP address is not allocated for node-local networks
p := &api.Network{
ID: "bridge",
Spec: api.NetworkSpec{
Annotations: api.Annotations{
Name: "pred_bridge_network",
Labels: map[string]string{
"com.docker.swarm.predefined": "true",
},
},
DriverConfig: &api.Driver{Name: "bridge"},
},
}
assert.NoError(t, s.Update(func(tx store.Tx) error {
assert.NoError(t, store.CreateNetwork(tx, p))
return nil
}))
watchNetwork(t, netWatch, false, isValidNetwork) // bridge

s.View(func(tx store.ReadTx) {
node1FromStore = store.GetNode(tx, node1.ID)
})

isValidNode(t, node1, node1FromStore, []string{"ingress", "overlayID1"})
}

func isValidNode(t assert.TestingT, originalNode, updatedNode *api.Node, networks []string) bool {

if !assert.Equal(t, originalNode.ID, updatedNode.ID) {
return false
}

if !assert.Equal(t, len(updatedNode.LbAttachments), len(networks)) {
return false
}

for _, na := range updatedNode.LbAttachments {
if !assert.Equal(t, len(na.Addresses), 1) {
return false
}
}

return true
}

func isValidNetwork(t assert.TestingT, n *api.Network) bool {
if _, ok := n.Spec.Annotations.Labels["com.docker.swarm.predefined"]; ok {
return true
Expand Down Expand Up @@ -734,6 +885,43 @@ func getWatchTimeout(expectTimeout bool) time.Duration {
return 5 * time.Second
}

func watchNode(t *testing.T, watch chan events.Event, expectTimeout bool,
fn func(t assert.TestingT, originalNode, updatedNode *api.Node, networks []string) bool,
originalNode *api.Node,
networks []string) {
for {

var node *api.Node
select {
case event := <-watch:
if n, ok := event.(api.EventUpdateNode); ok {
node = n.Node.Copy()
if fn == nil || (fn != nil && fn(mockTester{}, originalNode, node, networks)) {
return
}
}

if n, ok := event.(api.EventDeleteNode); ok {
node = n.Node.Copy()
if fn == nil || (fn != nil && fn(mockTester{}, originalNode, node, networks)) {
return
}
}

case <-time.After(1 * time.Millisecond):
if !expectTimeout {
if node != nil && fn != nil {
fn(t, originalNode, node, networks)
}

t.Fatal("timed out before watchNode found expected node state")
}

return
}
}
}

func watchNetwork(t *testing.T, watch chan events.Event, expectTimeout bool, fn func(t assert.TestingT, n *api.Network) bool) {
for {
var network *api.Network
Expand Down
107 changes: 64 additions & 43 deletions manager/allocator/cnmallocator/networkallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ type cnmNetworkAllocator struct {
tasks map[string]struct{}

// Allocator state to indicate if allocation has been
// successfully completed for this node.
nodes map[string]struct{}
// successfully completed for this node on this network.
// outer map key: node id
// inner map key: network id
nodes map[string]map[string]struct{}
}

// Local in-memory state related to network that need to be tracked by cnmNetworkAllocator
Expand Down Expand Up @@ -89,7 +91,7 @@ func New(pg plugingetter.PluginGetter) (networkallocator.NetworkAllocator, error
networks: make(map[string]*network),
services: make(map[string]struct{}),
tasks: make(map[string]struct{}),
nodes: make(map[string]struct{}),
nodes: make(map[string]map[string]struct{}),
}

// There are no driver configurations and notification
Expand Down Expand Up @@ -430,81 +432,100 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(
return true
}

// IsNodeAllocated returns if the passed node has its network resources allocated or not.
func (na *cnmNetworkAllocator) IsNodeAllocated(node *api.Node) bool {
// AllocateTask allocates all the endpoint resources for all the
// networks that a task is attached to.
func (na *cnmNetworkAllocator) AllocateTask(t *api.Task) error {
for i, nAttach := range t.Networks {
if localNet := na.getNetwork(nAttach.Network.ID); localNet != nil && localNet.isNodeLocal {
continue
}
if err := na.allocateNetworkIPs(nAttach); err != nil {
if err := na.releaseEndpoints(t.Networks[:i]); err != nil {
log.G(context.TODO()).WithError(err).Errorf("failed to release IP addresses while rolling back allocation for task %s network %s", t.ID, nAttach.Network.ID)
}
return errors.Wrapf(err, "failed to allocate network IP for task %s network %s", t.ID, nAttach.Network.ID)
}
}

na.tasks[t.ID] = struct{}{}

return nil
}

// DeallocateTask releases all the endpoint resources for all the
// networks that a task is attached to.
func (na *cnmNetworkAllocator) DeallocateTask(t *api.Task) error {
delete(na.tasks, t.ID)
return na.releaseEndpoints(t.Networks)
}

// IsLBAttachmentAllocated returns if the passed node and network has resources allocated or not.
func (na *cnmNetworkAllocator) IsLBAttachmentAllocated(node *api.Node, networkAttachment *api.NetworkAttachment) bool {
if node == nil {
return false
}

if networkAttachment == nil {
return false
}

// If the node is not found in the allocated set, then it is
// not allocated.
if _, ok := na.nodes[node.ID]; !ok {
return false
}

// If no attachment, not allocated.
if node.Attachment == nil {
// If the nework is not found in the allocated set, then it is
// not allocated.
if _, ok := na.nodes[node.ID][networkAttachment.Network.ID]; !ok {
return false
}

// If the network is not allocated, the node cannot be allocated.
localNet, ok := na.networks[node.Attachment.Network.ID]
localNet, ok := na.networks[networkAttachment.Network.ID]
if !ok {
return false
}

// Addresses empty, not allocated.
if len(node.Attachment.Addresses) == 0 {
if len(networkAttachment.Addresses) == 0 {
return false
}

// The allocated IP address not found in local endpoint state. Not allocated.
if _, ok := localNet.endpoints[node.Attachment.Addresses[0]]; !ok {
if _, ok := localNet.endpoints[networkAttachment.Addresses[0]]; !ok {
return false
}

return true
}

// AllocateNode allocates the IP addresses for the network to which
// the node is attached.
func (na *cnmNetworkAllocator) AllocateNode(node *api.Node) error {
if err := na.allocateNetworkIPs(node.Attachment); err != nil {
// AllocateLBAttachment allocates the IP addresses for a LB in a network
// on a given node
func (na *cnmNetworkAllocator) AllocateLBAttachment(node *api.Node, networkAttachment *api.NetworkAttachment) error {

if err := na.allocateNetworkIPs(networkAttachment); err != nil {
return err
}

na.nodes[node.ID] = struct{}{}
if na.nodes[node.ID] == nil {
na.nodes[node.ID] = make(map[string]struct{})
}
na.nodes[node.ID][networkAttachment.Network.ID] = struct{}{}

return nil
}

// DeallocateNode deallocates the IP addresses for the network to
// DeallocateLBAttachment deallocates the IP addresses for a LB in a network to
// which the node is attached.
func (na *cnmNetworkAllocator) DeallocateNode(node *api.Node) error {
delete(na.nodes, node.ID)
return na.releaseEndpoints([]*api.NetworkAttachment{node.Attachment})
}
func (na *cnmNetworkAllocator) DeallocateLBAttachment(node *api.Node, networkAttachment *api.NetworkAttachment) error {

// AllocateTask allocates all the endpoint resources for all the
// networks that a task is attached to.
func (na *cnmNetworkAllocator) AllocateTask(t *api.Task) error {
for i, nAttach := range t.Networks {
if localNet := na.getNetwork(nAttach.Network.ID); localNet != nil && localNet.isNodeLocal {
continue
}
if err := na.allocateNetworkIPs(nAttach); err != nil {
if err := na.releaseEndpoints(t.Networks[:i]); err != nil {
log.G(context.TODO()).WithError(err).Errorf("Failed to release IP addresses while rolling back allocation for task %s network %s", t.ID, nAttach.Network.ID)
}
return errors.Wrapf(err, "failed to allocate network IP for task %s network %s", t.ID, nAttach.Network.ID)
}
delete(na.nodes[node.ID], networkAttachment.Network.ID)
if len(na.nodes[node.ID]) == 0 {
delete(na.nodes, node.ID)
}

na.tasks[t.ID] = struct{}{}

return nil
}

// DeallocateTask releases all the endpoint resources for all the
// networks that a task is attached to.
func (na *cnmNetworkAllocator) DeallocateTask(t *api.Task) error {
delete(na.tasks, t.ID)
return na.releaseEndpoints(t.Networks)
return na.releaseEndpoints([]*api.NetworkAttachment{networkAttachment})
}

func (na *cnmNetworkAllocator) releaseEndpoints(networks []*api.NetworkAttachment) error {
Expand Down
Loading

0 comments on commit bd7bafb

Please sign in to comment.