Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward port #53412 to 2019.2: Add event_listen_queue_max_seconds to fix #53411 #53588

Merged
merged 2 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion conf/master
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
# Store all returns in the given returner.
# Setting this option requires that any returner-specific configuration also
# be set. See various returners in salt/returners for details on required
# configuration values. (See also, event_return_queue below.)
# configuration values. (See also, event_return_queue, and event_return_queue_max_seconds below.)
#
#event_return: mysql

Expand All @@ -161,6 +161,12 @@
# By default, events are not queued.
#event_return_queue: 0

# In some cases enabling event return queueing can be very helpful, but the bus
# may not busy enough to flush the queue consistently. Setting this to a reasonable
# value (1-30 seconds) will cause the queue to be flushed when the oldest event is older
# than `event_return_queue_max_seconds` regardless of how many events are in the queue.
#event_return_queue_max_seconds: 0

# Only return events matching tags in a whitelist, supports glob matches.
#event_return_whitelist:
# - salt/master/a_tag
Expand Down
10 changes: 10 additions & 0 deletions doc/topics/releases/2018.3.5.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,13 @@ In Progress: Salt 2018.3.5 Release Notes

Version 2018.3.5 is an **unreleased** bugfix release for :ref:`2018.3.0 <release-2018-3-0>`.
This release is still in progress and has not been released yet.

Master Configuration Changes
============================

To fix `#53411`_ a new configuration parameter `event_listen_queue_max_seconds` is provided.
When this is set to a value greater than 0 and `event_listen_queue` is not 0, if the oldest event
in the listen queue is older than `event_listen_queue_max_seconds`, the queue will be flushed to
returners regardless of how many events are in the queue.

.. _`#53411`: https://github.com/saltstack/salt/issues/53411
5 changes: 5 additions & 0 deletions salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,11 @@ def _gather_buffer_space():
# returner specified by 'event_return'
'event_return_queue': int,

# The number of seconds that events can languish in the queue before we flush them.
# The goal here is to ensure that if the bus is not busy enough to reach a total
# `event_return_queue` events won't get stale.
'event_return_queue_max_seconds': int,

# Only forward events to an event returner if it matches one of the tags in this list
'event_return_whitelist': list,

Expand Down
92 changes: 59 additions & 33 deletions salt/utils/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,8 @@ def __load_uri(self, sock_dir, node):
sock_dir,
'minion_event_{0}_pull.ipc'.format(id_hash)
)
log.debug(
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
)
log.debug(
'{0} PULL socket URI: {1}'.format(self.__class__.__name__, pulluri)
)
log.debug('%s PUB socket URI: %s', self.__class__.__name__, puburi)
log.debug('%s PULL socket URI: %s', self.__class__.__name__, pulluri)
return puburi, pulluri

def subscribe(self, tag=None, match_type=None):
Expand Down Expand Up @@ -370,9 +366,9 @@ def connect_pub(self, timeout=None):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)
try:
self.io_loop.run_sync(
lambda: self.subscriber.connect(timeout=timeout))
Expand All @@ -382,9 +378,9 @@ def connect_pub(self, timeout=None):
else:
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)

# For the asynchronous case, the connect will be defered to when
# set_event_handler() is invoked.
Expand Down Expand Up @@ -997,16 +993,10 @@ def __init__(self, opts, io_loop=None):
epub_uri = epub_sock_path
epull_uri = epull_sock_path

log.debug(
'{0} PUB socket URI: {1}'.format(
self.__class__.__name__, epub_uri
)
)
log.debug(
'{0} PULL socket URI: {1}'.format(
self.__class__.__name__, epull_uri
)
)
log.debug('%s PUB socket URI: %s',
self.__class__.__name__, epub_uri)
log.debug('%s PULL socket URI: %s',
self.__class__.__name__, epull_uri)

minion_sock_dir = self.opts['sock_dir']

