Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REVIEW ONLY - Track jid in proc file for runners #5

Merged
merged 21 commits into from
Mar 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements/tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ ansible; sys.platform != 'win32' and sys.platform != 'darwin' and python_version
ansible; sys.platform != 'win32' and sys.platform != 'darwin' and python_version == '2.7'
pylxd>=2.2.5; sys.platform != 'win32' and sys.platform != 'darwin'
jmespath
yamlordereddictloader
yamlordereddictloader
python-dateutil
16 changes: 16 additions & 0 deletions salt/client/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import weakref
import traceback
import collections
import os
import copy as pycopy

# Import Salt libs
Expand All @@ -28,6 +29,8 @@
import salt.utils.state
import salt.utils.user
import salt.utils.versions
import salt.utils.files
import salt.serializers.json
import salt.transport.client
import salt.log.setup
import salt.output
Expand Down Expand Up @@ -373,6 +376,13 @@ def low(self, fun, low, print_event=True, full_return=False):
data['fun_args'] = list(args) + ([kwargs] if kwargs else [])
func_globals['__jid_event__'].fire_event(data, 'new')

# Track the job locally so we know what is running on the master
serial = salt.payload.Serial(self.opts)
jid_proc_file = os.path.join(*[self.opts['cachedir'], 'proc', jid])
data['pid'] = os.getpid()
with salt.utils.files.fopen(jid_proc_file, 'w+b') as fp_:
fp_.write(serial.dumps(data))

# Initialize a context for executing the method.
with tornado.stack_context.StackContext(self.functions.context_dict.clone):
func = self.functions[fun]
Expand All @@ -398,6 +408,12 @@ def low(self, fun, low, print_event=True, full_return=False):
data['return'] = 'Exception occurred in {client} {fun}: {tb}'.format(
client=self.client, fun=fun, tb=traceback.format_exc())
data['success'] = False
finally:
# Job has finished or issue found, so let's clean up after ourselves
try:
os.remove(jid_proc_file)
except OSError as err:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you catch IOError below but only OSError here, why?

Copy link
Author

@austinpapp austinpapp Feb 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm not actually reading the file here, so no IOError is possible.

log.error("Error attempting to remove master job tracker: %s", err)

if self.store_job:
try:
Expand Down
37 changes: 37 additions & 0 deletions salt/daemons/masterapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import salt.utils.user
import salt.utils.verify
import salt.utils.versions
import salt.utils.master
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.pillar import git_pillar

Expand Down Expand Up @@ -174,6 +175,42 @@ def clean_old_jobs(opts):
mminion.returners[fstr]()


def clean_proc_dir(opts):
    '''
    Clean out old tracked jobs running on the master.

    Generally, anything tracking a job should remove the job
    once the job has finished. However, this will remove any
    jobs that for some reason were not properly removed
    when finished or errored.

    :param dict opts: master configuration; only ``cachedir`` is used
        to locate the ``proc`` job-tracking directory.
    '''
    proc_dir = os.path.join(opts['cachedir'], 'proc')
    for fn_ in os.listdir(proc_dir):
        proc_file = os.path.join(proc_dir, fn_)
        data = salt.utils.master.read_proc_file(proc_file, opts)
        if not data:
            # Unreadable, empty, or malformed proc file: stop tracking it
            try:
                log.warning(
                    "Found proc file %s without proper data. Removing from tracked proc files.",
                    proc_file
                )
                os.remove(proc_file)
            except (OSError, IOError) as err:
                log.error('Unable to remove proc file: %s.', err)
            continue
        # read_proc_file() only returns dicts that contain a 'pid' key
        if not salt.utils.master.is_pid_healthy(data['pid']):
            try:
                log.warning(
                    "PID %s not owned by salt or no longer running. Removing tracked proc file %s",
                    data['pid'],
                    proc_file
                )
                os.remove(proc_file)
            except (OSError, IOError) as err:
                log.error('Unable to remove proc file: %s.', err)


def mk_key(opts, user):
if HAS_PWD:
uid = None
Expand Down
1 change: 1 addition & 0 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def run(self):
salt.daemons.masterapi.clean_old_jobs(self.opts)
salt.daemons.masterapi.clean_expired_tokens(self.opts)
salt.daemons.masterapi.clean_pub_auth(self.opts)
salt.daemons.masterapi.clean_proc_dir(self.opts)
self.handle_git_pillar()
self.handle_schedule()
self.handle_key_cache()
Expand Down
109 changes: 54 additions & 55 deletions salt/utils/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
from salt.utils.cache import CacheCli as cache_cli
from salt.utils.process import MultiprocessingProcess

# pylint: disable=import-error
try:
import salt.utils.psutil_compat as psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False

# Import third party libs
from salt.ext import six
from salt.utils.zeromq import zmq
Expand All @@ -40,7 +47,7 @@

def get_running_jobs(opts):
'''
Return the running jobs on this minion
Return the running jobs on the master
'''

ret = []
Expand All @@ -49,84 +56,76 @@ def get_running_jobs(opts):
return ret
for fn_ in os.listdir(proc_dir):
path = os.path.join(proc_dir, fn_)
try:
data = _read_proc_file(path, opts)
if data is not None:
ret.append(data)
except (IOError, OSError):
# proc files may be removed at any time during this process by
# the master process that is executing the JID in question, so
# we must ignore ENOENT during this process
log.trace('%s removed during processing by master process', path)
data = read_proc_file(path, opts)
if not data:
continue
if not is_pid_healthy(data['pid']):
continue
ret.append(data)
return ret


