From 39ac1a0037aaa86282084f7e9462d681926ccd0d Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 9 Feb 2017 21:01:52 +0000 Subject: [PATCH 1/6] Collect existing weave addresses at start-up, to avoid a race when doing it from the script Note we need to run weaver in the host pid namespace, so it can search for existing container IPs. Also now we are firing off sub-processes it's better to not be PID 1 so Linux doesn't expect us to reap zombies. --- ipam/allocator.go | 15 ++++++++++++ ipam/allocator_test.go | 13 ++++++++-- ipam/claim.go | 2 +- ipam/testutils_test.go | 11 ++++++--- prog/weaver/main.go | 7 ++++-- prog/weaver/reclaim.go | 55 ++++++++++++++++++++++++++++++++++++++++++ weave | 1 + 7 files changed, 96 insertions(+), 8 deletions(-) create mode 100644 prog/weaver/reclaim.go diff --git a/ipam/allocator.go b/ipam/allocator.go index fa7ddb26e1..fa8cfd3021 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -76,6 +76,11 @@ type Allocator struct { now func() time.Time } +type PreClaim struct { + Ident string + Cidr address.CIDR +} + type Config struct { OurName mesh.PeerName OurUID mesh.PeerUID @@ -83,6 +88,7 @@ type Config struct { Seed []mesh.PeerName Universe address.CIDR IsObserver bool + PreClaims []PreClaim Quorum func() uint Db db.DB IsKnownPeer func(name mesh.PeerName) bool @@ -123,6 +129,12 @@ func NewAllocator(config Config) *Allocator { dead: make(map[string]time.Time), now: time.Now, } + + alloc.pendingClaims = make([]operation, len(config.PreClaims)) + for i, c := range config.PreClaims { + alloc.pendingClaims[i] = &claim{ident: c.Ident, cidr: c.Cidr} + } + return alloc } @@ -156,6 +168,9 @@ func (alloc *Allocator) Start() { default: alloc.infof("Initialising as observer - awaiting IPAM data from another peer") } + if loadedPersistedData { // do any pre-claims right away + alloc.tryOps(&alloc.pendingClaims) + } actionChan := make(chan func(), mesh.ChannelSize) stopChan := make(chan struct{}) alloc.actionChan = actionChan diff --git a/ipam/allocator_test.go b/ipam/allocator_test.go index ce7c3759dc..64addc3ffa 100644 --- a/ipam/allocator_test.go +++ b/ipam/allocator_test.go @@ -168,11 +168,13 @@ func TestAllocatorClaim(t *testing.T) { container1 = "abcdef" container3 = "b01df00d" universe = "10.0.3.0/24" - testAddr1 = "10.0.3.2/24" + testAddr1 = "10.0.3.5/24" testAddr2 = "10.0.4.2/24" + testPre = "10.0.3.1/24" ) - allocs, router, subnet := makeNetworkOfAllocators(2, universe) + preAddr, _ := address.ParseCIDR(testPre) + allocs, router, subnet := makeNetworkOfAllocators(2, universe, []PreClaim{{container1, preAddr}}) defer stopNetworkOfAllocators(allocs, router) alloc := allocs[1] addr1, _ := address.ParseCIDR(testAddr1) @@ -184,6 +186,8 @@ func TestAllocatorClaim(t *testing.T) { alloc.Prime() // Do an allocate on the other peer, which we will try to claim later addrx, err := allocs[0].Allocate(container1, subnet, true, returnFalse) + // Should not get the address we pre-claimed + require.NotEqual(t, addrx, preAddr) router.Flush() // Now try the claim again @@ -201,6 +205,11 @@ func TestAllocatorClaim(t *testing.T) { // claiming the address allocated on the other peer should fail err = alloc.SimplyClaim(container1, address.MakeCIDR(subnet, addrx)) require.Error(t, err, "claiming address allocated on other peer should fail") + // claiming the pre-claimed address should fail on both peers + err = alloc.SimplyClaim(container3, preAddr) + require.Error(t, err, "claiming address allocated on other peer should fail") + err = allocs[0].SimplyClaim(container3, preAddr) + require.Error(t, err, "claiming address allocated on other peer should fail") // Check an address outside of our universe addr2, _ := address.ParseCIDR(testAddr2) err = alloc.SimplyClaim(container1, addr2) diff --git a/ipam/claim.go b/ipam/claim.go index e027f72364..dfd3ab769a 100644 --- a/ipam/claim.go +++ b/ipam/claim.go @@ -35,7 +35,7 @@ func (c *claim) sendResult(result error) { // Try returns true for success (or failure), false if we need to try again later func (c *claim) Try(alloc *Allocator) bool { - if c.hasBeenCancelled() { + if c.hasBeenCancelled != nil && c.hasBeenCancelled() { c.Cancel() return true } diff --git a/ipam/testutils_test.go b/ipam/testutils_test.go index 13fa8171bd..d6b2f07fa6 100644 --- a/ipam/testutils_test.go +++ b/ipam/testutils_test.go @@ -128,7 +128,7 @@ type mockDB struct{} func (d *mockDB) Load(_ string, _ interface{}) (bool, error) { return false, nil } func (d *mockDB) Save(_ string, _ interface{}) error { return nil } -func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, address.CIDR) { +func makeAllocator(name string, cidrStr string, quorum uint, preClaims ...PreClaim) (*Allocator, address.CIDR) { peername, err := mesh.PeerNameFromString(name) if err != nil { panic(err) @@ -145,6 +145,7 @@ func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, addres OurNickname: "nick-" + name, Universe: cidr, IsObserver: quorum == 0, + PreClaims: preClaims, Quorum: func() uint { return quorum }, Db: new(mockDB), IsKnownPeer: func(mesh.PeerName) bool { return true }, @@ -213,15 +214,19 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) { } } -func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, *gossip.TestRouter, address.CIDR) { +func makeNetworkOfAllocators(size int, cidr string, preClaims ...[]PreClaim) ([]*Allocator, *gossip.TestRouter, address.CIDR) { gossipRouter := gossip.NewTestRouter(0.0) allocs := make([]*Allocator, size) var subnet address.CIDR for i := 0; i < size; i++ { var alloc *Allocator + preClaim := []PreClaim{} + if i < len(preClaims) { + preClaim = preClaims[i] + } alloc, subnet = makeAllocator(fmt.Sprintf("%02d:00:00:02:00:00", i), - cidr, uint(size/2+1)) + cidr, uint(size/2+1), preClaim...) alloc.SetInterfaces(gossipRouter.Connect(alloc.ourName, alloc)) alloc.Start() allocs[i] = alloc diff --git a/prog/weaver/main.go b/prog/weaver/main.go index 92609b796d..b4f7fcc665 100644 --- a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -306,7 +306,9 @@ func main() { } trackerName = "awsvpc" } - allocator, defaultSubnet = createAllocator(router, ipamConfig, db, t, isKnownPeer) + preClaims, err := findExistingAddresses(weavenet.WeaveBridgeName) + checkFatal(err) + allocator, defaultSubnet = createAllocator(router, ipamConfig, preClaims, db, t, isKnownPeer) observeContainers(allocator) if dockerCli != nil { ids, err := dockerCli.AllContainerIDs() @@ -445,7 +447,7 @@ func createOverlay(datapathName string, ifaceName string, isAWSVPC bool, host st return overlay, bridge } -func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) { +func createAllocator(router *weave.NetworkRouter, config ipamConfig, preClaims []ipam.PreClaim, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) { ipRange, err := ipam.ParseCIDRSubnet(config.IPRangeCIDR) checkFatal(err) defaultSubnet := ipRange @@ -464,6 +466,7 @@ func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, t Seed: config.SeedPeerNames, Universe: ipRange, IsObserver: config.Observer, + PreClaims: preClaims, Quorum: func() uint { return determineQuorum(config.PeerCount, router) }, Db: db, IsKnownPeer: isKnownPeer, diff --git a/prog/weaver/reclaim.go b/prog/weaver/reclaim.go new file mode 100644 index 0000000000..8678fe8ddf --- /dev/null +++ b/prog/weaver/reclaim.go @@ -0,0 +1,55 @@ +package main + +import ( + "net" + + "github.com/weaveworks/weave/api" + "github.com/weaveworks/weave/common" + "github.com/weaveworks/weave/ipam" + weavenet "github.com/weaveworks/weave/net" + "github.com/weaveworks/weave/net/address" +) + +func a(cidr *net.IPNet) address.CIDR { + prefixLength, _ := cidr.Mask.Size() + return address.CIDR{Addr: address.FromIP4(cidr.IP), PrefixLen: prefixLength} +} + +// Get all the existing Weave IPs at startup, so we can stop IPAM +// giving out any as duplicates +func findExistingAddresses(bridgeName string) (addrs []ipam.PreClaim, err error) { + // First get the address for the bridge + bridgeNetDev, err := weavenet.GetBridgeNetDev(bridgeName) + if err != nil { + return nil, err + } + for _, cidr := range bridgeNetDev.CIDRs { + addrs = append(addrs, ipam.PreClaim{Ident: "weave:expose", Cidr: a(cidr)}) + } + + // Then find all veths connected to the bridge + peerIDs, err := weavenet.ConnectedToBridgeVethPeerIds(bridgeName) + if err != nil { + return nil, err + } + + pids, err := common.AllPids("/proc") + if err != nil { + return nil, err + } + + // Now iterate over all processes to see if they have a network namespace with an attached interface + for _, pid := range pids { + netDevs, err := weavenet.GetNetDevsByVethPeerIds(pid, peerIDs) + if err != nil { + return nil, err + } + for _, netDev := range netDevs { + for _, cidr := range netDev.CIDRs { + // We don't know the container ID, so use special magic string + addrs = append(addrs, ipam.PreClaim{Ident: api.NoContainerID, Cidr: a(cidr)}) + } + } + } + return addrs, nil +} diff --git a/weave b/weave index d4c79e12d6..576b058b06 100755 --- a/weave +++ b/weave @@ -1691,6 +1691,7 @@ launch_router() { ROUTER_CONTAINER=$(docker run -d --name=$CONTAINER_NAME \ $(docker_run_options) \ $RESTART_POLICY \ + --pid=host \ --volumes-from $DB_CONTAINER_NAME \ -v $RESOLV_CONF_DIR:/var/run/weave/etc \ -e WEAVE_PASSWORD \ From e26dbf11e9145940afaded9062fcf3a26b08f00c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Feb 2017 13:33:41 +0000 Subject: [PATCH 2/6] Stop collecting addresses in launch script now we do it in Go --- prog/weave-kube/launch.sh | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/prog/weave-kube/launch.sh b/prog/weave-kube/launch.sh index 080a11b04f..658bf097f6 100755 --- a/prog/weave-kube/launch.sh +++ b/prog/weave-kube/launch.sh @@ -66,14 +66,6 @@ if [ -z "$IPALLOC_INIT" ]; then IPALLOC_INIT="consensus=$(peer_count $KUBE_PEERS)" fi -reclaim_ips() { - ID=$1 - shift - for CIDR in "$@" ; do - curl -s -S -X PUT "$HTTP_ADDR/ip/$ID/$CIDR" || true - done -} - post_start_actions() { # Wait for weave process to become responsive while true ; do @@ -81,15 +73,6 @@ post_start_actions() { sleep 1 done - # Tell the newly-started weave about existing weave bridge IPs - /usr/bin/weaveutil container-addrs weave weave:expose | while read ID IFACE MAC IPS; do - reclaim_ips "weave:expose" $IPS - done - # Tell weave about existing weave process IPs - /usr/bin/weaveutil process-addrs weave | while read ID IFACE MAC IPS; do - reclaim_ips "_" $IPS - done - # Install CNI plugin binary to typical CNI bin location # with fall-back to CNI directory used by kube-up on GCI OS if ! mkdir -p $HOST_ROOT/opt/cni/bin ; then From 7075b5d9fd6872c647633e9ec9580e25753dc901 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Feb 2017 14:58:20 +0000 Subject: [PATCH 3/6] Extend pre-existing address reclaim to docker containers --- ipam/allocator.go | 5 ++-- ipam/allocator_test.go | 2 +- prog/weaver/main.go | 13 +++++----- prog/weaver/reclaim.go | 55 ++++++++++++++++++++++++++++++++---------- 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/ipam/allocator.go b/ipam/allocator.go index fa8cfd3021..71b21309a1 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -77,8 +77,9 @@ type Allocator struct { } type PreClaim struct { - Ident string - Cidr address.CIDR + Ident string + IsContainer bool + Cidr address.CIDR } type Config struct { diff --git a/ipam/allocator_test.go b/ipam/allocator_test.go index 64addc3ffa..273d7cb130 100644 --- a/ipam/allocator_test.go +++ b/ipam/allocator_test.go @@ -174,7 +174,7 @@ func TestAllocatorClaim(t *testing.T) { ) preAddr, _ := address.ParseCIDR(testPre) - allocs, router, subnet := makeNetworkOfAllocators(2, universe, []PreClaim{{container1, preAddr}}) + allocs, router, subnet := makeNetworkOfAllocators(2, universe, []PreClaim{{container1, true, preAddr}}) defer stopNetworkOfAllocators(allocs, router) alloc := allocs[1] addr1, _ := address.ParseCIDR(testAddr1) diff --git a/prog/weaver/main.go b/prog/weaver/main.go index b4f7fcc665..f266618817 100644 --- a/prog/weaver/main.go +++ b/prog/weaver/main.go @@ -306,15 +306,16 @@ func main() { } trackerName = "awsvpc" } - preClaims, err := findExistingAddresses(weavenet.WeaveBridgeName) - checkFatal(err) - allocator, defaultSubnet = createAllocator(router, ipamConfig, preClaims, db, t, isKnownPeer) - observeContainers(allocator) + var allContainerIDs []string if dockerCli != nil { - ids, err := dockerCli.AllContainerIDs() + allContainerIDs, err = dockerCli.AllContainerIDs() checkFatal(err) - allocator.PruneOwned(ids) } + preClaims, err := findExistingAddresses(dockerCli, allContainerIDs, weavenet.WeaveBridgeName) + checkFatal(err) + allocator, defaultSubnet = createAllocator(router, ipamConfig, preClaims, db, t, isKnownPeer) + observeContainers(allocator) + allocator.PruneOwned(allContainerIDs) } var ( diff --git a/prog/weaver/reclaim.go b/prog/weaver/reclaim.go index 8678fe8ddf..5ccae97b0f 100644 --- a/prog/weaver/reclaim.go +++ b/prog/weaver/reclaim.go @@ -3,8 +3,11 @@ package main import ( "net" + docker "github.com/fsouza/go-dockerclient" + "github.com/weaveworks/weave/api" "github.com/weaveworks/weave/common" + weavedocker "github.com/weaveworks/weave/common/docker" "github.com/weaveworks/weave/ipam" weavenet "github.com/weaveworks/weave/net" "github.com/weaveworks/weave/net/address" @@ -17,38 +20,64 @@ func a(cidr *net.IPNet) address.CIDR { // Get all the existing Weave IPs at startup, so we can stop IPAM // giving out any as duplicates -func findExistingAddresses(bridgeName string) (addrs []ipam.PreClaim, err error) { +func findExistingAddresses(dockerCli *weavedocker.Client, containerIDs []string, bridgeName string) (addrs []ipam.PreClaim, err error) { + Log.Infof("Checking for pre-existing addresses on %s bridge", bridgeName) // First get the address for the bridge bridgeNetDev, err := weavenet.GetBridgeNetDev(bridgeName) if err != nil { return nil, err } for _, cidr := range bridgeNetDev.CIDRs { + Log.Infof("%s bridge has address %v", bridgeName, cidr) addrs = append(addrs, ipam.PreClaim{Ident: "weave:expose", Cidr: a(cidr)}) } - // Then find all veths connected to the bridge - peerIDs, err := weavenet.ConnectedToBridgeVethPeerIds(bridgeName) - if err != nil { - return nil, err + add := func(cid string, isContainer bool, netDevs []weavenet.Dev) { + for _, netDev := range netDevs { + for _, cidr := range netDev.CIDRs { + Log.Infof("Found address %v for ID %s", cidr, cid) + addrs = append(addrs, ipam.PreClaim{Ident: cid, IsContainer: isContainer, Cidr: a(cidr)}) + } + } } - pids, err := common.AllPids("/proc") + // Then find all veths connected to the bridge + peerIDs, err := weavenet.ConnectedToBridgeVethPeerIds(bridgeName) if err != nil { return nil, err } - // Now iterate over all processes to see if they have a network namespace with an attached interface - for _, pid := range pids { - netDevs, err := weavenet.GetNetDevsByVethPeerIds(pid, peerIDs) + // Now iterate over all containers to see if they have a network + // namespace with an attached interface + if dockerCli != nil { + for _, cid := range containerIDs { + container, err := dockerCli.InspectContainer(cid) + if err != nil { + if _, ok := err.(*docker.NoSuchContainer); ok { + continue + } + return nil, err + } + if container.State.Pid != 0 { + netDevs, err := weavenet.GetNetDevsByVethPeerIds(container.State.Pid, peerIDs) + if err != nil { + return nil, err + } + add(cid, true, netDevs) + } + } + } else { + // If we don't have a Docker connection, iterate over all processes + pids, err := common.AllPids("/proc") if err != nil { return nil, err } - for _, netDev := range netDevs { - for _, cidr := range netDev.CIDRs { - // We don't know the container ID, so use special magic string - addrs = append(addrs, ipam.PreClaim{Ident: api.NoContainerID, Cidr: a(cidr)}) + for _, pid := range pids { + netDevs, err := weavenet.GetNetDevsByVethPeerIds(pid, peerIDs) + if err != nil { + return nil, err } + add(api.NoContainerID, false, netDevs) } } return addrs, nil From fde7dcdde906a6c3c330d8f10df82010903860c3 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Feb 2017 15:08:42 +0000 Subject: [PATCH 4/6] Stop collecting addresses in weave script now we do it in Go --- weave | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/weave b/weave index 576b058b06..68d589ef47 100755 --- a/weave +++ b/weave @@ -1304,25 +1304,6 @@ check_overlap() { util_op netcheck $1 $BRIDGE } -# Claim addresses for a container in IPAM. Expects to be called from -# with_container_addresses. -ipam_reclaim() { - CONTAINER_ID="$1" - # The weave IP addresses of containers attached by the plugin are - # recorded specially in IPAM, since the container id is not know - # at the time. - [ "$2" = "$CONTAINER_IFNAME" ] || CONTAINER_ID="_" - for CIDR in $4 ; do - http_call $HTTP_ADDR PUT "/ip/$CONTAINER_ID/$CIDR?noErrorOnUnknown=true&?check-alive=true" || true - done -} - -ipam_reclaim_no_check_alive() { - for CIDR in $4 ; do - http_call $HTTP_ADDR PUT /ip/$1/$CIDR?noErrorOnUnknown=true || true - done -} - detect_awsvpc() { # Ignoring errors here: if we cannot detect AWSVPC we will skip the relevant # steps, because "attach" should work without the weave router running. @@ -1729,17 +1710,10 @@ setup_awsvpc() { # Recreate the parameter values that are set when the router is first launched fetch_router_args() { CONTAINER_ARGS=$(docker inspect -f '{{.Args}}' $CONTAINER_NAME) || return 1 - IPRANGE=$(echo $CONTAINER_ARGS | grep -o -E -e '-ipalloc-range [0-9/.]+') || true NO_DNS_OPT=$(echo $CONTAINER_ARGS | grep -o -e '--no-dns') || true } populate_router() { - if [ -n "$IPRANGE" ] ; then - # Tell the newly-started weave IP allocator about existing weave IPs - # In the case of AWSVPC, we do expose before calling populate_router - [ -n "$AWSVPC" ] || with_container_addresses ipam_reclaim_no_check_alive weave:expose - with_container_addresses ipam_reclaim $(docker ps -q --no-trunc) - fi if [ -z "$NO_DNS_OPT" ] ; then # Tell the newly-started weaveDNS about existing weave IPs for CONTAINER in $(docker ps -q --no-trunc) ; do From 293abd953eb3298177e494e52bfeba2ce085660f Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Fri, 10 Feb 2017 17:27:40 +0000 Subject: [PATCH 5/6] Fix flaky test 320 The intention, per the comments, is that host1 should claim from host2. But host2 and host3 can carve up the space either way round. If host1 claims from host3 then there is a race whether anyone finds out about it. Adding a 'prime' on host2 ensures the space is all owned by host2 at that point. --- test/320_claim_3_test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/test/320_claim_3_test.sh b/test/320_claim_3_test.sh index c7bd900eec..4c135ab671 100755 --- a/test/320_claim_3_test.sh +++ b/test/320_claim_3_test.sh @@ -49,6 +49,7 @@ delete_persistence $HOST1 $HOST2 # Now make host1 attempt to claim from host2, when host2 is stopped # the point being to check whether host1 will hang trying to talk to host2 weave_on $HOST2 launch-router --ipalloc-range $UNIVERSE +weave_on $HOST2 prime # Introduce host3 to remember the IPAM CRDT when we stop host2 weave_on $HOST3 launch-router --ipalloc-range $UNIVERSE $HOST2 weave_on $HOST3 prime From 4e37f5ec4213c0129bda2cdcc8e1b61e9066344e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 13 Feb 2017 17:31:02 +0000 Subject: [PATCH 6/6] Add explanations of various claim and allocation fields --- ipam/allocate.go | 4 ++-- ipam/allocator.go | 5 +++-- ipam/claim.go | 8 ++++---- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ipam/allocate.go b/ipam/allocate.go index ac542aac83..c4092a1286 100644 --- a/ipam/allocate.go +++ b/ipam/allocate.go @@ -14,9 +14,9 @@ type allocateResult struct { type allocate struct { resultChan chan<- allocateResult - ident string + ident string // a container ID, something like "weave:expose", or api.NoContainerID r address.CIDR // Subnet we are trying to allocate within - isContainer bool + isContainer bool // true if ident is a container ID hasBeenCancelled func() bool } diff --git a/ipam/allocator.go b/ipam/allocator.go index 71b21309a1..6ad7a54042 100644 --- a/ipam/allocator.go +++ b/ipam/allocator.go @@ -76,9 +76,10 @@ type Allocator struct { now func() time.Time } +// PreClaims are IP addresses discovered before we could initialize IPAM type PreClaim struct { - Ident string - IsContainer bool + Ident string // a container ID, something like "weave:expose", or api.NoContainerID + IsContainer bool // true if Ident is a container ID Cidr address.CIDR } diff --git a/ipam/claim.go b/ipam/claim.go index dfd3ab769a..9a827e546a 100644 --- a/ipam/claim.go +++ b/ipam/claim.go @@ -12,10 +12,10 @@ import ( type claim struct { resultChan chan<- error - ident string - cidr address.CIDR - isContainer bool - noErrorOnUnknown bool + ident string // a container ID, something like "weave:expose", or api.NoContainerID + cidr address.CIDR // single address being claimed + isContainer bool // true if ident is a container ID + noErrorOnUnknown bool // if false, error or block if we don't know; if true return ok but keep trying hasBeenCancelled func() bool }