Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2787 from /issues/2784-preclaim-addresses
Browse files Browse the repository at this point in the history
Claim existing IP addresses at startup with no race condition

Fixes #2784
  • Loading branch information
brb authored Feb 13, 2017
2 parents 39f471e + 4e37f5e commit 32d44e0
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 60 deletions.
4 changes: 2 additions & 2 deletions ipam/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type allocateResult struct {

// allocate is a request to the Allocator to pick a free address for
// ident within the subnet r; the outcome is delivered on resultChan.
type allocate struct {
	resultChan chan<- allocateResult // receives the allocated address or an error
	ident      string                // a container ID, something like "weave:expose", or api.NoContainerID
	r          address.CIDR          // Subnet we are trying to allocate within
	isContainer bool                 // true if ident is a container ID
	// hasBeenCancelled is polled while the request is pending; presumably
	// the request is abandoned when it returns true (see claim.Try for the
	// analogous use) — confirm against the allocate.Try implementation.
	hasBeenCancelled func() bool
}

Expand Down
17 changes: 17 additions & 0 deletions ipam/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,21 @@ type Allocator struct {
now func() time.Time
}

// PreClaim is an IP address discovered before we could initialize IPAM
// (e.g. already assigned on the weave bridge or inside a container's
// network namespace); the allocator claims these at startup so it never
// hands them out as duplicates.
type PreClaim struct {
	Ident       string       // a container ID, something like "weave:expose", or api.NoContainerID
	IsContainer bool         // true if Ident is a container ID
	Cidr        address.CIDR // the pre-existing address (with its subnet) to claim
}

type Config struct {
OurName mesh.PeerName
OurUID mesh.PeerUID
OurNickname string
Seed []mesh.PeerName
Universe address.CIDR
IsObserver bool
PreClaims []PreClaim
Quorum func() uint
Db db.DB
IsKnownPeer func(name mesh.PeerName) bool
Expand Down Expand Up @@ -123,6 +131,12 @@ func NewAllocator(config Config) *Allocator {
dead: make(map[string]time.Time),
now: time.Now,
}

alloc.pendingClaims = make([]operation, len(config.PreClaims))
for i, c := range config.PreClaims {
alloc.pendingClaims[i] = &claim{ident: c.Ident, cidr: c.Cidr}
}

return alloc
}

Expand Down Expand Up @@ -156,6 +170,9 @@ func (alloc *Allocator) Start() {
default:
alloc.infof("Initialising as observer - awaiting IPAM data from another peer")
}
if loadedPersistedData { // do any pre-claims right away
alloc.tryOps(&alloc.pendingClaims)
}
actionChan := make(chan func(), mesh.ChannelSize)
stopChan := make(chan struct{})
alloc.actionChan = actionChan
Expand Down
13 changes: 11 additions & 2 deletions ipam/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,13 @@ func TestAllocatorClaim(t *testing.T) {
container1 = "abcdef"
container3 = "b01df00d"
universe = "10.0.3.0/24"
testAddr1 = "10.0.3.2/24"
testAddr1 = "10.0.3.5/24"
testAddr2 = "10.0.4.2/24"
testPre = "10.0.3.1/24"
)

allocs, router, subnet := makeNetworkOfAllocators(2, universe)
preAddr, _ := address.ParseCIDR(testPre)
allocs, router, subnet := makeNetworkOfAllocators(2, universe, []PreClaim{{container1, true, preAddr}})
defer stopNetworkOfAllocators(allocs, router)
alloc := allocs[1]
addr1, _ := address.ParseCIDR(testAddr1)
Expand All @@ -184,6 +186,8 @@ func TestAllocatorClaim(t *testing.T) {
alloc.Prime()
// Do an allocate on the other peer, which we will try to claim later
addrx, err := allocs[0].Allocate(container1, subnet, true, returnFalse)
// Should not get the address we pre-claimed
require.NotEqual(t, addrx, preAddr)
router.Flush()

// Now try the claim again
Expand All @@ -201,6 +205,11 @@ func TestAllocatorClaim(t *testing.T) {
// claiming the address allocated on the other peer should fail
err = alloc.SimplyClaim(container1, address.MakeCIDR(subnet, addrx))
require.Error(t, err, "claiming address allocated on other peer should fail")
// claiming the pre-claimed address should fail on both peers
err = alloc.SimplyClaim(container3, preAddr)
require.Error(t, err, "claiming address allocated on other peer should fail")
err = allocs[0].SimplyClaim(container3, preAddr)
require.Error(t, err, "claiming address allocated on other peer should fail")
// Check an address outside of our universe
addr2, _ := address.ParseCIDR(testAddr2)
err = alloc.SimplyClaim(container1, addr2)
Expand Down
10 changes: 5 additions & 5 deletions ipam/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

// claim is a request to assert ownership of one specific address on
// behalf of ident; the outcome is delivered on resultChan.
type claim struct {
	resultChan       chan<- error // receives nil on success, or the reason the claim failed
	ident            string       // a container ID, something like "weave:expose", or api.NoContainerID
	cidr             address.CIDR // single address being claimed
	isContainer      bool         // true if ident is a container ID
	noErrorOnUnknown bool         // if false, error or block if we don't know; if true return ok but keep trying
	// hasBeenCancelled may be nil (pre-claims constructed in NewAllocator
	// leave it unset); when non-nil and returning true the claim is
	// cancelled instead of retried.
	hasBeenCancelled func() bool
}

Expand All @@ -35,7 +35,7 @@ func (c *claim) sendResult(result error) {

// Try returns true for success (or failure), false if we need to try again later
func (c *claim) Try(alloc *Allocator) bool {
if c.hasBeenCancelled() {
if c.hasBeenCancelled != nil && c.hasBeenCancelled() {
c.Cancel()
return true
}
Expand Down
11 changes: 8 additions & 3 deletions ipam/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type mockDB struct{}
func (d *mockDB) Load(_ string, _ interface{}) (bool, error) { return false, nil }
func (d *mockDB) Save(_ string, _ interface{}) error { return nil }

func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, address.CIDR) {
func makeAllocator(name string, cidrStr string, quorum uint, preClaims ...PreClaim) (*Allocator, address.CIDR) {
peername, err := mesh.PeerNameFromString(name)
if err != nil {
panic(err)
Expand All @@ -145,6 +145,7 @@ func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, addres
OurNickname: "nick-" + name,
Universe: cidr,
IsObserver: quorum == 0,
PreClaims: preClaims,
Quorum: func() uint { return quorum },
Db: new(mockDB),
IsKnownPeer: func(mesh.PeerName) bool { return true },
Expand Down Expand Up @@ -213,15 +214,19 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) {
}
}

func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
func makeNetworkOfAllocators(size int, cidr string, preClaims ...[]PreClaim) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
gossipRouter := gossip.NewTestRouter(0.0)
allocs := make([]*Allocator, size)
var subnet address.CIDR

for i := 0; i < size; i++ {
var alloc *Allocator
preClaim := []PreClaim{}
if i < len(preClaims) {
preClaim = preClaims[i]
}
alloc, subnet = makeAllocator(fmt.Sprintf("%02d:00:00:02:00:00", i),
cidr, uint(size/2+1))
cidr, uint(size/2+1), preClaim...)
alloc.SetInterfaces(gossipRouter.Connect(alloc.ourName, alloc))
alloc.Start()
allocs[i] = alloc
Expand Down
17 changes: 0 additions & 17 deletions prog/weave-kube/launch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,13 @@ if [ -z "$IPALLOC_INIT" ]; then
IPALLOC_INIT="consensus=$(peer_count $KUBE_PEERS)"
fi

# Re-register existing addresses with IPAM over the HTTP API.
# $1 is the owner ID; the remaining arguments are CIDRs to claim under
# that owner. Failures are deliberately ignored (|| true) so startup
# continues even if an individual claim is rejected.
reclaim_ips() {
    ID=$1
    shift
    for CIDR in "$@" ; do
        curl -s -S -X PUT "$HTTP_ADDR/ip/$ID/$CIDR" || true
    done
}

post_start_actions() {
# Wait for weave process to become responsive
while true ; do
curl $HTTP_ADDR/status >/dev/null 2>&1 && break
sleep 1
done

# Tell the newly-started weave about existing weave bridge IPs
/usr/bin/weaveutil container-addrs weave weave:expose | while read ID IFACE MAC IPS; do
reclaim_ips "weave:expose" $IPS
done
# Tell weave about existing weave process IPs
/usr/bin/weaveutil process-addrs weave | while read ID IFACE MAC IPS; do
reclaim_ips "_" $IPS
done

# Install CNI plugin binary to typical CNI bin location
# with fall-back to CNI directory used by kube-up on GCI OS
if ! mkdir -p $HOST_ROOT/opt/cni/bin ; then
Expand Down
14 changes: 9 additions & 5 deletions prog/weaver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,16 @@ func main() {
}
trackerName = "awsvpc"
}
allocator, defaultSubnet = createAllocator(router, ipamConfig, db, t, isKnownPeer)
observeContainers(allocator)
var allContainerIDs []string
if dockerCli != nil {
ids, err := dockerCli.AllContainerIDs()
allContainerIDs, err = dockerCli.AllContainerIDs()
checkFatal(err)
allocator.PruneOwned(ids)
}
preClaims, err := findExistingAddresses(dockerCli, allContainerIDs, weavenet.WeaveBridgeName)
checkFatal(err)
allocator, defaultSubnet = createAllocator(router, ipamConfig, preClaims, db, t, isKnownPeer)
observeContainers(allocator)
allocator.PruneOwned(allContainerIDs)
}

var (
Expand Down Expand Up @@ -445,7 +448,7 @@ func createOverlay(datapathName string, ifaceName string, isAWSVPC bool, host st
return overlay, bridge
}

func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) {
func createAllocator(router *weave.NetworkRouter, config ipamConfig, preClaims []ipam.PreClaim, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) {
ipRange, err := ipam.ParseCIDRSubnet(config.IPRangeCIDR)
checkFatal(err)
defaultSubnet := ipRange
Expand All @@ -464,6 +467,7 @@ func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, t
Seed: config.SeedPeerNames,
Universe: ipRange,
IsObserver: config.Observer,
PreClaims: preClaims,
Quorum: func() uint { return determineQuorum(config.PeerCount, router) },
Db: db,
IsKnownPeer: isKnownPeer,
Expand Down
84 changes: 84 additions & 0 deletions prog/weaver/reclaim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
"net"

docker "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/weave/api"
"github.com/weaveworks/weave/common"
weavedocker "github.com/weaveworks/weave/common/docker"
"github.com/weaveworks/weave/ipam"
weavenet "github.com/weaveworks/weave/net"
"github.com/weaveworks/weave/net/address"
)

// a converts a stdlib *net.IPNet into the project's address.CIDR form.
func a(cidr *net.IPNet) address.CIDR {
	// Size() reports 0,0 for a non-canonical mask; we keep that behavior
	// and use whatever prefix length it yields.
	ones, _ := cidr.Mask.Size()
	result := address.CIDR{
		Addr:      address.FromIP4(cidr.IP),
		PrefixLen: ones,
	}
	return result
}

// Get all the existing Weave IPs at startup, so we can stop IPAM
// giving out any as duplicates.
//
// Addresses are gathered from three places: the weave bridge itself
// (recorded under the special ident "weave:expose"), running containers
// inspected through Docker, or — when there is no Docker connection —
// every process whose network namespace has a veth attached to the bridge.
func findExistingAddresses(dockerCli *weavedocker.Client, containerIDs []string, bridgeName string) (addrs []ipam.PreClaim, err error) {
	Log.Infof("Checking for pre-existing addresses on %s bridge", bridgeName)
	// First get the address for the bridge
	bridgeNetDev, err := weavenet.GetBridgeNetDev(bridgeName)
	if err != nil {
		return nil, err
	}
	for _, cidr := range bridgeNetDev.CIDRs {
		Log.Infof("%s bridge has address %v", bridgeName, cidr)
		addrs = append(addrs, ipam.PreClaim{Ident: "weave:expose", Cidr: a(cidr)})
	}

	// Append one PreClaim per address found on each of the given devices,
	// attributed to container/ident cid.
	add := func(cid string, isContainer bool, netDevs []weavenet.Dev) {
		for _, netDev := range netDevs {
			for _, cidr := range netDev.CIDRs {
				Log.Infof("Found address %v for ID %s", cidr, cid)
				addrs = append(addrs, ipam.PreClaim{Ident: cid, IsContainer: isContainer, Cidr: a(cidr)})
			}
		}
	}

	// Then find all veths connected to the bridge
	peerIDs, err := weavenet.ConnectedToBridgeVethPeerIds(bridgeName)
	if err != nil {
		return nil, err
	}

	// Now iterate over all containers to see if they have a network
	// namespace with an attached interface
	if dockerCli != nil {
		for _, cid := range containerIDs {
			container, err := dockerCli.InspectContainer(cid)
			if err != nil {
				// A container may legitimately have gone away since the ID
				// list was collected; skip it rather than failing startup.
				if _, ok := err.(*docker.NoSuchContainer); ok {
					continue
				}
				return nil, err
			}
			// Pid == 0 means the container is not running, so there is no
			// network namespace to inspect.
			if container.State.Pid != 0 {
				netDevs, err := weavenet.GetNetDevsByVethPeerIds(container.State.Pid, peerIDs)
				if err != nil {
					return nil, err
				}
				add(cid, true, netDevs)
			}
		}
	} else {
		// If we don't have a Docker connection, iterate over all processes
		pids, err := common.AllPids("/proc")
		if err != nil {
			return nil, err
		}
		for _, pid := range pids {
			// NOTE(review): a pid that exits between AllPids and this call
			// presumably makes GetNetDevsByVethPeerIds return an error and
			// abort startup — confirm it tolerates vanished processes.
			netDevs, err := weavenet.GetNetDevsByVethPeerIds(pid, peerIDs)
			if err != nil {
				return nil, err
			}
			// Without Docker we cannot attribute the address to a specific
			// container, so record it under api.NoContainerID.
			add(api.NoContainerID, false, netDevs)
		}
	}
	return addrs, nil
}
1 change: 1 addition & 0 deletions test/320_claim_3_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ delete_persistence $HOST1 $HOST2
# Now make host1 attempt to claim from host2, when host2 is stopped
# the point being to check whether host1 will hang trying to talk to host2
weave_on $HOST2 launch-router --ipalloc-range $UNIVERSE
weave_on $HOST2 prime
# Introduce host3 to remember the IPAM CRDT when we stop host2
weave_on $HOST3 launch-router --ipalloc-range $UNIVERSE $HOST2
weave_on $HOST3 prime
Expand Down
27 changes: 1 addition & 26 deletions weave
Original file line number Diff line number Diff line change
Expand Up @@ -1304,25 +1304,6 @@ check_overlap() {
util_op netcheck $1 $BRIDGE
}

# Claim addresses for a container in IPAM. Expects to be called from
# with_container_addresses, which supplies: $1 container ID, $2 interface
# name, $3 MAC, $4 space-separated CIDR list.
ipam_reclaim() {
    CONTAINER_ID="$1"
    # The weave IP addresses of containers attached by the plugin are
    # recorded specially in IPAM, since the container id is not known
    # at the time.
    [ "$2" = "$CONTAINER_IFNAME" ] || CONTAINER_ID="_"
    for CIDR in $4 ; do
        # Bug fix: the second query parameter was written "&?check-alive=true",
        # giving the server a parameter literally named "?check-alive".
        # Parameters are joined with a bare "&". Ignore failures (|| true) so
        # one rejected claim does not abort the reclaim pass.
        http_call $HTTP_ADDR PUT "/ip/$CONTAINER_ID/$CIDR?noErrorOnUnknown=true&check-alive=true" || true
    done
}

# Claim addresses in IPAM without asking the router to verify the owner is
# alive. Uses the with_container_addresses callback convention: $1 is the
# owner ID and $4 the space-separated CIDR list ($2/$3 — interface and MAC —
# are unused here). Failures are ignored so the reclaim pass continues.
ipam_reclaim_no_check_alive() {
    for CIDR in $4 ; do
        http_call $HTTP_ADDR PUT /ip/$1/$CIDR?noErrorOnUnknown=true || true
    done
}

detect_awsvpc() {
# Ignoring errors here: if we cannot detect AWSVPC we will skip the relevant
# steps, because "attach" should work without the weave router running.
Expand Down Expand Up @@ -1691,6 +1672,7 @@ launch_router() {
ROUTER_CONTAINER=$(docker run -d --name=$CONTAINER_NAME \
$(docker_run_options) \
$RESTART_POLICY \
--pid=host \
--volumes-from $DB_CONTAINER_NAME \
-v $RESOLV_CONF_DIR:/var/run/weave/etc \
-e WEAVE_PASSWORD \
Expand Down Expand Up @@ -1728,17 +1710,10 @@ setup_awsvpc() {
# Recreate the parameter values that are set when the router is first launched
fetch_router_args() {
    CONTAINER_ARGS=$(docker inspect -f '{{.Args}}' $CONTAINER_NAME) || return 1
    # Extract the "-ipalloc-range <CIDR>" argument, if present; empty otherwise
    IPRANGE=$(echo $CONTAINER_ARGS | grep -o -E -e '-ipalloc-range [0-9/.]+') || true
    # Remember whether DNS was disabled when the router was launched
    NO_DNS_OPT=$(echo $CONTAINER_ARGS | grep -o -e '--no-dns') || true
}

populate_router() {
if [ -n "$IPRANGE" ] ; then
# Tell the newly-started weave IP allocator about existing weave IPs
# In the case of AWSVPC, we do expose before calling populate_router
[ -n "$AWSVPC" ] || with_container_addresses ipam_reclaim_no_check_alive weave:expose
with_container_addresses ipam_reclaim $(docker ps -q --no-trunc)
fi
if [ -z "$NO_DNS_OPT" ] ; then
# Tell the newly-started weaveDNS about existing weave IPs
for CONTAINER in $(docker ps -q --no-trunc) ; do
Expand Down

0 comments on commit 32d44e0

Please sign in to comment.