Skip to content

Commit

Permalink
Revert "Add event_listen_queue_max_seconds to fix saltstack#53411"
Browse files Browse the repository at this point in the history
This reverts commit eb3d8b8.
  • Loading branch information
cro committed Jun 14, 2019
1 parent c9da89e commit 8925efb
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 79 deletions.
8 changes: 1 addition & 7 deletions conf/master
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
# Store all returns in the given returner.
# Setting this option requires that any returner-specific configuration also
# be set. See various returners in salt/returners for details on required
# configuration values. (See also, event_return_queue, and event_return_queue_max_seconds below.)
# configuration values. (See also, event_return_queue below.)
#
#event_return: mysql

Expand All @@ -161,12 +161,6 @@
# By default, events are not queued.
#event_return_queue: 0

# In some cases enabling event return queueing can be very helpful, but the bus
# may not busy enough to flush the queue consistently. Setting this to a reasonable
# value (1-30 seconds) will cause the queue to be flushed when the oldest event is older
# than `event_return_queue_max_seconds` regardless of how many events are in the queue.
#event_return_queue_max_seconds: 0

# Only return events matching tags in a whitelist, supports glob matches.
#event_return_whitelist:
# - salt/master/a_tag
Expand Down
8 changes: 0 additions & 8 deletions doc/topics/releases/2018.3.5.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,3 @@ In Progress: Salt 2018.3.5 Release Notes

Version 2018.3.5 is an **unreleased** bugfix release for :ref:`2018.3.0 <release-2018-3-0>`.
This release is still in progress and has not been released yet.

Master Configuration Changes
============================

To fix `#53411`_ a new configuration parameter `event_listen_queue_max_seconds` is provided.
When this is set to a value greater than 0 and `event_listen_queue` is not 0, if the oldest event
in the listen queue is older than `event_listen_queue_max_seconds`, the queue will be flushed to
returners regardless of how many events are in the queue.
5 changes: 0 additions & 5 deletions salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,6 @@ def _gather_buffer_space():
# returner specified by 'event_return'
'event_return_queue': int,

# The number of seconds that events can languish in the queue before we flush them.
# The goal here is to ensure that if the bus is not busy enough to reach a total
# `event_return_queue` events won't get stale.
'event_return_queue_max_seconds': int,

# Only forward events to an event returner if it matches one of the tags in this list
'event_return_whitelist': list,

Expand Down
92 changes: 33 additions & 59 deletions salt/utils/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,12 @@ def __load_uri(self, sock_dir, node):
sock_dir,
'minion_event_{0}_pull.ipc'.format(id_hash)
)
log.debug('%s PUB socket URI: %s', self.__class__.__name__, puburi)
log.debug('%s PULL socket URI: %s', self.__class__.__name__, pulluri)
log.debug(
'{0} PUB socket URI: {1}'.format(self.__class__.__name__, puburi)
)
log.debug(
'{0} PULL socket URI: {1}'.format(self.__class__.__name__, pulluri)
)
return puburi, pulluri

def subscribe(self, tag=None, match_type=None):
Expand Down Expand Up @@ -368,9 +372,9 @@ def connect_pub(self, timeout=None):
with salt.utils.asynchronous.current_ioloop(self.io_loop):
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)
try:
self.io_loop.run_sync(
lambda: self.subscriber.connect(timeout=timeout))
Expand All @@ -380,9 +384,9 @@ def connect_pub(self, timeout=None):
else:
if self.subscriber is None:
self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
self.puburi,
io_loop=self.io_loop
)
self.puburi,
io_loop=self.io_loop
)

# For the asynchronous case, the connect will be defered to when
# set_event_handler() is invoked.
Expand Down Expand Up @@ -997,10 +1001,16 @@ def __init__(self, opts, io_loop=None):
epub_uri = epub_sock_path
epull_uri = epull_sock_path

log.debug('%s PUB socket URI: %s',
self.__class__.__name__, epub_uri)
log.debug('%s PULL socket URI: %s',
self.__class__.__name__, epull_uri)
log.debug(
'{0} PUB socket URI: {1}'.format(
self.__class__.__name__, epub_uri
)
)
log.debug(
'{0} PULL socket URI: {1}'.format(
self.__class__.__name__, epull_uri
)
)

minion_sock_dir = self.opts['sock_dir']

