Skip to content

Commit

Permalink
Add event_listen_queue_max_seconds to fix saltstack#53411
Browse files Browse the repository at this point in the history
  • Loading branch information
cro authored and mattp- committed Jul 10, 2019
1 parent 76d5129 commit e13096e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 34 deletions.
8 changes: 7 additions & 1 deletion conf/master
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
# Store all returns in the given returner.
# Setting this option requires that any returner-specific configuration also
# be set. See various returners in salt/returners for details on required
# configuration values. (See also, event_return_queue below.)
# configuration values. (See also event_return_queue and event_return_queue_max_seconds below.)
#
#event_return: mysql

Expand All @@ -161,6 +161,12 @@
# By default, events are not queued.
#event_return_queue: 0

# In some cases enabling event return queueing can be very helpful, but the bus
# may not be busy enough to flush the queue consistently. Setting this to a reasonable
# value (1-30 seconds) will cause the queue to be flushed when the oldest event is older
# than `event_return_queue_max_seconds` regardless of how many events are in the queue.
#event_return_queue_max_seconds: 0

# Only return events matching tags in a whitelist, supports glob matches.
#event_return_whitelist:
# - salt/master/a_tag
Expand Down
5 changes: 5 additions & 0 deletions salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,11 @@ def _gather_buffer_space():
# returner specified by 'event_return'
'event_return_queue': int,

# The number of seconds that events can languish in the queue before we flush them.
# The goal here is to ensure that, if the bus is not busy enough to reach a total of
# `event_return_queue` events, the queued events won't get stale.
'event_return_queue_max_seconds': int,

# Only forward events to an event returner if it matches one of the tags in this list
'event_return_whitelist': list,

Expand Down
92 changes: 59 additions & 33 deletions salt/utils/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,8 @@ def __load_uri(self, sock_dir, node):
sock_dir,
'minion_event_{0}_pull.ipc'.format(id_hash)
)
log.debug(
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
)
log.debug(
'{0} PULL socket URI: {1}'.format(self.__class__.__name__, pulluri)
)
log.debug('%s PUB socket URI: %s', self.__class__.__name__, puburi)
log.debug('%s PULL socket URI: %s', self.__class__.__name__, pulluri)
return puburi, pulluri

def subscribe(self, tag=None, match_type=None):
Expand Down Expand Up @@ -372,9 +368,9 @@ def connect_pub(self, timeout=None):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)
try:
self.io_loop.run_sync(
lambda: self.subscriber.connect(timeout=timeout))
Expand All @@ -384,9 +380,9 @@ def connect_pub(self, timeout=None):
else:
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)

# For the asynchronous case, the connect will be deferred to when
# set_event_handler() is invoked.
Expand Down Expand Up @@ -984,16 +980,10 @@ def __init__(self, opts, io_loop=None):
epub_uri = epub_sock_path
epull_uri = epull_sock_path

log.debug(
'{0} PUB socket URI: {1}'.format(
self.__class__.__name__, epub_uri
)
)
log.debug(
'{0} PULL socket URI: {1}'.format(
self.__class__.__name__, epull_uri
)
)
log.debug('%s PUB socket URI: %s',
self.__class__.__name__, epub_uri)
log.debug('%s PULL socket URI: %s',
self.__class__.__name__, epull_uri)

minion_sock_dir = self.opts['sock_dir']

Expand All @@ -1003,7 +993,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
log.error('Could not create SOCK_DIR: %s', exc)
# Let's not fail yet and try using the default path
if minion_sock_dir == default_minion_sock_dir:
# We're already trying the default system path, stop now!
Expand All @@ -1013,7 +1003,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(default_minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
log.error('Could not create SOCK_DIR: %s', exc)
# Let's stop at this stage
raise

Expand All @@ -1030,7 +1020,7 @@ def __init__(self, opts, io_loop=None):
payload_handler=self.handle_publish
)

log.info('Starting pull socket on {0}'.format(epull_uri))
log.info('Starting pull socket on %s', epull_uri)
with salt.utils.files.set_umask(0o177):
self.publisher.start()
self.puller.start()
Expand Down Expand Up @@ -1197,6 +1187,7 @@ def __init__(self, opts, log_queue=None):
self.local_minion_opts['file_client'] = 'local'

self.event_return_queue = self.opts['event_return_queue']
self.event_return_queue_max_seconds = self.opts.get('event_return_queue_max_seconds', 0)
self.event_queue = []
self.stop = False

Expand All @@ -1222,13 +1213,13 @@ def flush_events(self):
if isinstance(self.opts['event_return'], list):
# Multiple event returners
for r in self.opts['event_return']:
log.debug('Calling event returner {0}, one of many.'.format(r))
log.debug('Calling event returner %s, one of many.', r)
event_return = '{0}.event_return'.format(r)
self._flush_event_single(event_return)
else:
# Only a single event returner
log.debug('Calling event returner {0}, only one '
'configured.'.format(self.opts['event_return']))
log.debug('Calling event returner %s, only one '
'configured.', self.opts['event_return'])
event_return = '{0}.event_return'.format(
self.opts['event_return']
)
Expand All @@ -1240,13 +1231,13 @@ def _flush_event_single(self, event_return):
try:
self.minion.returners[event_return](self.event_queue)
except Exception as exc:
log.error('Could not store events - returner \'{0}\' raised '
'exception: {1}'.format(event_return, exc))
log.error('Could not store events - returner \'%s\' raised '
'exception: %s', event_return, exc)
# don't waste processing power unnecessarily on converting a
# potentially huge dataset to a string
if log.level <= logging.DEBUG:
log.debug('Event data that caused an exception: {0}'.format(
self.event_queue))
log.debug('Event data that caused an exception: %s',
self.event_queue)
else:
log.error('Could not store return for event(s) - returner '
'\'%s\' not found.', event_return)
Expand All @@ -1266,17 +1257,52 @@ def run(self):
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
try:
# events below is a generator, we will iterate until we get the salt/event/exit tag
oldestevent = None
for event in events:

if event['tag'] == 'salt/event/exit':
# We're done eventing
self.stop = True
if self._filter(event):
# This event passed the filter, add it to the queue
self.event_queue.append(event)
if len(self.event_queue) >= self.event_return_queue:
too_long_in_queue = False

# If event_return_queue_max_seconds is > 0, we want to flush the queue once
# its oldest event is at least event_return_queue_max_seconds seconds old.
# If it's 0, don't apply any of this logic.
if self.event_return_queue_max_seconds > 0:
rightnow = datetime.datetime.now()
if not oldestevent:
oldestevent = rightnow
age_in_seconds = (rightnow - oldestevent).seconds
if age_in_seconds > 0:
log.debug('Oldest event in queue is %s seconds old.', age_in_seconds)
if age_in_seconds >= self.event_return_queue_max_seconds:
too_long_in_queue = True
oldestevent = None
else:
too_long_in_queue = False

if too_long_in_queue:
log.debug('Oldest event has been in queue too long, will flush queue')

# If we are over the max queue size or the oldest item in the queue has been there too long
# then flush the queue
if len(self.event_queue) >= self.event_return_queue or too_long_in_queue:
log.debug('Flushing %s events.', len(self.event_queue))
self.flush_events()
oldestevent = None
if self.stop:
# We saw the salt/event/exit tag, we can stop eventing
break
finally: # flush all we have at this moment
# No matter what, make sure we flush the queue even when we are exiting
# and there will be no more events.
if self.event_queue:
log.debug('Flushing %s events.', len(self.event_queue))

self.flush_events()

def _filter(self, event):
Expand Down

0 comments on commit e13096e

Please sign in to comment.