Skip to content

Commit

Permalink
Block ProfileRules startup until we're in sync.
Browse files Browse the repository at this point in the history
* Override _maybe_start() to block startup until we're in sync.
* Retry startup after in-sync message.
* Change IpsetManager to use consistent approach.
  • Loading branch information
Shaun Crampton committed Oct 23, 2015
1 parent d0d6f28 commit 215fd2a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 29 deletions.
50 changes: 27 additions & 23 deletions calico/felix/ipsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,21 @@ def _create(self, tag_id):
self.ip_type)
return active_ipset

def _maybe_start(self, obj_id):
    """
    Override: block startup of the TagIpset for obj_id until the
    datamodel is in sync.

    Deferred starts are retried when on_datamodel_in_sync() flips
    self._datamodel_in_sync and calls _maybe_start_all().

    :param obj_id: the tag ID whose TagIpset actor may be started.
    """
    if self._datamodel_in_sync:
        _log.debug("Datamodel is in-sync, deferring to superclass.")
        return super(IpsetManager, self)._maybe_start(obj_id)
    else:
        # NB: the trailing space matters — adjacent string literals are
        # concatenated; without it the message read "...datamodel isnot
        # in sync."
        _log.info("Delaying startup of tag %s because datamodel is "
                  "not in sync.", obj_id)

def _on_object_started(self, tag_id, active_ipset):
    """
    Callback: the TagIpset actor for tag_id has started.

    Immediately pushes the tag's members into the ipset rather than
    deferring via self._dirty_tags.

    :param tag_id: ID of the tag whose actor started.
    :param active_ipset: the TagIpset actor itself (unused here; the
        update is routed by tag_id).
    """
    _log.debug("TagIpset actor for %s started", tag_id)
    # This span previously interleaved the old deferred-update code
    # (self._dirty_tags.add(tag_id)) with its replacement; only the
    # immediate update is kept.  Fill the ipset in with its members;
    # this triggers its first programming, after which it calls us back
    # to tell us it is ready.  We can't use self._dirty_tags to defer
    # this in case the set becomes unreferenced before
    # _finish_msg_batch() is called.
    self._update_active_ipset(tag_id)

def _update_active_ipset(self, tag_id):
"""
Expand Down Expand Up @@ -122,6 +132,7 @@ def on_datamodel_in_sync(self):
if not self._datamodel_in_sync:
_log.info("Datamodel now in sync, uncorking updates to TagIpsets")
self._datamodel_in_sync = True
self._maybe_start_all()

@actor_message()
def cleanup(self):
Expand Down Expand Up @@ -415,10 +426,8 @@ def _finish_msg_batch(self, batch, results):
operation. It also avoid wasted effort if tags are flapping.
"""
super(IpsetManager, self)._finish_msg_batch(batch, results)
if self._datamodel_in_sync:
_log.debug("Datamodel in sync, updating active TagIpsets.")
self._update_dirty_active_ipsets()
self._force_reprogram = False
self._update_dirty_active_ipsets()
self._force_reprogram = False


class EndpointData(object):
Expand Down Expand Up @@ -491,7 +500,7 @@ def __init__(self, ipset, qualifier=None):

self._ipset = ipset
# Members - which entries should be in the ipset. None means
# "unknown". The first update to this field triggers programming.
# "unknown", but this is updated immediately on actor startup.
self.members = None
# Members which really are in the ipset; again None means "unknown".
self.programmed_members = None
Expand Down Expand Up @@ -531,7 +540,7 @@ def replace_members(self, members, force_reprogram=False):

def _finish_msg_batch(self, batch, results):
    """
    Batch-end hook: flush pending membership changes to the kernel
    ipset.

    Skipped once this actor is stopped so we don't recreate an ipset
    that has been (or is being) deleted.
    """
    _log.debug("IpsetActor._finish_msg_batch() called")
    # The "self.members is not None" guard was dropped: members are now
    # populated immediately on actor startup, so the only reason to
    # skip the sync is that we've been stopped.
    if not self.stopped:
        self._sync_to_ipset()

def _sync_to_ipset(self):
Expand Down Expand Up @@ -581,24 +590,19 @@ def on_unreferenced(self):
# Mark the object as stopped so that we don't accidentally recreate
# the ipset in _finish_msg_batch.
self.stopped = True
try:
self._ipset.delete()
finally:
self._notify_cleanup_complete()

def _finish_msg_batch(self, batch, results):
    """
    Batch-end hook for TagIpset: after the superclass has synced the
    ipset, handle lifecycle notifications.

    * If we've been stopped, tell the manager our cleanup is complete.
    * The first time through, notify the manager that this ipset is
      ready for use (the superclass call above has programmed it).
    """
    _log.debug("_finish_msg_batch on TagIpset")
    super(TagIpset, self)._finish_msg_batch(batch, results)
    if self.stopped:
        # Deletion of the kernel ipset itself is handled in
        # on_unreferenced(); here we only report that cleanup is done.
        _log.debug("%s stopped, notifying cleanup complete.", self)
        self._notify_cleanup_complete()
    if not self.notified_ready:
        # We have created the set, so we are now ready.
        _log.debug("TagIpset _finish_msg_batch notifying ready")
        self.notified_ready = True
        self._notify_ready()