def read_proc_file(path, opts):
    '''
    Deserialize a master job proc file.

    :param str path: path to the proc file under ``<cachedir>/proc``
    :param dict opts: master configuration, used to build the payload
        serializer.
    :return: a dict of JID metadata (guaranteed to contain a truthy
        ``pid`` key), or None when the file cannot be deserialized, does
        not hold a dict, or has no PID recorded.
    '''
    serial = salt.payload.Serial(opts)
    with salt.utils.files.fopen(path, 'rb') as fp_:
        try:
            data = serial.load(fp_)
        except Exception as err:  # pylint: disable=broad-except
            # TODO: narrow this to the serializer's specific exception type
            # Could not read proc file
            log.warning("Issue deserializing data: %s", err)
            return None

    if not isinstance(data, dict):
        # Invalid serial object
        log.warning("Data is not a dict: %s", data)
        return None

    pid = data.get('pid', None)
    if not pid:
        # No pid recorded, so this is not a salt proc file
        log.warning("No PID found in data")
        return None

    return data


def is_pid_healthy(pid):
    '''
    This is a health check that will confirm the PID is running
    and executed by salt.

    If psutil is available:
        * all architectures are checked

    if psutil is not available:
        * Linux/Solaris/etc: archs with `/proc/cmdline` available are checked
        * AIX/Windows: assume PID is healthy and return True

    :param int pid: the process ID to check
    :return bool: True when the PID is alive and appears to belong to a
        salt process (or cannot be verified on this platform), else False
    '''
    if HAS_PSUTIL:
        try:
            proc = psutil.Process(pid)
        except psutil.NoSuchProcess:
            log.warning("PID %s is no longer running.", pid)
            return False
        # A recycled PID may belong to a non-salt process; verify cmdline
        return any(['salt' in cmd for cmd in proc.cmdline()])

    if salt.utils.platform.is_aix() or salt.utils.platform.is_windows():
        # No /proc/<pid>/cmdline on these platforms; punt and assume healthy
        return True

    if not salt.utils.process.os_is_running(pid):
        log.warning("PID %s is no longer running.", pid)
        return False

    # BUGFIX: the cmdline path must be absolute; the relative
    # 'proc/<pid>/cmdline' would never resolve.
    cmdline_file = os.path.join('/proc', str(pid), 'cmdline')
    try:
        with salt.utils.files.fopen(cmdline_file, 'rb') as fp_:
            return b'salt' in fp_.read()
    except (OSError, IOError) as err:
        log.error("There was a problem reading proc file: %s", err)
        return False


Expand Down
52 changes: 52 additions & 0 deletions tests/integration/utils/test_master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
'''
Test master code from utils
'''
from __future__ import absolute_import

import os
import time

import setproctitle # pylint: disable=W8410

import salt.config
import salt.utils.master as master

from tests.support.case import ShellTestCase
from tests.support.paths import TMP_ROOT_DIR
from tests.support.helpers import flaky

DEFAULT_CONFIG = salt.config.master_config(None)
DEFAULT_CONFIG['cachedir'] = os.path.join(TMP_ROOT_DIR, 'cache')


class MasterUtilJobsTestCase(ShellTestCase):
    '''
    Integration tests for master-side job tracking in salt.utils.master.
    '''

    def setUp(self):
        '''
        Necessary so that the master pid health check
        passes as it looks for salt in cmdline
        '''
        setproctitle.setproctitle('salt')

    @flaky
    def test_get_running_jobs(self):
        '''
        Test get running jobs
        '''
        ret = self.run_run_plus("test.sleep", '90', asynchronous=True)
        jid = ret['jid']

        # Ran into a problem where the async job was not seen until
        # after the test had finished. This caused the test to fail
        # because no job was present (no proc file). Poll for up to
        # 20 seconds (10 attempts x 2s) before giving up.
        # BUGFIX: the original loop used ``attempt += attempt + 1``,
        # which doubled the counter (0, 1, 3, 7, 15) and gave up after
        # only ~4 iterations; it also passed vacuously when the job was
        # never observed.
        for _ in range(10):
            jobs = master.get_running_jobs(DEFAULT_CONFIG)
            if jobs:
                jids = [job['jid'] for job in jobs]
                assert jids.count(jid) == 1
                break
            time.sleep(2)
        else:
            self.fail('Job with jid {0} was never tracked in the proc dir'.format(jid))
10 changes: 10 additions & 0 deletions tests/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@
'logging':
{'display_name': 'Logging',
'path': 'integration/logging'},
'utils':
{'display_name': 'Utils',
'path': 'integration/utils'},
}

TEST_SUITES = collections.OrderedDict(sorted(TEST_SUITES_UNORDERED.items(),
Expand Down Expand Up @@ -527,6 +530,13 @@ def setup_additional_options(self):
default=False,
help='Run logging integration tests'
)
self.test_selection_group.add_option(
'--utils',
dest='utils',
action='store_true',
default=False,
help='Run utils integration tests'
)

def validate_options(self):
if self.options.cloud_provider or self.options.external_api:
Expand Down
9 changes: 9 additions & 0 deletions tests/support/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ def run_run_plus(self, fun, *arg, **kwargs):
self.get_config_file_path('master')
)

if 'asynchronous' in kwargs:
opts['async'] = True
kwargs.pop('asynchronous')

opts_arg = list(arg)
if kwargs:
opts_arg.append({'__kwarg__': True})
Expand Down Expand Up @@ -517,6 +521,11 @@ def run_run_plus(self, fun, *arg, **kwargs):
opts = {}
opts.update(self.get_config('client_config', from_scratch=from_scratch))
opts_arg = list(arg)

if 'asynchronous' in kwargs:
opts['async'] = True
kwargs.pop('asynchronous')

if kwargs:
opts_arg.append({'__kwarg__': True})
opts_arg[-1].update(kwargs)
Expand Down
Loading