diff --git a/calico/datamodel_v1.py b/calico/datamodel_v1.py
index 77c51499a4..e9b40720e5 100644
--- a/calico/datamodel_v1.py
+++ b/calico/datamodel_v1.py
@@ -190,6 +190,12 @@ def path_for_status(self):
     def __str__(self):
         return self.__class__.__name__ + ("<%s>" % self.endpoint)
 
+    def __repr__(self):
+        return self.__class__.__name__ + ("(%r,%r,%r,%r)" % (self.host,
+                                                             self.orchestrator,
+                                                             self.workload,
+                                                             self.endpoint))
+
     def __eq__(self, other):
         if other is self:
             return True
diff --git a/calico/etcddriver/driver.py b/calico/etcddriver/driver.py
index 097e100d84..f303fcf033 100644
--- a/calico/etcddriver/driver.py
+++ b/calico/etcddriver/driver.py
@@ -61,6 +61,18 @@
 FLUSH_THRESHOLD = 200
 
+# TODO: trigger immediate resync if these are deleted?
+# RESYNC_KEYS = [
+#     VERSION_DIR,
+#     POLICY_DIR,
+#     PROFILE_DIR,
+#     CONFIG_DIR,
+#     HOST_DIR,
+#     IPAM_DIR,
+#     IPAM_V4_DIR,
+#     POOL_V4_DIR,
+# ]
+
 
 class EtcdDriver(object):
     def __init__(self, felix_sck):
diff --git a/calico/felix/fetcd.py b/calico/felix/fetcd.py
index f7a858be90..ed4145c6d8 100644
--- a/calico/felix/fetcd.py
+++ b/calico/felix/fetcd.py
@@ -27,7 +27,6 @@
 import socket
 import subprocess
 import msgpack
-import time
 
 from calico.etcddriver.protocol import *
 from calico.monotonic import monotonic_time
@@ -41,15 +40,15 @@ from calico.datamodel_v1 import (VERSION_DIR, CONFIG_DIR,
                                  RULES_KEY_RE, TAGS_KEY_RE,
                                  dir_for_per_host_config,
-                                 PROFILE_DIR, HOST_DIR, EndpointId, POLICY_DIR,
+                                 PROFILE_DIR, HOST_DIR, EndpointId,
                                  HOST_IP_KEY_RE, IPAM_V4_CIDR_KEY_RE,
                                  key_for_last_status, key_for_status,
                                  FELIX_STATUS_DIR, get_endpoint_id_from_key,
                                  dir_for_felix_status, ENDPOINT_STATUS_ERROR,
                                  ENDPOINT_STATUS_DOWN, ENDPOINT_STATUS_UP)
 from calico.etcdutils import (
-    EtcdClientOwner, EtcdWatcher, ResyncRequired,
-    delete_empty_parents)
+    EtcdClientOwner, delete_empty_parents, PathDispatcher
+)
 from calico.felix.actor import Actor, actor_message
 from calico.felix.futils import (intern_dict, intern_list,
                                  logging_exceptions, iso_utc_timestamp,
                                  IPV4, IPV6)
@@ -79,17 +78,6 @@
 POOL_V4_DIR = IPAM_V4_DIR + "/pool"
 CIDR_V4_KEY = POOL_V4_DIR + "/"
 
-RESYNC_KEYS = [
-    VERSION_DIR,
-    POLICY_DIR,
-    PROFILE_DIR,
-    CONFIG_DIR,
-    HOST_DIR,
-    IPAM_DIR,
-    IPAM_V4_DIR,
-    POOL_V4_DIR,
-]
-
 # Max number of events from driver process before we yield to another greenlet.
 MAX_EVENTS_BEFORE_YIELD = 200
 
@@ -268,26 +256,17 @@ def _on_worker_died(self, watch_greenlet):
         sys.exit(1)
 
 
-class _FelixEtcdWatcher(EtcdWatcher, gevent.Greenlet):
+class _FelixEtcdWatcher(gevent.Greenlet):
     """
-    Greenlet that watches the etcd data model for changes.
-
-    (1) Waits for the load_config event to be triggered.
-    (2) Connects to etcd and waits for the Ready flag to be set,
-        indicating the data model is consistent.
-    (3) Loads the config from etcd and passes it to the config object.
-    (4) Waits for the begin_polling Event to be triggered.
-    (5) Loads a complete snapshot from etcd and passes it to the
-        UpdateSplitter.
-    (6) Watches etcd for changes, sending them incrementally to the
-        UpdateSplitter.
-
-    (On etcd error) starts again from step (5)
-
-    This greenlet is expected to be managed by the EtcdAPI Actor.
+    Greenlet that communicates with the etcd driver over a socket.
+
+    * Handles initial configuration of the driver.
+    * Processes the initial config responses.
+    * Then fans out the stream of updates.
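+
+    The driver streams msgpack-encoded dicts over the socket; each one
+    carries a MSG_KEY_TYPE of MSG_TYPE_UPDATE, MSG_TYPE_CONFIG_LOADED or
+    MSG_TYPE_STATUS, which _run() dispatches on.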
""" def __init__(self, config, etcd_api, status_reporter, hosts_ipset): - super(_FelixEtcdWatcher, self).__init__(config.ETCD_ADDR, VERSION_DIR) + super(_FelixEtcdWatcher, self).__init__() self._config = config self._etcd_api = etcd_api self._status_reporter = status_reporter @@ -316,6 +295,12 @@ def __init__(self, config, etcd_api, status_reporter, hosts_ipset): # Next-hop IP addresses of our hosts, if populated in etcd. self.ipv4_by_hostname = {} + # Forces a resync after the current poll if set. Safe to set from + # another thread. Automatically reset to False after the resync is + # triggered. + self.resync_after_current_poll = False # FIXME Periodic resync + self.dispatcher = PathDispatcher() + # Register for events when values change. self._register_paths() @@ -330,11 +315,7 @@ def _register_paths(self): deletion, we have to handle deletes for lots of directories that we otherwise wouldn't care about. """ - reg = self.register_path - # Top-level directories etc. If these go away, stop polling and - # resync. - for key in RESYNC_KEYS: - reg(key, on_del=self._resync) + reg = self.dispatcher.register # Profiles and their contents. reg(TAGS_KEY, on_set=self.on_tags_set, on_del=self.on_tags_delete) reg(RULES_KEY, on_set=self.on_rules_set, on_del=self.on_rules_delete) @@ -361,63 +342,45 @@ def _register_paths(self): @logging_exceptions def _run(self): - """ - Greenlet main loop: loads the initial dump from etcd and then - monitors for changes and feeds them to the splitter. - """ + _log.info("Waiting for load_config event...") self.load_config.wait() - self.loop() - - @logging_exceptions - def loop(self): - _log.info("Started %s loop", self) - while not self._stopped: - try: - _log.info("Reconnecting and loading snapshot from etcd...") - self.reconnect(copy_cluster_id=False) - - driver_sck = self.start_driver() - unpacker = msgpack.Unpacker() - msgs_processed = 0 - while True: - data = driver_sck.recv(16384) - unpacker.feed(data) - for msg in unpacker: - # Optimization: put update first in the "switch" - # block because it's on the critical path. - msg_type = msg[MSG_KEY_TYPE] - if msg_type == MSG_TYPE_UPDATE: - self.begin_polling.wait() - self._on_update_from_driver(msg) - elif msg_type == MSG_TYPE_CONFIG_LOADED: - self._on_config_loaded_from_driver(msg, driver_sck) - elif msg_type == MSG_TYPE_STATUS: - self._on_status_from_driver(msg) - else: - raise RuntimeError("Unexpected message %s" % msg) - msgs_processed += 1 - if msgs_processed % MAX_EVENTS_BEFORE_YIELD == 0: - # Yield to ensure that other actors make progress. - # Sleep must be non-zero to work around gevent - # issue where we could be immediately rescheduled. - gevent.sleep(0.000001) - - except EtcdException as e: - # Most likely a timeout or other error in the pre-resync; - # start over. These exceptions have good semantic error text - # so the stack trace would just add log spam. - _log.error("Unexpected IO or etcd error, triggering " - "resync with etcd: %r.", e) - time.sleep(1) # Prevent tight loop due to unexpected error. - except: - _log.exception("Exception reading from socket?") - raise + _log.info("...load_config set. Starting driver read %s loop", self) + driver_sck = self.start_driver() + unpacker = msgpack.Unpacker() + msgs_processed = 0 + while True: + data = driver_sck.recv(16384) + unpacker.feed(data) + for msg in unpacker: + # Optimization: put update first in the "switch" + # block because it's on the critical path. 
+                msg_type = msg[MSG_KEY_TYPE]
+                if msg_type == MSG_TYPE_UPDATE:
+                    self.begin_polling.wait()
+                    self._on_update_from_driver(msg)
+                elif msg_type == MSG_TYPE_CONFIG_LOADED:
+                    self._on_config_loaded_from_driver(msg, driver_sck)
+                elif msg_type == MSG_TYPE_STATUS:
+                    self._on_status_from_driver(msg)
+                else:
+                    raise RuntimeError("Unexpected message %s" % msg)
+                msgs_processed += 1
+                if msgs_processed % MAX_EVENTS_BEFORE_YIELD == 0:
+                    # Yield to ensure that other actors make progress.
+                    # Sleep must be non-zero to work around gevent
+                    # issue where we could be immediately rescheduled.
+                    gevent.sleep(0.000001)
         _log.info("%s.loop() stopped due to self.stop == True", self)
 
     def _on_update_from_driver(self, msg):
-        assert self.configured.is_set()
+        """
+        Called when the driver sends us a key/value pair update.
+
+        :param dict msg: The message received from the driver.
+        """
+        assert self.configured.is_set(), "Received update before config"
         key = msg[MSG_KEY_KEY]
         value = msg[MSG_KEY_VALUE]
+        _log.debug("Update from driver: %s -> %s", key, value)
         self.read_count += 1
         if self.read_count % 1000 == 0:
             now = monotonic_time()
@@ -425,16 +388,24 @@ def _on_update_from_driver(self, msg):
                 _log.info("Processed %s updates from driver "
                           "%.1f/s", self.read_count, 1000.0 / delta)
                 self.last_rate_log_time = now
+        # Create a fake etcd node object.
+        # FIXME: avoid creating fake node.
         n = Node()
         n.action = "set" if value is not None else "delete"
         n.value = value
         n.key = key
-        try:
-            self.dispatcher.handle_event(n)
-        except ResyncRequired:
-            _log.warning("IGNORING RESYNC.")
+        # And dispatch it.
+        self.dispatcher.handle_event(n)
 
     def _on_config_loaded_from_driver(self, msg, driver_sck):
+        """
+        Called when we receive a config loaded message from the driver.
+
+        Responds to the driver immediately with a config response.
+
+        If the config has changed since a previous call, triggers Felix
+        to die.
+        """
         global_config = msg[MSG_KEY_GLOBAL_CONFIG]
         host_config = msg[MSG_KEY_HOST_CONFIG]
         _log.info("Config loaded by driver:\n"
@@ -467,7 +438,7 @@ def _on_config_loaded_from_driver(self, msg, driver_sck):
         # Config now fully resolved, inform the driver.
         felix_log_file = self._config.LOGFILE
         if felix_log_file:
-            # FIXME PRoper config for driver logfile
+            # FIXME Proper config for driver logfile
             driver_log_file = felix_log_file + "-driver"
         else:
             driver_log_file = None
@@ -481,16 +452,30 @@ def _on_config_loaded_from_driver(self, msg, driver_sck):
         self.configured.set()
 
     def _on_status_from_driver(self, msg):
+        """
+        Called when we receive a status update from the driver.
+
+        If the status is in-sync, triggers the relevant processing.
+
+        :param dict msg: The status message received from the driver.
+        """
         status = msg[MSG_KEY_STATUS]
         _log.info("etcd driver status changed to %s", status)
         if status == STATUS_IN_SYNC:
+            # We're now in sync, tell the splitter so that we can complete
+            # our start-of-day cleanup etc.
             self.begin_polling.wait()  # Make sure splitter is set.
             self._been_in_sync = True
             self.splitter.on_datamodel_in_sync(async=True)
+            self._status_reporter.clean_up_endpoint_statuses(async=True)
             self._update_hosts_ipset()
-            self.clean_up_endpoint_statuses()
 
     def start_driver(self):
+        """
+        Starts the driver subprocess, connects to it over the socket
+        and sends it the init message.
+
+        :return: the connected socket to the driver.
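+
+        Note: binds a unix socket at /run/felix-driver.sck, removing any
+        stale socket file left over from a previous run first.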
+ """ _log.info("Creating server socket.") try: os.unlink("/run/felix-driver.sck") @@ -522,50 +507,6 @@ def start_driver(self): return update_conn - def clean_up_endpoint_statuses(self): - """ - Mark any endpoint status reports for non-existent endpoints - for cleanup. - """ - if not self._config.REPORT_ENDPOINT_STATUS: - _log.debug("Endpoint status reporting disabled, ignoring.") - return - - our_host_dir = "/".join([FELIX_STATUS_DIR, self._config.HOSTNAME, - "workload"]) - try: - # Grab all the existing status reports. - response = self.client.read(our_host_dir, - recursive=True) - except EtcdKeyNotFound: - _log.info("No endpoint statuses found, nothing to clean up") - else: - # Mark all statuses we find as dirty. This will result in any - # unknown endpoints being cleaned up. - for node in response.leaves: - combined_id = get_endpoint_id_from_key(node.key) - if combined_id: - _log.debug("Endpoint %s removed by resync, marking " - "status key for cleanup", - combined_id) - self._status_reporter.mark_endpoint_dirty(combined_id, - async=True) - elif node.dir: - # This leaf is an empty directory, try to clean it up. - # This is safe even if another thread is adding keys back - # into the directory. - _log.debug("Found empty directory %s, cleaning up", - node.key) - delete_empty_parents(self.client, node.key, our_host_dir) - - def _resync(self, response, **kwargs): - """ - Force a resync. - :raises ResyncRequired: always. - """ - _log.warning("Resync triggered due to change to %s", response.key) - raise ResyncRequired() - def on_endpoint_set(self, response, hostname, orchestrator, workload_id, endpoint_id): """Handler for endpoint updates, passes the update to the splitter.""" @@ -680,6 +621,7 @@ def __init__(self, config): self._newer_dirty_endpoints = set() self._older_dirty_endpoints = set() + self._cleanup_pending = False self._timer_scheduled = False self._reporting_allowed = True @@ -724,6 +666,14 @@ def _mark_endpoint_dirty(self, endpoint_id): _log.debug("Marking endpoint %s dirty", endpoint_id) self._newer_dirty_endpoints.add(endpoint_id) + @actor_message() + def clean_up_endpoint_statuses(self): + """ + Note that we need to do cleanup. We'll then try/retry from + _finish_msg_batch(). + """ + self._cleanup_pending = True + def _finish_msg_batch(self, batch, results): if not self._config.REPORT_ENDPOINT_STATUS: _log.error("StatusReporter called even though status reporting " @@ -733,6 +683,15 @@ def _finish_msg_batch(self, batch, results): self._newer_dirty_endpoints.clear() self._older_dirty_endpoints.clear() return + + if self._cleanup_pending: + try: + self._attempt_cleanup() + except EtcdException as e: + _log.error("Cleanup failed: %r", e) + else: + self._cleanup_pending = False + if self._reporting_allowed: # We're not rate limited, go ahead and do a write to etcd. _log.debug("Status reporting is allowed by rate limit.") @@ -758,8 +717,9 @@ def _finish_msg_batch(self, batch, results): # Reset the rate limit flag. self._reporting_allowed = False - if not self._timer_scheduled and not self._reporting_allowed: - # Schedule a timer to stop our rate limiting. + if not self._timer_scheduled and ((not self._reporting_allowed) or + self._cleanup_pending): + # Schedule a timer to stop our rate limiting or retry cleanup. timeout = self._config.ENDPOINT_REPORT_DELAY timeout *= 0.9 + (random.random() * 0.2) # Jitter by +/- 10%. 
             gevent.spawn_later(timeout,
@@ -767,6 +727,33 @@ def _finish_msg_batch(self, batch, results):
                                async=True)
             self._timer_scheduled = True
 
+    def _attempt_cleanup(self):
+        our_host_dir = "/".join([FELIX_STATUS_DIR, self._config.HOSTNAME,
+                                 "workload"])
+        try:
+            # Grab all the existing status reports.
+            response = self.client.read(our_host_dir,
+                                        recursive=True)
+        except EtcdKeyNotFound:
+            _log.info("No endpoint statuses found, nothing to clean up")
+        else:
+            # Mark all statuses we find as dirty.  This will result in any
+            # unknown endpoints being cleaned up.
+            for node in response.leaves:
+                combined_id = get_endpoint_id_from_key(node.key)
+                if combined_id:
+                    _log.debug("Endpoint %s removed by resync, marking "
+                               "status key for cleanup",
+                               combined_id)
+                    self._mark_endpoint_dirty(combined_id)
+                elif node.dir:
+                    # This leaf is an empty directory, try to clean it up.
+                    # This is safe even if another thread is adding keys back
+                    # into the directory.
+                    _log.debug("Found empty directory %s, cleaning up",
+                               node.key)
+                    delete_empty_parents(self.client, node.key, our_host_dir)
+
     def _write_endpoint_status_to_etcd(self, ep_id, status):
         """
         Try to actually write the status dict into etcd or delete the key
diff --git a/calico/felix/test/test_fetcd.py b/calico/felix/test/test_fetcd.py
index fbb1d5015f..225208f56a 100644
--- a/calico/felix/test/test_fetcd.py
+++ b/calico/felix/test/test_fetcd.py
@@ -25,7 +25,7 @@
 from calico.felix.config import Config
 from calico.felix.futils import IPV4, IPV6
 from calico.felix.ipsets import IpsetActor
-from calico.felix.fetcd import (_FelixEtcdWatcher, ResyncRequired, EtcdAPI,
+from calico.felix.fetcd import (_FelixEtcdWatcher, EtcdAPI,
                                 die_and_restart, EtcdStatusReporter,
                                 combine_statuses)
 from calico.felix.splitter import UpdateSplitter
 from calico.felix.test.base import BaseTestCase, JSONString
@@ -151,13 +151,6 @@ def setUp(self):
         self.client = Mock()
         self.watcher.client = self.client
 
-    def test_resync_flag(self):
-        self.watcher.resync_after_current_poll = True
-        self.watcher.next_etcd_index = 1
-        self.assertRaises(ResyncRequired,
-                          self.watcher.wait_for_etcd_event)
-        self.assertFalse(self.watcher.resync_after_current_poll)
-
     def test_endpoint_set(self):
         self.dispatch("/calico/v1/host/h1/workload/o1/w1/endpoint/e1",
                       "set", value=ENDPOINT_STR)
@@ -328,48 +321,6 @@ def dispatch(self, key, action, value=None):
         m_response.value = value
         self.watcher.dispatcher.handle_event(m_response)
 
-    def test_clean_up_endpoint_status(self):
-        self.m_config.REPORT_ENDPOINT_STATUS = True
-        ep_id = EndpointId("hostname",
-                           "openstack",
-                           "workloadid",
-                           "endpointid")
-
-        empty_dir = Mock()
-        empty_dir.key = ("/calico/felix/v1/host/hostname/workload/"
-                         "openstack/foobar")
-        empty_dir.dir = True
-
-        missing_ep = Mock()
-        missing_ep.key = ("/calico/felix/v1/host/hostname/workload/"
-                          "openstack/aworkload/endpoint/anendpoint")
-
-        self.client.read.return_value.leaves = [
-            empty_dir,
-            missing_ep,
-        ]
-        self.watcher.clean_up_endpoint_statuses()
-
-        # Missing endpoint should have been marked for cleanup.
-        self.m_status_rep.mark_endpoint_dirty.assert_called_once_with(
-            EndpointId("hostname",
-                       "openstack",
-                       "aworkload",
-                       "anendpoint"),
-            async=True
-        )
-
-    def test_clean_up_endpoint_status_not_found(self):
-        self.m_config.REPORT_ENDPOINT_STATUS = True
-        self.client.read.side_effect = etcd.EtcdKeyNotFound()
-        self.watcher.clean_up_endpoint_statuses()
-        self.assertFalse(self.m_status_rep.mark_endpoint_dirty.called)
-
-    def test_clean_up_endpoint_status_disabled(self):
-        self.m_config.REPORT_ENDPOINT_STATUS = False
-        self.client.read.side_effect = self.failureException
-        self.watcher.clean_up_endpoint_statuses()
-
 
 class TestEtcdReporting(BaseTestCase):
     def setUp(self):
@@ -635,3 +586,50 @@ def assert_combined_status(self, a, b, expected):
                 self.assertEqual(result, expected,
                                  "Expected %r and %r to combine to %s but "
                                  "got %r" % (lhs, rhs, expected, result))
+
+    def test_clean_up_endpoint_status(self):
+        self.m_config.REPORT_ENDPOINT_STATUS = True
+        ep_id = EndpointId("foo",
+                           "openstack",
+                           "workloadid",
+                           "endpointid")
+
+        empty_dir = Mock()
+        empty_dir.key = ("/calico/felix/v1/host/foo/workload/"
+                         "openstack/foobar")
+        empty_dir.dir = True
+
+        missing_ep = Mock()
+        missing_ep.key = ("/calico/felix/v1/host/foo/workload/"
+                          "openstack/aworkload/endpoint/anendpoint")
+
+        self.m_client.read.return_value.leaves = [
+            empty_dir,
+            missing_ep,
+        ]
+        with patch.object(self.rep, "_mark_endpoint_dirty") as m_mark:
+            self.rep.clean_up_endpoint_statuses(async=True)
+            self.step_actor(self.rep)
+
+        # Missing endpoint should have been marked for cleanup.
+        m_mark.assert_called_once_with(
+            EndpointId("foo",
+                       "openstack",
+                       "aworkload",
+                       "anendpoint")
+        )
+
+    def test_clean_up_endpoint_status_not_found(self):
+        self.m_config.REPORT_ENDPOINT_STATUS = True
+        self.m_client.read.side_effect = etcd.EtcdKeyNotFound()
+        with patch.object(self.rep, "_mark_endpoint_dirty") as m_mark:
+            self.rep.clean_up_endpoint_statuses(async=True)
+            self.step_actor(self.rep)
+        self.assertFalse(m_mark.called)
+
+    def test_clean_up_endpoint_status_disabled(self):
+        self.m_config.REPORT_ENDPOINT_STATUS = False
+        self.m_client.read.side_effect = self.failureException
+        self.rep.clean_up_endpoint_statuses(async=True)
+        self.step_actor(self.rep)
+
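
For illustration only (not part of the patch): a minimal sketch of the driver
side of the socket protocol that the reworked _FelixEtcdWatcher._run() loop
consumes. It assumes the MSG_* constants exported by
calico.etcddriver.protocol and an already-connected socket sck.

    import msgpack
    from calico.etcddriver.protocol import (
        MSG_KEY_TYPE, MSG_TYPE_UPDATE, MSG_KEY_KEY, MSG_KEY_VALUE,
        MSG_TYPE_STATUS, MSG_KEY_STATUS, STATUS_IN_SYNC)

    def send_update(sck, key, value):
        # A value of None signals a deletion; the watcher turns it into a
        # fake etcd Node with action "delete" in _on_update_from_driver().
        sck.sendall(msgpack.dumps({MSG_KEY_TYPE: MSG_TYPE_UPDATE,
                                   MSG_KEY_KEY: key,
                                   MSG_KEY_VALUE: value}))

    def send_status(sck, status):
        # E.g. STATUS_IN_SYNC once the initial snapshot has been streamed;
        # that triggers Felix's start-of-day cleanup.
        sck.sendall(msgpack.dumps({MSG_KEY_TYPE: MSG_TYPE_STATUS,
                                   MSG_KEY_STATUS: status}))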