Expand All @@ -1010,7 +1020,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: %s', exc)
log.error('Could not create SOCK_DIR: {0}'.format(exc))
# Let's not fail yet and try using the default path
if minion_sock_dir == default_minion_sock_dir:
# We're already trying the default system path, stop now!
Expand All @@ -1020,7 +1030,7 @@ def __init__(self, opts, io_loop=None):
try:
os.makedirs(default_minion_sock_dir, 0o755)
except OSError as exc:
log.error('Could not create SOCK_DIR: %s', exc)
log.error('Could not create SOCK_DIR: {0}'.format(exc))
# Let's stop at this stage
raise

Expand All @@ -1036,7 +1046,7 @@ def __init__(self, opts, io_loop=None):
payload_handler=self.handle_publish
)

log.info('Starting pull socket on %s', epull_uri)
log.info('Starting pull socket on {0}'.format(epull_uri))
with salt.utils.files.set_umask(0o177):
self.publisher.start()
self.puller.start()
Expand Down Expand Up @@ -1198,7 +1208,6 @@ def __init__(self, opts, log_queue=None):

self.opts = opts
self.event_return_queue = self.opts['event_return_queue']
self.event_return_queue_max_seconds = self.opts.get('event_return_queue_max_seconds', 0)
local_minion_opts = self.opts.copy()
local_minion_opts['file_client'] = 'local'
self.minion = salt.minion.MasterMinion(local_minion_opts)
Expand Down Expand Up @@ -1227,13 +1236,13 @@ def flush_events(self):
if isinstance(self.opts['event_return'], list):
# Multiple event returners
for r in self.opts['event_return']:
log.debug('Calling event returner %s, one of many.', r)
log.debug('Calling event returner {0}, one of many.'.format(r))
event_return = '{0}.event_return'.format(r)
self._flush_event_single(event_return)
else:
# Only a single event returner
log.debug('Calling event returner %s, only one '
'configured.', self.opts['event_return'])
log.debug('Calling event returner {0}, only one '
'configured.'.format(self.opts['event_return']))
event_return = '{0}.event_return'.format(
self.opts['event_return']
)
Expand All @@ -1245,13 +1254,13 @@ def _flush_event_single(self, event_return):
try:
self.minion.returners[event_return](self.event_queue)
except Exception as exc:
log.error('Could not store events - returner \'%s\' raised '
'exception: %s', event_return, exc)
log.error('Could not store events - returner \'{0}\' raised '
'exception: {1}'.format(event_return, exc))
# don't waste processing power unnecessarily on converting a
# potentially huge dataset to a string
if log.level <= logging.DEBUG:
log.debug('Event data that caused an exception: %s',
self.event_queue)
log.debug('Event data that caused an exception: {0}'.format(
self.event_queue))
else:
log.error('Could not store return for event(s) - returner '
'\'%s\' not found.', event_return)
Expand All @@ -1265,52 +1274,17 @@ def run(self):
events = self.event.iter_events(full=True)
self.event.fire_event({}, 'salt/event_listen/start')
try:
# events below is a generator, we will iterate until we get the salt/event/exit tag
oldestevent = None
for event in events:

if event['tag'] == 'salt/event/exit':
# We're done eventing
self.stop = True
if self._filter(event):
# This event passed the filter, add it to the queue
self.event_queue.append(event)
too_long_in_queue = False

# if max_seconds is >0, then we want to make sure we flush the queue
# every event_return_queue_max_seconds seconds, If it's 0, don't
# apply any of this logic
if self.event_return_queue_max_seconds > 0:
rightnow = datetime.datetime.now()
if not oldestevent:
oldestevent = rightnow
age_in_seconds = (rightnow - oldestevent).seconds
if age_in_seconds > 0:
log.debug('Oldest event in queue is %s seconds old.', age_in_seconds)
if age_in_seconds >= self.event_return_queue_max_seconds:
too_long_in_queue = True
oldestevent = None
else:
too_long_in_queue = False

if too_long_in_queue:
log.debug('Oldest event has been in queue too long, will flush queue')

# If we are over the max queue size or the oldest item in the queue has been there too long
# then flush the queue
if len(self.event_queue) >= self.event_return_queue or too_long_in_queue:
log.debug('Flushing %s events.', len(self.event_queue))
if len(self.event_queue) >= self.event_return_queue:
self.flush_events()
oldestevent = None
if self.stop:
# We saw the salt/event/exit tag, we can stop eventing
break
finally: # flush all we have at this moment
# No matter what, make sure we flush the queue even when we are exiting
# and there will be no more events.
if self.event_queue:
log.debug('Flushing %s events.', len(self.event_queue))

self.flush_events()

def _filter(self, event):
Expand Down

0 comments on commit 8925efb

Please sign in to comment.