class Ipset(object):
Expand Down
25 changes: 23 additions & 2 deletions calico/felix/profilerules.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(self, ip_version, iptables_updater, ipset_manager):
self.iptables_updater = iptables_updater
self.ipset_manager = ipset_manager
self.rules_by_profile_id = {}
self._datamodel_in_sync = False

def _create(self, profile_id):
return ProfileRules(profile_id,
Expand All @@ -57,9 +58,23 @@ def _on_object_started(self, profile_id, active_profile):
profile_or_none)
active_profile.on_profile_update(profile_or_none, async=True)

def _maybe_start(self, obj_id, in_sync=False):
    """
    Override: block startup of the ProfileRules actor for obj_id until
    we know its state.

    Startup is allowed once the datamodel as a whole is in sync, or
    once we have seen an explicit update for this particular profile.

    :param obj_id: the profile ID whose actor may be started.
    :param in_sync: True if the caller knows this particular profile is
        in sync even though the datamodel as a whole may not be.
    """
    in_sync |= self._datamodel_in_sync
    if in_sync or obj_id in self.rules_by_profile_id:
        _log.debug("Profile %s is in-sync, deferring to superclass.",
                   obj_id)
        return super(RulesManager, self)._maybe_start(obj_id)
    else:
        # NB: the trailing space matters — adjacent string literals are
        # concatenated; without it the message read "...datamodel isnot
        # in sync."
        _log.info("Delaying startup of profile %s because datamodel is "
                  "not in sync.", obj_id)

@actor_message()
def on_datamodel_in_sync(self):
    """
    Message: the datamodel is now in sync.

    Records the fact and retries startup of any ProfileRules actors
    that _maybe_start() previously blocked.  Idempotent: subsequent
    calls are no-ops.
    """
    if not self._datamodel_in_sync:
        # Logged at info (not error): this is a normal lifecycle event,
        # matching IpsetManager.on_datamodel_in_sync().  The stale
        # "NOT IMPLEMENTED" log line is gone now that this is
        # implemented.
        _log.info("%s: datamodel now in sync, unblocking profile startup",
                  self)
        self._datamodel_in_sync = True
        self._maybe_start_all()

@actor_message()
def on_rules_update(self, profile_id, profile, force_reprogram=False):
Expand All @@ -75,6 +90,12 @@ def on_rules_update(self, profile_id, profile, force_reprogram=False):
ap = self.objects_by_id[profile_id]
ap.on_profile_update(profile, force_reprogram=force_reprogram,
async=True)
elif profile_id in self.objects_by_id:
_log.debug("Checking if the update allows us to start profile %s",
profile_id)
# Pass in_sync=True because we now explicitly know this profile is
# in sync, even if this is a deletion.
self._maybe_start(profile_id, in_sync=True)


class ProfileRules(RefCountedActor):
Expand Down Expand Up @@ -159,7 +180,7 @@ def _finish_msg_batch(self, batch, results):
_log.info("%s unreferenced, removing our chains", self)
self._delete_chains()
self._ipset_refs.discard_all()
self._ipset_refs = None # Break ref cycle.
self._ipset_refs = None # Break ref cycle.
self._profile = None
self._pending_profile = None
finally:
Expand Down
21 changes: 17 additions & 4 deletions calico/felix/refcount.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,24 @@ def on_object_cleanup_complete(self, object_id, obj):
# May have unblocked start of new object...
self._maybe_start(object_id)

def _maybe_start_all(self):
    """
    Retry startup of every object we are tracking.

    Convenience for subclasses that blocked _maybe_start(): call this
    once the blocking condition clears.
    """
    _log.debug("Checking all objects to see if they can be started")
    for object_id in self.objects_by_id:
        self._maybe_start(object_id)

def _maybe_start(self, obj_id):
"""
Starts the actor with the given ID if it is present and there
are no pending cleanups for that ID.
Subclasses may override this method to place additional
pre-requisites on starting the object. They should call
this implementation if they are happy for the start to
proceed.
If the subclass chooses to block startup, it must later call
this method (or the convenience method _maybe_start_all())
when it wants to allow startup to proceed.
"""
obj = self.objects_by_id.get(obj_id)
if (obj and
Expand Down Expand Up @@ -196,9 +210,8 @@ def _create(self, object_id):
raise NotImplementedError() # pragma nocover

def _is_starting_or_live(self, obj_id):
    """
    Return True if we are tracking an actor for obj_id and it is in the
    STARTING or LIVE state.

    This span contained the same return expression twice (a merged-diff
    artifact); a single guard-clause form is kept.
    """
    if obj_id not in self.objects_by_id:
        return False
    return self.objects_by_id[obj_id].ref_mgmt_state in (STARTING, LIVE)


class RefHelper(object):
Expand Down Expand Up @@ -366,4 +379,4 @@ def _notify_cleanup_complete(self):
is complete. Notifies the manager.
"""
_log.debug("Notifying manager that %s is done cleaning up", self)
self._manager.on_object_cleanup_complete(self._id, self, async=True)
self._manager.on_object_cleanup_complete(self._id, self, async=True)

0 comments on commit 215fd2a

Please sign in to comment.