diff --git a/calico_containers/pycalico/ipam.py b/calico_containers/pycalico/ipam.py index a3ce1794..28ed7731 100644 --- a/calico_containers/pycalico/ipam.py +++ b/calico_containers/pycalico/ipam.py @@ -29,7 +29,8 @@ get_block_cidr_for_address, BLOCK_PREFIXLEN, AlreadyAssignedError, - AddressNotAssignedError) + AddressNotAssignedError, + NoHostAffinityWarning) from pycalico.handle import (AllocationHandle, AddressCountTooLow) from pycalico.util import get_hostname @@ -57,7 +58,10 @@ def _read_block(self, block_cidr): """ key = _block_datastore_key(block_cidr) try: - result = self.etcd_client.read(key) + # Use quorum=True to ensure we don't get stale reads. Without this + # we allow many subtle race conditions, such as creating a block, + # then later reading it and finding it doesn't exist. + result = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: raise KeyError(str(block_cidr)) block = AllocationBlock.from_etcd_result(result) @@ -98,7 +102,7 @@ def _get_affine_blocks(self, host, version, pool): "version": version} block_ids = [] try: - result = self.etcd_client.read(path).children + result = self.etcd_client.read(path, quorum=True).children for child in result: packed = child.key.split("/") if len(packed) == 9: @@ -141,7 +145,7 @@ def _new_affine_block(self, host, version, pool): _log.debug("Checking if block %s is free.", block_id) key = _block_datastore_key(block_cidr) try: - _ = self.etcd_client.read(key) + _ = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: _log.debug("Found block %s free.", block_id) try: @@ -292,7 +296,7 @@ def _read_handle(self, handle_id): """ key = _handle_datastore_key(handle_id) try: - result = self.etcd_client.read(key) + result = self.etcd_client.read(key, quorum=True) except EtcdKeyNotFound: raise KeyError(handle_id) handle = AllocationHandle.from_etcd_result(result) @@ -455,10 +459,18 @@ def _auto_assign(self, ip_version, num, handle_id, _log.info("Ran out of affine blocks for %s in pool %s", hostname, pool) break - ips = self._auto_assign_block(block_id, - num_remaining, - handle_id, - attributes) + try: + ips = self._auto_assign_block(block_id, + num_remaining, + handle_id, + attributes) + except (KeyError, NoHostAffinityWarning): + # In certain rare race conditions, _get_affine_blocks above + # can return block_ids that don't exist or don't actually have + # affinity to this host (due to multiple IPAM clients on this + # host running simultaneously). If that happens, just move to + # the next one. + continue allocated_ips.extend(ips) num_remaining = num - len(allocated_ips) diff --git a/calico_containers/tests/unit/ipam_test.py b/calico_containers/tests/unit/ipam_test.py index 78250f4d..45297f6d 100644 --- a/calico_containers/tests/unit/ipam_test.py +++ b/calico_containers/tests/unit/ipam_test.py @@ -252,8 +252,9 @@ def m_get_affine_blocks(self, host, ip_version, pool): # Read returns appropriate result based on key. read_results = {m_resultb.key: m_resultb, m_resulth.key: m_resulth} - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" + assert quorum return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -304,8 +305,9 @@ def m_get_affine_blocks(self, host, ip_version, pool): m_resultb.value = block.to_json() m_resultb.key = "/calico/ipam/v2/assignment/ipv4/block/10.11.12.0-24" - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" + assert quorum return copy.copy(m_resultb) self.m_etcd_client.read.side_effect = read self.m_etcd_client.update.side_effect = EtcdCompareFailed() @@ -464,7 +466,7 @@ def test_assign_with_handle_cas_fails(self, m_get_hostname): # Read returns appropriate result based on key. read_results = {m_resultb.key: m_resultb, m_resulth.key: m_resulth} - def read(key): + def read(key, quorum): """ Return a copy of the current stored value depending on key.""" return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -513,7 +515,7 @@ def test_assign_persistent_cas_fails(self, m_get_hostname): block = _test_block_empty_v4() m_result0 = Mock(spec=EtcdResult) m_result0.value = block.to_json() - def read(key): + def read(key, quorum): return copy.copy(m_result0) self.m_etcd_client.read.side_effect = read @@ -941,7 +943,8 @@ def test_release_ip_by_handle_cas_error(self): read_results = {m_resulth.key: m_resulth, m_resultb4.key: m_resultb4, m_resultb6.key: m_resultb6} - def read(key): + def read(key, quorum): + assert quorum return copy.copy(read_results[key]) self.m_etcd_client.read.side_effect = read @@ -1022,7 +1025,8 @@ def test_release_ip_by_handle_no_ips(self): # Mock out read. read_results = {m_resulth.key: m_resulth, m_resultb4.key: m_resultb4} - def read(key): + def read(key, quorum): + assert quorum return read_results[key] self.m_etcd_client.read.side_effect = read @@ -1172,7 +1176,8 @@ def test_get_affine_blocks(self): expected_ids = ["192.168.3.0/26", "192.168.5.0/26"] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) children = [] @@ -1195,7 +1200,8 @@ def test_get_affine_blocks_empty(self): expected_ids = [] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) result.children = iter([]) @@ -1224,7 +1230,8 @@ def test_get_affine_blocks_pool(self): returned_ids = ["192.168.3.0/26", "10.10.1.0/26"] # Return some blocks. - def m_read(path): + def m_read(path, quorum): + assert quorum assert path == "/calico/ipam/v2/host/test_host/ipv4/block/" result = Mock(spec=EtcdResult) children = [] @@ -1269,7 +1276,7 @@ def test_claim_block_affinity_already_owned(self): self.m_etcd_client.write.assert_has_calls([call(ANY, ""), call(key, value, prevExist=False)]) - self.m_etcd_client.read.assert_called_once_with(key) + self.m_etcd_client.read.assert_called_once_with(key, quorum=True) def test_new_affine_block_race(self): """