Skip to content
This repository has been archived by the owner on Oct 22, 2019. It is now read-only.

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use quorum=True for etcd reads in IPAM.
Browse files Browse the repository at this point in the history
Fixes #47
Spike Curtis committed Nov 4, 2015
1 parent 5ecb2af commit 1b43dcf
Showing 2 changed files with 122 additions and 19 deletions.
30 changes: 21 additions & 9 deletions calico_containers/pycalico/ipam.py
Original file line number Diff line number Diff line change
@@ -29,7 +29,8 @@
get_block_cidr_for_address,
BLOCK_PREFIXLEN,
AlreadyAssignedError,
AddressNotAssignedError)
AddressNotAssignedError,
NoHostAffinityWarning)
from pycalico.handle import (AllocationHandle,
AddressCountTooLow)
from pycalico.util import get_hostname
@@ -57,7 +58,10 @@ def _read_block(self, block_cidr):
"""
key = _block_datastore_key(block_cidr)
try:
result = self.etcd_client.read(key)
# Use quorum=True to ensure we don't get stale reads. Without this
# we allow many subtle race conditions, such as creating a block,
# then later reading it and finding it doesn't exist.
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(str(block_cidr))
block = AllocationBlock.from_etcd_result(result)
@@ -98,7 +102,7 @@ def _get_affine_blocks(self, host, version, pool):
"version": version}
block_ids = []
try:
result = self.etcd_client.read(path).children
result = self.etcd_client.read(path, quorum=True).children
for child in result:
packed = child.key.split("/")
if len(packed) == 9:
@@ -141,7 +145,7 @@ def _new_affine_block(self, host, version, pool):
_log.debug("Checking if block %s is free.", block_id)
key = _block_datastore_key(block_cidr)
try:
_ = self.etcd_client.read(key)
_ = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
_log.debug("Found block %s free.", block_id)
try:
@@ -292,7 +296,7 @@ def _read_handle(self, handle_id):
"""
key = _handle_datastore_key(handle_id)
try:
result = self.etcd_client.read(key)
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(handle_id)
handle = AllocationHandle.from_etcd_result(result)
@@ -455,10 +459,18 @@ def _auto_assign(self, ip_version, num, handle_id,
_log.info("Ran out of affine blocks for %s in pool %s",
hostname, pool)
break
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
try:
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
except (KeyError, NoHostAffinityWarning):
# In certain rare race conditions, _get_affine_blocks above
# can return block_ids that don't exist or don't actually have
# affinity to this host (due to multiple IPAM clients on this
# host running simultaneously). If that happens, just move to
# the next one.
continue
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)

111 changes: 101 additions & 10 deletions calico_containers/tests/unit/ipam_test.py
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@

network = IPNetwork("192.168.25.0/24")
BLOCK_V4_2 = IPNetwork("10.11.45.0/26")
BLOCK_V4_3 = IPNetwork("10.11.47.0/26")


class TestIPAMClient(unittest.TestCase):
@@ -252,8 +253,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

@@ -304,8 +306,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
m_resultb.value = block.to_json()
m_resultb.key = "/calico/ipam/v2/assignment/ipv4/block/10.11.12.0-24"

def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(m_resultb)
self.m_etcd_client.read.side_effect = read
self.m_etcd_client.update.side_effect = EtcdCompareFailed()
@@ -395,6 +398,87 @@ def m_get_ip_pools(self, version):
for ip in ipv4s:
assert_true(ip in rando_block.cidr)

