This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Claim existing IP addresses at startup with no race condition #2787

Merged · 6 commits · Feb 13, 2017
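In outline: instead of reclaiming addresses over the HTTP API after the router has started (the curl and http_call loops removed from prog/weave-kube/launch.sh and the weave script below), existing addresses are discovered first and handed to IPAM as pre-claims before it can allocate anything. A minimal sketch of the new wiring, using only names visible in this diff (ipam.PreClaim, Config.PreClaims, NewAllocator, Start); the address and the omitted Config fields are placeholders, not weave's production setup:

// Sketch only: an address already in use on the weave bridge is registered
// before the allocator starts, so it can never be handed out as a duplicate.
preAddr, _ := address.ParseCIDR("10.40.0.1/12")

alloc := ipam.NewAllocator(ipam.Config{
	Universe:  universe, // the --ipalloc-range subnet, parsed elsewhere
	PreClaims: []ipam.PreClaim{{Ident: "weave:expose", IsContainer: false, Cidr: preAddr}},
	// ...remaining fields (OurName, Quorum, Db, IsKnownPeer, ...) as in createAllocator below
})
alloc.Start() // pending pre-claims are tried as soon as IPAM data is available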
4 changes: 2 additions & 2 deletions ipam/allocate.go
@@ -14,9 +14,9 @@ type allocateResult struct {

type allocate struct {
resultChan chan<- allocateResult
ident string
ident string // a container ID, something like "weave:expose", or api.NoContainerID
r address.CIDR // Subnet we are trying to allocate within
isContainer bool
isContainer bool // true if ident is a container ID
hasBeenCancelled func() bool
}

17 changes: 17 additions & 0 deletions ipam/allocator.go
@@ -76,13 +76,21 @@ type Allocator struct {
now func() time.Time
}

// PreClaims are IP addresses discovered before we could initialize IPAM
type PreClaim struct {
Ident string // a container ID, something like "weave:expose", or api.NoContainerID
IsContainer bool // true if Ident is a container ID
Cidr address.CIDR
}

type Config struct {
OurName mesh.PeerName
OurUID mesh.PeerUID
OurNickname string
Seed []mesh.PeerName
Universe address.CIDR
IsObserver bool
PreClaims []PreClaim
Quorum func() uint
Db db.DB
IsKnownPeer func(name mesh.PeerName) bool
@@ -123,6 +131,12 @@ func NewAllocator(config Config) *Allocator {
dead: make(map[string]time.Time),
now: time.Now,
}

alloc.pendingClaims = make([]operation, len(config.PreClaims))
for i, c := range config.PreClaims {
alloc.pendingClaims[i] = &claim{ident: c.Ident, cidr: c.Cidr}
}

return alloc
}

@@ -156,6 +170,9 @@ func (alloc *Allocator) Start() {
default:
alloc.infof("Initialising as observer - awaiting IPAM data from another peer")
}
if loadedPersistedData { // do any pre-claims right away
alloc.tryOps(&alloc.pendingClaims)
}
actionChan := make(chan func(), mesh.ChannelSize)
stopChan := make(chan struct{})
alloc.actionChan = actionChan
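For context on the two hunks above: NewAllocator turns each PreClaim into a pending claim operation, and Start tries them right away when persisted IPAM data was loaded. The retry semantics follow the allocator's existing pending-operation pattern, where Try returns true once an operation has finished (successfully or not) and false if it must be retried later. A toy illustration of that pattern, not weave code:

// operation mirrors the semantics documented in ipam/claim.go: Try returns
// true for "done" (success or permanent failure) and false for "retry later".
type operation interface {
	Try() bool
}

// tryOps retries every pending operation and keeps only the ones that still
// need another attempt.
func tryOps(pending *[]operation) {
	remaining := (*pending)[:0]
	for _, op := range *pending {
		if !op.Try() {
			remaining = append(remaining, op)
		}
	}
	*pending = remaining
}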
13 changes: 11 additions & 2 deletions ipam/allocator_test.go
@@ -168,11 +168,13 @@ func TestAllocatorClaim(t *testing.T) {
container1 = "abcdef"
container3 = "b01df00d"
universe = "10.0.3.0/24"
testAddr1 = "10.0.3.2/24"
testAddr1 = "10.0.3.5/24"
testAddr2 = "10.0.4.2/24"
testPre = "10.0.3.1/24"
)

allocs, router, subnet := makeNetworkOfAllocators(2, universe)
preAddr, _ := address.ParseCIDR(testPre)
allocs, router, subnet := makeNetworkOfAllocators(2, universe, []PreClaim{{container1, true, preAddr}})
defer stopNetworkOfAllocators(allocs, router)
alloc := allocs[1]
addr1, _ := address.ParseCIDR(testAddr1)
@@ -184,6 +186,8 @@
alloc.Prime()
// Do an allocate on the other peer, which we will try to claim later
addrx, err := allocs[0].Allocate(container1, subnet, true, returnFalse)
// Should not get the address we pre-claimed
require.NotEqual(t, addrx, preAddr)
router.Flush()

// Now try the claim again
@@ -201,6 +205,11 @@
// claiming the address allocated on the other peer should fail
err = alloc.SimplyClaim(container1, address.MakeCIDR(subnet, addrx))
require.Error(t, err, "claiming address allocated on other peer should fail")
// claiming the pre-claimed address should fail on both peers
err = alloc.SimplyClaim(container3, preAddr)
require.Error(t, err, "claiming pre-claimed address should fail")
err = allocs[0].SimplyClaim(container3, preAddr)
require.Error(t, err, "claiming pre-claimed address should fail")
// Check an address outside of our universe
addr2, _ := address.ParseCIDR(testAddr2)
err = alloc.SimplyClaim(container1, addr2)
10 changes: 5 additions & 5 deletions ipam/claim.go
@@ -12,10 +12,10 @@ import (

type claim struct {
resultChan chan<- error
ident string
cidr address.CIDR
isContainer bool
noErrorOnUnknown bool
ident string // a container ID, something like "weave:expose", or api.NoContainerID
cidr address.CIDR // single address being claimed
isContainer bool // true if ident is a container ID
noErrorOnUnknown bool // if false, error or block if we don't know; if true return ok but keep trying
hasBeenCancelled func() bool
}

@@ -35,7 +35,7 @@ func (c *claim) sendResult(result error) {

// Try returns true for success (or failure), false if we need to try again later
func (c *claim) Try(alloc *Allocator) bool {
if c.hasBeenCancelled() {
if c.hasBeenCancelled != nil && c.hasBeenCancelled() {
c.Cancel()
return true
}
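The nil check added to Try matters here because the pre-claims built in NewAllocator set only ident and cidr, leaving hasBeenCancelled nil; without the guard the first Try on a pre-claim would call a nil function. A small standalone illustration of the guard (assumed shape, not weave's claim type):

package main

import "fmt"

// pendingOp stands in for claim: the cancellation callback is optional,
// so it must be checked for nil before being called.
type pendingOp struct {
	hasBeenCancelled func() bool
}

func (op *pendingOp) cancelled() bool {
	return op.hasBeenCancelled != nil && op.hasBeenCancelled()
}

func main() {
	preClaim := &pendingOp{}                                               // no callback, as for a PreClaim
	userClaim := &pendingOp{hasBeenCancelled: func() bool { return true }} // e.g. the caller went away
	fmt.Println(preClaim.cancelled(), userClaim.cancelled())               // false true
}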
11 changes: 8 additions & 3 deletions ipam/testutils_test.go
@@ -128,7 +128,7 @@ type mockDB struct{}
func (d *mockDB) Load(_ string, _ interface{}) (bool, error) { return false, nil }
func (d *mockDB) Save(_ string, _ interface{}) error { return nil }

func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, address.CIDR) {
func makeAllocator(name string, cidrStr string, quorum uint, preClaims ...PreClaim) (*Allocator, address.CIDR) {
peername, err := mesh.PeerNameFromString(name)
if err != nil {
panic(err)
@@ -145,6 +145,7 @@ func makeAllocator(name string, cidrStr string, quorum uint) (*Allocator, addres
OurNickname: "nick-" + name,
Universe: cidr,
IsObserver: quorum == 0,
PreClaims: preClaims,
Quorum: func() uint { return quorum },
Db: new(mockDB),
IsKnownPeer: func(mesh.PeerName) bool { return true },
@@ -213,15 +214,19 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) {
}
}

func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
func makeNetworkOfAllocators(size int, cidr string, preClaims ...[]PreClaim) ([]*Allocator, *gossip.TestRouter, address.CIDR) {
gossipRouter := gossip.NewTestRouter(0.0)
allocs := make([]*Allocator, size)
var subnet address.CIDR

for i := 0; i < size; i++ {
var alloc *Allocator
preClaim := []PreClaim{}
if i < len(preClaims) {
preClaim = preClaims[i]
}
alloc, subnet = makeAllocator(fmt.Sprintf("%02d:00:00:02:00:00", i),
cidr, uint(size/2+1))
cidr, uint(size/2+1), preClaim...)
alloc.SetInterfaces(gossipRouter.Connect(alloc.ourName, alloc))
alloc.Start()
allocs[i] = alloc
17 changes: 0 additions & 17 deletions prog/weave-kube/launch.sh
@@ -66,30 +66,13 @@ if [ -z "$IPALLOC_INIT" ]; then
IPALLOC_INIT="consensus=$(peer_count $KUBE_PEERS)"
fi

reclaim_ips() {
ID=$1
shift
for CIDR in "$@" ; do
curl -s -S -X PUT "$HTTP_ADDR/ip/$ID/$CIDR" || true
done
}

post_start_actions() {
# Wait for weave process to become responsive
while true ; do
curl $HTTP_ADDR/status >/dev/null 2>&1 && break
sleep 1
done

# Tell the newly-started weave about existing weave bridge IPs
/usr/bin/weaveutil container-addrs weave weave:expose | while read ID IFACE MAC IPS; do
reclaim_ips "weave:expose" $IPS
done
# Tell weave about existing weave process IPs
/usr/bin/weaveutil process-addrs weave | while read ID IFACE MAC IPS; do
reclaim_ips "_" $IPS
done

# Install CNI plugin binary to typical CNI bin location
# with fall-back to CNI directory used by kube-up on GCI OS
if ! mkdir -p $HOST_ROOT/opt/cni/bin ; then
14 changes: 9 additions & 5 deletions prog/weaver/main.go
@@ -306,13 +306,16 @@ func main() {
}
trackerName = "awsvpc"
}
allocator, defaultSubnet = createAllocator(router, ipamConfig, db, t, isKnownPeer)
observeContainers(allocator)
var allContainerIDs []string
if dockerCli != nil {
ids, err := dockerCli.AllContainerIDs()
allContainerIDs, err = dockerCli.AllContainerIDs()
checkFatal(err)
allocator.PruneOwned(ids)
}
preClaims, err := findExistingAddresses(dockerCli, allContainerIDs, weavenet.WeaveBridgeName)

checkFatal(err)
allocator, defaultSubnet = createAllocator(router, ipamConfig, preClaims, db, t, isKnownPeer)
observeContainers(allocator)
allocator.PruneOwned(allContainerIDs)
}

var (
@@ -445,7 +448,7 @@ func createOverlay(datapathName string, ifaceName string, isAWSVPC bool, host st
return overlay, bridge
}

func createAllocator(router *weave.NetworkRouter, config ipamConfig, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) {
func createAllocator(router *weave.NetworkRouter, config ipamConfig, preClaims []ipam.PreClaim, db db.DB, track tracker.LocalRangeTracker, isKnownPeer func(mesh.PeerName) bool) (*ipam.Allocator, address.CIDR) {
ipRange, err := ipam.ParseCIDRSubnet(config.IPRangeCIDR)
checkFatal(err)
defaultSubnet := ipRange
@@ -464,6 +467,7 @@
Seed: config.SeedPeerNames,
Universe: ipRange,
IsObserver: config.Observer,
PreClaims: preClaims,
Quorum: func() uint { return determineQuorum(config.PeerCount, router) },
Db: db,
IsKnownPeer: isKnownPeer,
84 changes: 84 additions & 0 deletions prog/weaver/reclaim.go
@@ -0,0 +1,84 @@
package main

import (
"net"

docker "github.com/fsouza/go-dockerclient"

"github.com/weaveworks/weave/api"
"github.com/weaveworks/weave/common"
weavedocker "github.com/weaveworks/weave/common/docker"
"github.com/weaveworks/weave/ipam"
weavenet "github.com/weaveworks/weave/net"
"github.com/weaveworks/weave/net/address"
)

func a(cidr *net.IPNet) address.CIDR {
prefixLength, _ := cidr.Mask.Size()
return address.CIDR{Addr: address.FromIP4(cidr.IP), PrefixLen: prefixLength}
}

// Get all the existing Weave IPs at startup, so we can stop IPAM
// giving out any as duplicates
func findExistingAddresses(dockerCli *weavedocker.Client, containerIDs []string, bridgeName string) (addrs []ipam.PreClaim, err error) {
Log.Infof("Checking for pre-existing addresses on %s bridge", bridgeName)
// First get the address for the bridge
bridgeNetDev, err := weavenet.GetBridgeNetDev(bridgeName)
if err != nil {
return nil, err
}
for _, cidr := range bridgeNetDev.CIDRs {
Log.Infof("%s bridge has address %v", bridgeName, cidr)
addrs = append(addrs, ipam.PreClaim{Ident: "weave:expose", Cidr: a(cidr)})
}

add := func(cid string, isContainer bool, netDevs []weavenet.Dev) {
for _, netDev := range netDevs {
for _, cidr := range netDev.CIDRs {
Log.Infof("Found address %v for ID %s", cidr, cid)
addrs = append(addrs, ipam.PreClaim{Ident: cid, IsContainer: isContainer, Cidr: a(cidr)})
}
}
}

// Then find all veths connected to the bridge
peerIDs, err := weavenet.ConnectedToBridgeVethPeerIds(bridgeName)
if err != nil {
return nil, err
}

// Now iterate over all containers to see if they have a network
// namespace with an attached interface
if dockerCli != nil {
for _, cid := range containerIDs {

container, err := dockerCli.InspectContainer(cid)
if err != nil {
if _, ok := err.(*docker.NoSuchContainer); ok {
continue
}
return nil, err
}
if container.State.Pid != 0 {
netDevs, err := weavenet.GetNetDevsByVethPeerIds(container.State.Pid, peerIDs)
if err != nil {
return nil, err
}
add(cid, true, netDevs)
}
}
} else {
// If we don't have a Docker connection, iterate over all processes
pids, err := common.AllPids("/proc")
if err != nil {
return nil, err
}
for _, pid := range pids {
netDevs, err := weavenet.GetNetDevsByVethPeerIds(pid, peerIDs)
if err != nil {
return nil, err
}
add(api.NoContainerID, false, netDevs)
}
}
return addrs, nil
}
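The helper a above converts the *net.IPNet values reported for the bridge and container interfaces into ipam's address.CIDR. A standalone example of the one non-obvious step, extracting the prefix length with Mask.Size():

package main

import (
	"fmt"
	"net"
)

func main() {
	// ParseCIDR returns the interface address and the network it belongs to;
	// Mask.Size() gives the prefix length (ones) out of the total mask bits.
	ip, ipnet, err := net.ParseCIDR("10.40.0.1/12")
	if err != nil {
		panic(err)
	}
	prefixLen, bits := ipnet.Mask.Size()
	fmt.Printf("addr=%v network=%v prefixLen=%d of %d\n", ip, ipnet, prefixLen, bits)
	// Output: addr=10.40.0.1 network=10.32.0.0/12 prefixLen=12 of 32
}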
1 change: 1 addition & 0 deletions test/320_claim_3_test.sh
@@ -49,6 +49,7 @@ delete_persistence $HOST1 $HOST2
# Now make host1 attempt to claim from host2, when host2 is stopped
# the point being to check whether host1 will hang trying to talk to host2
weave_on $HOST2 launch-router --ipalloc-range $UNIVERSE
weave_on $HOST2 prime
# Introduce host3 to remember the IPAM CRDT when we stop host2
weave_on $HOST3 launch-router --ipalloc-range $UNIVERSE $HOST2
weave_on $HOST3 prime
27 changes: 1 addition & 26 deletions weave
Expand Up @@ -1304,25 +1304,6 @@ check_overlap() {
util_op netcheck $1 $BRIDGE
}

# Claim addresses for a container in IPAM. Expects to be called from
# with_container_addresses.
ipam_reclaim() {
CONTAINER_ID="$1"
# The weave IP addresses of containers attached by the plugin are
# recorded specially in IPAM, since the container id is not known
# at the time.
[ "$2" = "$CONTAINER_IFNAME" ] || CONTAINER_ID="_"
for CIDR in $4 ; do
http_call $HTTP_ADDR PUT "/ip/$CONTAINER_ID/$CIDR?noErrorOnUnknown=true&?check-alive=true" || true
done
}

ipam_reclaim_no_check_alive() {
for CIDR in $4 ; do
http_call $HTTP_ADDR PUT /ip/$1/$CIDR?noErrorOnUnknown=true || true
done
}

detect_awsvpc() {
# Ignoring errors here: if we cannot detect AWSVPC we will skip the relevant
# steps, because "attach" should work without the weave router running.
@@ -1691,6 +1672,7 @@ launch_router() {
ROUTER_CONTAINER=$(docker run -d --name=$CONTAINER_NAME \
$(docker_run_options) \
$RESTART_POLICY \
--pid=host \
--volumes-from $DB_CONTAINER_NAME \
-v $RESOLV_CONF_DIR:/var/run/weave/etc \
-e WEAVE_PASSWORD \
@@ -1728,17 +1710,10 @@ setup_awsvpc() {
# Recreate the parameter values that are set when the router is first launched
fetch_router_args() {
CONTAINER_ARGS=$(docker inspect -f '{{.Args}}' $CONTAINER_NAME) || return 1
IPRANGE=$(echo $CONTAINER_ARGS | grep -o -E -e '-ipalloc-range [0-9/.]+') || true
NO_DNS_OPT=$(echo $CONTAINER_ARGS | grep -o -e '--no-dns') || true
}

populate_router() {
if [ -n "$IPRANGE" ] ; then
# Tell the newly-started weave IP allocator about existing weave IPs
# In the case of AWSVPC, we do expose before calling populate_router
[ -n "$AWSVPC" ] || with_container_addresses ipam_reclaim_no_check_alive weave:expose
with_container_addresses ipam_reclaim $(docker ps -q --no-trunc)
fi
if [ -z "$NO_DNS_OPT" ] ; then
# Tell the newly-started weaveDNS about existing weave IPs
for CONTAINER in $(docker ps -q --no-trunc) ; do