Expand All @@ -1016,7 +1006,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
log.error('Could not create SOCK_DIR: %s', exc)
# Let's not fail yet and try using the default path
if minion_sock_dir == default_minion_sock_dir:
# We're already trying the default system path, stop now!
Expand All @@ -1026,7 +1016,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(default_minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: {0}'.format(exc))
log.error('Could not create SOCK_DIR: %s', exc)
# Let's stop at this stage
raise

Expand All @@ -1042,7 +1032,7 @@ def __init__(self, opts, io_loop=None):
payload_handler=self.handle_publish
)

log.info('Starting pull socket on {0}'.format(epull_uri))
log.info('Starting pull socket on %s', epull_uri)
with salt.utils.files.set_umask(0o177):
self.publisher.start()
self.puller.start()
Expand Down Expand Up @@ -1207,6 +1197,7 @@ def __init__(self, opts, **kwargs):

self.opts = opts
self.event_return_queue = self.opts['event_return_queue']
self.event_return_queue_max_seconds = self.opts.get('event_return_queue_max_seconds', 0)
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
Expand Down Expand Up @@ -1242,13 +1233,13 @@ def flush_events(self):
if isinstance(self.opts['event_return'], list):
# Multiple event returners
for r in self.opts['event_return']:
log.debug('Calling event returner {0}, one of many.'.format(r))
log.debug('Calling event returner %s, one of many.', r)
event_return = '{0}.event_return'.format(r)
self._flush_event_single(event_return)
else:
# Only a single event returner
log.debug('Calling event returner {0}, only one '
'configured.'.format(self.opts['event_return']))
log.debug('Calling event returner %s, only one '
'configured.', self.opts['event_return'])
event_return = '{0}.event_return'.format(
self.opts['event_return']
)
Expand All @@ -1260,13 +1251,13 @@ def _flush_event_single(self, event_return):
try:
self.minion.returners[event_return](self.event_queue)
except Exception as exc:
log.error('Could not store events - returner \'{0}\' raised '
'exception: {1}'.format(event_return, exc))
log.error('Could not store events - returner \'%s\' raised '
'exception: %s', event_return, exc)
# don't waste processing power unnecessarily on converting a
# potentially huge dataset to a string
if log.level <= logging.DEBUG:
log.debug('Event data that caused an exception: {0}'.format(
self.event_queue))
log.debug('Event data that caused an exception: %s',
self.event_queue)
else:
log.error('Could not store return for event(s) - returner '
'\'%s\' not found.', event_return)
Expand All @@ -1280,17 +1271,52 @@ def run(self):
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
try:
# events below is a generator, we will iterate until we get the salt/event/exit tag
oldestevent = None
for event in events:

if event['tag'] == 'salt/event/exit':
# We're done eventing
self.stop = True
if self._filter(event):
# This event passed the filter, add it to the queue
self.event_queue.append(event)
if len(self.event_queue) >= self.event_return_queue:
too_long_in_queue = False

# if max_seconds is >0, then we want to make sure we flush the queue
# every event_return_queue_max_seconds seconds, If it's 0, don't
# apply any of this logic
if self.event_return_queue_max_seconds > 0:
rightnow = datetime.datetime.now()
if not oldestevent:
oldestevent = rightnow
age_in_seconds = (rightnow - oldestevent).seconds
if age_in_seconds > 0:
log.debug('Oldest event in queue is %s seconds old.', age_in_seconds)
if age_in_seconds >= self.event_return_queue_max_seconds:
too_long_in_queue = True
oldestevent = None
else:
too_long_in_queue = False

if too_long_in_queue:
log.debug('Oldest event has been in queue too long, will flush queue')

# If we are over the max queue size or the oldest item in the queue has been there too long
# then flush the queue
if len(self.event_queue) >= self.event_return_queue or too_long_in_queue:
log.debug('Flushing %s events.', len(self.event_queue))
self.flush_events()
oldestevent = None
if self.stop:
# We saw the salt/event/exit tag, we can stop eventing
break
finally: # flush all we have at this moment
# No matter what, make sure we flush the queue even when we are exiting
# and there will be no more events.
if self.event_queue:
log.debug('Flushing %s events.', len(self.event_queue))

self.flush_events()

def _filter(self, event):
Expand Down