diff --git a/calico_containers/pycalico/ipam.py b/calico_containers/pycalico/ipam.py index a3ce1794..8e603e34 100644 --- a/calico_containers/pycalico/ipam.py +++ b/calico_containers/pycalico/ipam.py @@ -29,7 +29,8 @@ get_block_cidr_for_address, BLOCK_PREFIXLEN, AlreadyAssignedError, - AddressNotAssignedError) + AddressNotAssignedError, + NoHostAffinityWarning) from pycalico.handle import (AllocationHandle, AddressCountTooLow) from pycalico.util import get_hostname @@ -39,6 +40,8 @@ RETRIES = 100 +KEY_ERROR_RETRIES = 3 + class BlockHandleReaderWriter(DatastoreClient): """ @@ -57,7 +60,10 @@ def _read_block(self, block_cidr): """ key = _block_datastore_key(block_cidr) try: - result = self.etcd_client.read(key) + # Use quorum=True to ensure we don't get stale reads. Without this + # we allow many subtle race conditions, such as creating a block, + # then later reading it and finding it doesn't exist. + result = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: raise KeyError(str(block_cidr)) block = AllocationBlock.from_etcd_result(result) @@ -98,7 +104,7 @@ def _get_affine_blocks(self, host, version, pool): "version": version} block_ids = [] try: - result = self.etcd_client.read(path).children + result = self.etcd_client.read(path, quorum=True).children for child in result: packed = child.key.split("/") if len(packed) == 9: @@ -141,7 +147,7 @@ def _new_affine_block(self, host, version, pool): _log.debug("Checking if block %s is free.", block_id) key = _block_datastore_key(block_cidr) try: - _ = self.etcd_client.read(key) + _ = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: _log.debug("Found block %s free.", block_id) try: @@ -292,7 +298,7 @@ def _read_handle(self, handle_id): """ key = _handle_datastore_key(handle_id) try: - result = self.etcd_client.read(key) + result = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: raise KeyError(handle_id) handle = AllocationHandle.from_etcd_result(result) @@ -444,21 +450,48 @@ def 
_auto_assign(self, ip_version, num, handle_id, block_list = self._get_affine_blocks(hostname, ip_version, pool) - block_ids = iter(block_list) + block_ids = list(block_list) + key_errors = 0 allocated_ips = [] num_remaining = num while num_remaining > 0: try: - block_id = block_ids.next() - except StopIteration: + block_id = block_ids.pop(0) + except IndexError: _log.info("Ran out of affine blocks for %s in pool %s", hostname, pool) break - ips = self._auto_assign_block(block_id, - num_remaining, - handle_id, - attributes) + try: + ips = self._auto_assign_block(block_id, + num_remaining, + handle_id, + attributes) + except KeyError: + # In certain rare race conditions, _get_affine_blocks above + # can return block_ids that don't exist (due to multiple IPAM + # clients on this host running simultaneously). If that + # happens, requeue the block_id for a retry, since we expect + # the other IPAM client to shortly create the block. To stop + # endless looping we limit the number of KeyErrors that will + # generate a retry. + _log.warning("Tried to auto-assign to block %s. Doesn't " + "exist.", block_id) + key_errors += 1 + if key_errors <= KEY_ERROR_RETRIES: + _log.debug("Queueing block %s for retry.", block_id) + block_ids.append(block_id) + else: + _log.warning("Stopping retry of block %s.", block_id) + continue + except NoHostAffinityWarning: + # In certain rare race conditions, _get_affine_blocks above + # can return block_ids that don't actually have affinity to + # this host (due to multiple IPAM clients on this host running + # simultaneously). If that happens, just move to the next one. 
+ _log.warning("No host affinity on block %s; skipping.", + block_id) + continue allocated_ips.extend(ips) num_remaining = num - len(allocated_ips) diff --git a/calico_containers/tests/unit/ipam_test.py b/calico_containers/tests/unit/ipam_test.py index 78250f4d..c5b42bcd 100644 --- a/calico_containers/tests/unit/ipam_test.py +++ b/calico_containers/tests/unit/ipam_test.py @@ -31,6 +31,7 @@ network = IPNetwork("192.168.25.0/24") BLOCK_V4_2 = IPNetwork("10.11.45.0/26") +BLOCK_V4_3 = IPNetwork("10.11.47.0/26") class TestIPAMClient(unittest.TestCase): @@ -252,8 +253,9 @@ def m_get_affine_blocks(self, host, ip_version, pool): # Read returns appropriate result based on key. read_results = {m_resultb.key: m_resultb, m_resulth.key: m_resulth} - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" + assert quorum return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -304,8 +306,9 @@ def m_get_affine_blocks(self, host, ip_version, pool): m_resultb.value = block.to_json() m_resultb.key = "/calico/ipam/v2/assignment/ipv4/block/10.11.12.0-24" - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" + assert quorum return copy.copy(m_resultb) self.m_etcd_client.read.side_effect = read self.m_etcd_client.update.side_effect = EtcdCompareFailed() @@ -395,6 +398,133 @@ def m_get_ip_pools(self, version): for ip in ipv4s: assert_true(ip in rando_block.cidr) + @patch("pycalico.block.get_hostname", return_value="test_host1") + def test_auto_assign_bad_affinity(self, m_get_hostname): + """ + Test auto assign when _get_affine_blocks returns some blocks that + don't exist or don't actually have host affinity. + + This is a race condition that occurs because _get_affine_blocks only + checks the IPAM_HOST_AFFINITY_PATH to determine what blocks have + affinity to the host; it does not actually read the blocks themselves + to check affinity. 
+ + The race occurs because while attempting to allocate a new block with + affinity to this host, the IPAM client first writes to the + IPAM_HOST_AFFINITY_PATH before it writes to the block itself. If + multiple IPAM clients are running on behalf of the host, the race can + go something like this: + + 1. Client A is allocating a new affine block, and writes the block_id + to IPAM_HOST_AFFINITY_PATH. + 2. Client B needs to assign an address, so it reads the + IPAM_HOST_AFFINITY_PATH. + 3. Client B attempts to read the block. This fails, throwing a + KeyError. + 4. Client A writes the new block. + + If 4 happened before 3 we'd be fine. + + Or consider a related scenario. + + 1. Client A is allocating a new affine block, and writes the block_id + to IPAM_HOST_AFFINITY_PATH. + 2. Client B needs to assign an address, so it reads the + IPAM_HOST_AFFINITY_PATH. + 3. A different host claims affinity for the block, and writes the new + block. + 4. Client A attempts to write the block and fails, and cleans up the + IPAM_HOST_AFFINITY_PATH. + 5. Client B attempts to read the block, but when it tries to auto + assign from the block, it fails because a different host has + affinity. This throws a NoHostAffinityWarning. + + """ + + affine_blocks = [BLOCK_V4_1, + BLOCK_V4_2, + BLOCK_V4_3] + + def m_get_affine_blocks(self, host, ip_version, pool): + return affine_blocks + + def m_read_block(self, block_cidr): + if block_cidr is BLOCK_V4_1: + # This block doesn't yet exist. + raise KeyError() + elif block_cidr is BLOCK_V4_2: + # This block exists, but we don't have host affinity to it. + block = AllocationBlock(BLOCK_V4_2, "test_host2") + elif block_cidr is BLOCK_V4_3: + # This block exists and we have host affinity. Allocated IPs + # should come from this block. + block = AllocationBlock(BLOCK_V4_3, "test_host1") + else: + # Success on BLOCK_V4_3, so no additional blocks should be + # read. 
+ assert_true(False) + return block + + def m_get_ip_pools(self, version): + return [IPPool("10.11.0.0/18")] + + with patch("pycalico.ipam.BlockHandleReaderWriter._get_affine_blocks", + m_get_affine_blocks),\ + patch("pycalico.datastore.DatastoreClient.get_ip_pools", + m_get_ip_pools),\ + patch("pycalico.ipam.BlockHandleReaderWriter._read_block", + m_read_block): + (ipv4s, ipv6s) = self.client.auto_assign_ips(4, 0, None, {}) + assert_equal(len(ipv4s), 4) + for ip in ipv4s: + assert_true(ip in BLOCK_V4_3) + + @patch("pycalico.block.get_hostname", return_value="test_host1") + def test_auto_assign_affinity_key_err_retries(self, m_get_hostname): + """ + Test auto assign when _get_affine_blocks returns some blocks that + don't exist and we hit the maximum number of retries. + """ + + affine_blocks = [BLOCK_V4_1] + + def m_get_affine_blocks(self, host, ip_version, pool): + return affine_blocks + + # 4 attempts to read BLOCK_V4_1, then one attempt to read + # first_free_block + first_free_block = IPNetwork("10.11.0.0/26") + block = AllocationBlock(first_free_block, "test_host1") + m_read_block = Mock() + m_read_block.side_effect = [KeyError(), + KeyError(), + KeyError(), + KeyError(), + block] + # Note that _new_affine_block calls etcd_client.read() directly. 
+ self.m_etcd_client.read.side_effect = EtcdKeyNotFound() + + def m_get_ip_pools(self, version): + return [IPPool("10.11.0.0/18")] + + with patch("pycalico.ipam.BlockHandleReaderWriter._get_affine_blocks", + m_get_affine_blocks),\ + patch("pycalico.datastore.DatastoreClient.get_ip_pools", + m_get_ip_pools),\ + patch("pycalico.ipam.BlockHandleReaderWriter._read_block", + m_read_block): + (ipv4s, ipv6s) = self.client.auto_assign_ips(4, 0, None, {}) + assert_equal(len(ipv4s), 4) + for ip in ipv4s: + assert_true(ip in first_free_block) + m_read_block.assert_has_calls([ + call(BLOCK_V4_1), + call(BLOCK_V4_1), + call(BLOCK_V4_1), + call(BLOCK_V4_1), + call(first_free_block) + ]) + @patch("pycalico.block.get_hostname", return_value="test_host1") def test_assign(self, m_get_hostname): """ @@ -464,8 +594,9 @@ def test_assign_with_handle_cas_fails(self, m_get_hostname): # Read returns appropriate result based on key. read_results = {m_resultb.key: m_resultb, m_resulth.key: m_resulth} - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" + assert quorum return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -513,7 +644,8 @@ def test_assign_persistent_cas_fails(self, m_get_hostname): block = _test_block_empty_v4() m_result0 = Mock(spec=EtcdResult) m_result0.value = block.to_json() - def read(key): + def read(key, quorum): + assert quorum return copy.copy(m_result0) self.m_etcd_client.read.side_effect = read @@ -941,7 +1073,8 @@ def test_release_ip_by_handle_cas_error(self): read_results = {m_resulth.key: m_resulth, m_resultb4.key: m_resultb4, m_resultb6.key: m_resultb6} - def read(key): + def read(key, quorum): + assert quorum return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -1022,7 +1155,8 @@ def test_release_ip_by_handle_no_ips(self): # Mock out read. 
read_results = {m_resulth.key: m_resulth, m_resultb4.key: m_resultb4} - def read(key): + def read(key, quorum): + assert quorum return read_results[key] self.m_etcd_client.read.side_effect = read @@ -1172,7 +1306,8 @@ def test_get_affine_blocks(self): expected_ids = ["192.168.3.0/26", "192.168.5.0/26"] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) children = [] @@ -1195,7 +1330,8 @@ def test_get_affine_blocks_empty(self): expected_ids = [] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) result.children = iter([]) @@ -1224,7 +1360,8 @@ def test_get_affine_blocks_pool(self): returned_ids = ["192.168.3.0/26", "10.10.1.0/26"] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) children = [] @@ -1269,7 +1406,7 @@ def test_claim_block_affinity_already_owned(self): self.m_etcd_client.write.assert_has_calls([call(ANY, ""), call(key, value, prevExist=False)]) - self.m_etcd_client.read.assert_called_once_with(key) + self.m_etcd_client.read.assert_called_once_with(key, quorum=True) def test_new_affine_block_race(self): """