Skip to content
This repository has been archived by the owner on Oct 22, 2019. It is now read-only.

Commit

Permalink
Use quorum=True for etcd reads in IPAM.
Browse files Browse the repository at this point in the history
Fixes #47
  • Loading branch information
Spike Curtis committed Nov 5, 2015
1 parent 5ecb2af commit f880765
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 22 deletions.
57 changes: 45 additions & 12 deletions calico_containers/pycalico/ipam.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
get_block_cidr_for_address,
BLOCK_PREFIXLEN,
AlreadyAssignedError,
AddressNotAssignedError)
AddressNotAssignedError,
NoHostAffinityWarning)
from pycalico.handle import (AllocationHandle,
AddressCountTooLow)
from pycalico.util import get_hostname
Expand All @@ -39,6 +40,8 @@

RETRIES = 100

KEY_ERROR_RETRIES = 3


class BlockHandleReaderWriter(DatastoreClient):
"""
Expand All @@ -57,7 +60,10 @@ def _read_block(self, block_cidr):
"""
key = _block_datastore_key(block_cidr)
try:
result = self.etcd_client.read(key)
# Use quorum=True to ensure we don't get stale reads. Without this
# we allow many subtle race conditions, such as creating a block,
# then later reading it and finding it doesn't exist.
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(str(block_cidr))
block = AllocationBlock.from_etcd_result(result)
Expand Down Expand Up @@ -98,7 +104,7 @@ def _get_affine_blocks(self, host, version, pool):
"version": version}
block_ids = []
try:
result = self.etcd_client.read(path).children
result = self.etcd_client.read(path, quorum=True).children
for child in result:
packed = child.key.split("/")
if len(packed) == 9:
Expand Down Expand Up @@ -141,7 +147,7 @@ def _new_affine_block(self, host, version, pool):
_log.debug("Checking if block %s is free.", block_id)
key = _block_datastore_key(block_cidr)
try:
_ = self.etcd_client.read(key)
_ = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
_log.debug("Found block %s free.", block_id)
try:
Expand Down Expand Up @@ -292,7 +298,7 @@ def _read_handle(self, handle_id):
"""
key = _handle_datastore_key(handle_id)
try:
result = self.etcd_client.read(key)
result = self.etcd_client.read(key, quorum=True)
except EtcdKeyNotFound:
raise KeyError(handle_id)
handle = AllocationHandle.from_etcd_result(result)
Expand Down Expand Up @@ -444,21 +450,48 @@ def _auto_assign(self, ip_version, num, handle_id,
block_list = self._get_affine_blocks(hostname,
ip_version,
pool)
block_ids = iter(block_list)
block_ids = list(block_list)
key_errors = 0
allocated_ips = []

num_remaining = num
while num_remaining > 0:
try:
block_id = block_ids.next()
except StopIteration:
block_id = block_ids.pop(0)
except IndexError:
_log.info("Ran out of affine blocks for %s in pool %s",
hostname, pool)
break
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
try:
ips = self._auto_assign_block(block_id,
num_remaining,
handle_id,
attributes)
except KeyError:
# In certain rare race conditions, _get_affine_blocks above
# can return block_ids that don't exist (due to multiple IPAM
# clients on this host running simultaneously). If that
# happens, requeue the block_id for a retry, since we expect
# the other IPAM client to shortly create the block. To stop
# endless looping we limit the number of KeyErrors that will
# generate a retry.
_log.warning("Tried to auto-assign to block %s. Doesn't "
"exist.", block_id)
key_errors += 1
if key_errors <= KEY_ERROR_RETRIES:
_log.debug("Queueing block %s for retry.", block_id)
block_ids.append(block_id)
else:
_log.warning("Stopping retry of block %s.", block_id)
continue
except NoHostAffinityWarning:
# In certain rare race conditions, _get_affine_blocks above
# can return block_ids that don't actually have affinity to
# this host (due to multiple IPAM clients on this host running
# simultaneously). If that happens, just move to the next one.
_log.warning("No host affinity on block %s; skipping.",
block_id)
continue
allocated_ips.extend(ips)
num_remaining = num - len(allocated_ips)

Expand Down
157 changes: 147 additions & 10 deletions calico_containers/tests/unit/ipam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

network = IPNetwork("192.168.25.0/24")
BLOCK_V4_2 = IPNetwork("10.11.45.0/26")
BLOCK_V4_3 = IPNetwork("10.11.47.0/26")


class TestIPAMClient(unittest.TestCase):
Expand Down Expand Up @@ -252,8 +253,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -304,8 +306,9 @@ def m_get_affine_blocks(self, host, ip_version, pool):
m_resultb.value = block.to_json()
m_resultb.key = "/calico/ipam/v2/assignment/ipv4/block/10.11.12.0-24"

def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(m_resultb)
self.m_etcd_client.read.side_effect = read
self.m_etcd_client.update.side_effect = EtcdCompareFailed()
Expand Down Expand Up @@ -395,6 +398,133 @@ def m_get_ip_pools(self, version):
for ip in ipv4s:
assert_true(ip in rando_block.cidr)

@patch("pycalico.block.get_hostname", return_value="test_host1")
def test_auto_assign_bad_affinity(self, m_get_hostname):
"""
Test auto assign when _get_affine_blocks returns some blocks that
don't exist or don't actually have host affinity.
This is a race condition that occurs because _get_affine_blocks only
checks the IPAM_HOST_AFFINITY_PATH to determine what blocks have
affinity to the host; it does not actually read the blocks themselves
to check affinity.
The race occurs because while attempting to allocate a new block with
affinity to this host, the IPAM client first writes to the
IPAM_HOST_AFFINITY_PATH before it writes to the block itself. If
multiple IPAM clients are running on behalf of the host, the race can
go something like this:
1. Client A is allocating a new affine block, and writes the block_id
to IPAM_HOST_AFFINITY_PATH.
2. Client B needs to assign an address, so it reads the
IPAM_HOST_AFFINITY_PATH.
3. Client B attempts to read the block. This fails, throwing a
KeyError.
4. Client A writes the new block.
If 4 happened before 3 we'd be fine.
Or consider a related scenario.
1. Client A is allocating a new affine block, and writes the block_id
to IPAM_HOST_AFFINITY_PATH.
2. Client B needs to assign an address, so it reads the
IPAM_HOST_AFFINITY_PATH.
3. A different host claims affinity for the block, and writes the new
block.
4. Client A attempts to write the block and fails, and cleans up the
IPAM_HOST_AFFINITY_PATH.
5. Client B attempts to read the block, but when it tries to auto
assign from the block, it fails because a different host has
affinity. This throws a NoHostAffinityWarning.
"""

affine_blocks = [BLOCK_V4_1,
BLOCK_V4_2,
BLOCK_V4_3]

def m_get_affine_blocks(self, host, ip_version, pool):
return affine_blocks

def m_read_block(self, block_cidr):
if block_cidr is BLOCK_V4_1:
# This block doesn't yet exist.
raise KeyError()
elif block_cidr is BLOCK_V4_2:
# This block exists, but we don't have host affinity to it.
block = AllocationBlock(BLOCK_V4_2, "test_host2")
elif block_cidr is BLOCK_V4_3:
# This block exists and we have host affinity. Allocated IPs
# should come from this block.
block = AllocationBlock(BLOCK_V4_3, "test_host1")
else:
# Success on BLOCK_V4_3, so no additional blocks should be
# read.
assert_true(False)
return block

def m_get_ip_pools(self, version):
return [IPPool("10.11.0.0/18")]

with patch("pycalico.ipam.BlockHandleReaderWriter._get_affine_blocks",
m_get_affine_blocks),\
patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools),\
patch("pycalico.ipam.BlockHandleReaderWriter._read_block",
m_read_block):
(ipv4s, ipv6s) = self.client.auto_assign_ips(4, 0, None, {})
assert_equal(len(ipv4s), 4)
for ip in ipv4s:
assert_true(ip in BLOCK_V4_3)

@patch("pycalico.block.get_hostname", return_value="test_host1")
def test_auto_assign_affinity_key_err_retries(self, m_get_hostname):
"""
Test auto assign when _get_affine_blocks returns some blocks that
don't exist and we hit the maximum number of retries.
"""

affine_blocks = [BLOCK_V4_1]

def m_get_affine_blocks(self, host, ip_version, pool):
return affine_blocks

# 4 attempts to read BLOCK_V4_1, then one attempt to read
# first_free_block
first_free_block = IPNetwork("10.11.0.0/26")
block = AllocationBlock(first_free_block, "test_host1")
m_read_block = Mock()
m_read_block.side_effect = [KeyError(),
KeyError(),
KeyError(),
KeyError(),
block]
# Note that _get_new_affine_block calls etcd_client.read() directly.
self.m_etcd_client.read.side_effect = EtcdKeyNotFound()

def m_get_ip_pools(self, version):
return [IPPool("10.11.0.0/18")]

with patch("pycalico.ipam.BlockHandleReaderWriter._get_affine_blocks",
m_get_affine_blocks),\
patch("pycalico.datastore.DatastoreClient.get_ip_pools",
m_get_ip_pools),\
patch("pycalico.ipam.BlockHandleReaderWriter._read_block",
m_read_block):
(ipv4s, ipv6s) = self.client.auto_assign_ips(4, 0, None, {})
assert_equal(len(ipv4s), 4)
for ip in ipv4s:
assert_true(ip in first_free_block)
m_read_block.assert_has_calls([
call(BLOCK_V4_1),
call(BLOCK_V4_1),
call(BLOCK_V4_1),
call(BLOCK_V4_1),
call(first_free_block)
])

@patch("pycalico.block.get_hostname", return_value="test_host1")
def test_assign(self, m_get_hostname):
"""
Expand Down Expand Up @@ -464,8 +594,9 @@ def test_assign_with_handle_cas_fails(self, m_get_hostname):
# Read returns appropriate result based on key.
read_results = {m_resultb.key: m_resultb,
m_resulth.key: m_resulth}
def read(key):
def read(key, quorum):
""" Return a copy of the current stored value depending on key."""
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -513,7 +644,8 @@ def test_assign_persistent_cas_fails(self, m_get_hostname):
block = _test_block_empty_v4()
m_result0 = Mock(spec=EtcdResult)
m_result0.value = block.to_json()
def read(key):
def read(key, quorum):
assert quorum
return copy.copy(m_result0)
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -941,7 +1073,8 @@ def test_release_ip_by_handle_cas_error(self):
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4,
m_resultb6.key: m_resultb6}
def read(key):
def read(key, quorum):
assert quorum
return copy.copy(read_results[key])
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -1022,7 +1155,8 @@ def test_release_ip_by_handle_no_ips(self):
# Mock out read.
read_results = {m_resulth.key: m_resulth,
m_resultb4.key: m_resultb4}
def read(key):
def read(key, quorum):
assert quorum
return read_results[key]
self.m_etcd_client.read.side_effect = read

Expand Down Expand Up @@ -1172,7 +1306,8 @@ def test_get_affine_blocks(self):
expected_ids = ["192.168.3.0/26", "192.168.5.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
Expand All @@ -1195,7 +1330,8 @@ def test_get_affine_blocks_empty(self):
expected_ids = []

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
result.children = iter([])
Expand Down Expand Up @@ -1224,7 +1360,8 @@ def test_get_affine_blocks_pool(self):
returned_ids = ["192.168.3.0/26", "10.10.1.0/26"]

# Return some blocks.
def m_read(path):
def m_read(path, quorum):
assert quorum
assert path == "/calico/ipam/v2/host/test_host/ipv4/block/"
result = Mock(spec=EtcdResult)
children = []
Expand Down Expand Up @@ -1269,7 +1406,7 @@ def test_claim_block_affinity_already_owned(self):
self.m_etcd_client.write.assert_has_calls([call(ANY, ""),
call(key, value,
prevExist=False)])
self.m_etcd_client.read.assert_called_once_with(key)
self.m_etcd_client.read.assert_called_once_with(key, quorum=True)

def test_new_affine_block_race(self):
"""
Expand Down

0 comments on commit f880765

Please sign in to comment.