@patch("pycalico.block.get_hostname", return_value="test_host1")
def test_auto_assign_bad_affinity(self, m_get_hostname):
"""
Test auto assign when _get_affine_blocks returns some blocks that
don't exist or don't actually have host affinity.
This is a race condition that occurs because _get_affine_blocks only
checks the IPAM_HOST_AFFINITY_PATH to determine what blocks have
affinity to the host; it does not actually read the blocks themselves
to check affinity.
The race occurs because while attempting to allocate a new block with
affinity to this host, the IPAM client first writes to the
IPAM_HOST_AFFINITY_PATH before it writes to the block itself. If
multiple IPAM clients are running on behalf of the host, the race can
go something like this:
1. Client A is allocating a new affine block, and writes the block_id
to IPAM_HOST_AFFINITY_PATH.
2. Client B needs to assign an address, so it reads the
IPAM_HOST_AFFINITY_PATH.
3. Client B attempts to read the block. This fails, throwing a
KeyError.
4. Client A writes the new block.
If 4 happened before 3 we'd be fine.
Or consider a related scenario.
1. Client A is allocating a new affine block, and writes the block_id
to IPAM_HOST_AFFINITY_PATH.
2. Client B needs to assign an address, so it reads the
IPAM_HOST_AFFINITY_PATH.
3. A different host claims affinity for the block, and writes the new
block.
4. Client A attempts to write the block and fails, and cleans up the
IPAM_HOST_AFFINITY_PATH.
5. Client B attempts to read the block, but when it tries to auto
assign from the block, it fails because a different host has
affinity. This throws a NoHostAffinityWarning.
"""

affine_blocks = [BLOCK_V4_1,
BLOCK_V4_2,
BLOCK_V4_3]

def m_get_affine_blocks(self, host, ip_version, pool):
return affine_blocks

def m_read_block(self, block_cidr):
if block_cidr is BLOCK_V4_1:
# This block doesn't yet exist.
raise KeyError()
elif block_cidr is BLOCK_V4_2:
# This block exists, but we don't have host affinity to it.
block = AllocationBlock(BLOCK_V4_2, "test_host2")
elif block_cidr is BLOCK_V4_3:
# This block exists and we have host affinity. Allocated IPs
# should come from this block.
block = AllocationBlock(BLOCK_V4_3, "test_host1")
else:
# Success on BLOCK_V4_3, so no additional blocks should be
# read.
assert_true(False)
return block

def m_get_ip_pools(self, version):
return [IPPool("10.11.0.0/18")]

with patch("pycalico.ipam.BlockHandleReaderWriter._get_affine_blocks",
m_get_affine_blocks),\
patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools),\
patch("pycalico.ipam.BlockHandleReaderWriter._read_block",
m_read_block):
(ipv4s, ipv6s) = self.client.auto_assign_ips(4, 0, None, {})
assert_equal(len(ipv4s), 4)
for ip in ipv4s:
assert_true(ip in BLOCK_V4_3)

@patch("pycalico.block.get_hostname", return_value="test_host1")
def test_assign(self, m_get_hostname):
"""
@@ -464,8 +548,9 @@ def test_assign_with_handle_cas_fails(self, m_get_hostname):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

@@ -513,7 +598,8 @@ def test_assign_persistent_cas_fails(self, m_get_hostname):
block = _test_block_empty_v4()
m_result0 = Mock(spec=EtcdResult)
m_result0.value = block.to_json()
def read(key):
def read(key, quorum):
assert quorum
return copy.copy(m_result0)
self.m_etcd_client.read.side_effect = read

@@ -941,7 +1027,8 @@ def test_release_ip_by_handle_cas_error(self):
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4,
m_resultb6.key: m_resultb6}
def read(key):
def read(key, quorum):
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

@@ -1022,7 +1109,8 @@ def test_release_ip_by_handle_no_ips(self):
# Mock out read.
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4}
def read(key):
def read(key, quorum):
assert quorum
return read_results[key]
self.m_etcd_client.read.side_effect = read

@@ -1172,7 +1260,8 @@ def test_get_affine_blocks(self):
expected_ids = ["192.168.3.0/26", "192.168.5.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
@@ -1195,7 +1284,8 @@ def test_get_affine_blocks_empty(self):
expected_ids = []

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
result.children = iter([])
@@ -1224,7 +1314,8 @@ def test_get_affine_blocks_pool(self):
returned_ids = ["192.168.3.0/26", "10.10.1.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
@@ -1269,7 +1360,7 @@ def test_claim_block_affinity_already_owned(self):
self.m_etcd_client.write.assert_has_calls([call(ANY, ""),
call(key, value,
prevExist=False)])
self.m_etcd_client.read.assert_called_once_with(key)
self.m_etcd_client.read.assert_called_once_with(key, quorum=True)

def test_new_affine_block_race(self):
"""

0 comments on commit 1b43dcf

Please sign in to comment.