Commit

Merge pull request saltstack#2 from ni/dev/iepopr/open-files-2018.3-local

Fix max_open_files config on Windows
Rares POP authored Jul 22, 2019
2 parents 8e7b796 + 5393022 commit e05c112
Showing 12 changed files with 112 additions and 38 deletions.
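
The commit threads the master's opts dict into every child process so each one can raise its own file-handle limit: on Windows the Microsoft C runtime caps the number of stdio-level open files (512 by default, 8192 at most), and pywin32's win32file._setmaxstdio is the call that raises it. Below is a minimal sketch of that per-process adjustment, assuming Windows with pywin32 installed; the helper name apply_max_open_files is illustrative, and the real logic lives in ProcessManager._setup_process further down.

    import logging

    import salt.utils.platform

    log = logging.getLogger(__name__)


    def apply_max_open_files(opts):
        '''
        Illustrative helper mirroring ProcessManager._setup_process: raise the
        per-process stdio handle limit from the 'max_open_files' option.
        '''
        if not salt.utils.platform.is_windows() or not opts:
            return
        max_open_files = opts.get('max_open_files')
        if not max_open_files:
            return
        try:
            import win32file  # pywin32 wrapper around the C runtime
            # The Microsoft C runtime refuses values above 8192.
            max_open_files = min(max_open_files, 8192)
            if win32file._setmaxstdio(max_open_files) != max_open_files:  # pylint: disable=W0212
                log.error('Failed to set \'max_open_files\' on the process')
        except ImportError:
            log.error('Failed to set \'max_open_files\' on the process')
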
2 changes: 1 addition & 1 deletion salt/client/netapi.py
@@ -70,7 +70,7 @@ def run(self):
if not len(self.netapi):
log.error("Did not find any netapi configurations, nothing to start")

kwargs = {}
kwargs = {'_opts': self.opts}
if salt.utils.platform.is_windows():
kwargs['log_queue'] = salt.log.setup.get_multiprocessing_logging_queue()
kwargs['log_queue_level'] = salt.log.setup.get_multiprocessing_logging_level()
2 changes: 1 addition & 1 deletion salt/daemons/flo/maint.py
@@ -30,7 +30,7 @@ def maint_fork(self):
do salt raet maint fork at enter
'''
self.proc_mgr.value.add_process(Maintenance, args=(self.opts.value,))
self.proc_mgr.value.add_process(Maintenance, args=(self.opts.value,), kwargs={'_opts': self.opts.value})


class Maintenance(multiprocessing.Process):
8 changes: 6 additions & 2 deletions salt/daemons/flo/reactor.py
@@ -23,7 +23,9 @@ def reactor_fork(self):
'''
self.proc_mgr.value.add_process(
salt.utils.reactor.Reactor,
args=(self.opts.value,))
args=(self.opts.value,),
kwargs={'_opts': self.opts.value}
)


@ioflo.base.deeding.deedify(
@@ -37,4 +39,6 @@ def event_return_fork(self):
'''
self.proc_mgr.value.add_process(
salt.utils.event.EventReturn,
args=(self.opts.value,))
args=(self.opts.value,),
kwargs={'_opts': self.opts.value}
)
3 changes: 2 additions & 1 deletion salt/daemons/flo/worker.py
@@ -60,7 +60,8 @@ def worker_fork(self):
self.access_keys.value,
self.mkey.value,
self.aes.value
)
),
kwargs={'_opts': self.opts.value}
)


4 changes: 3 additions & 1 deletion salt/daemons/flo/zero.py
@@ -67,7 +67,9 @@ def zmq_ret_fork(self):
args=(
self.opts.value,
self.mkey.value,
self.aes.value))
self.aes.value),
kwargs={'_opts': self.opts.value}
)


class ZmqRet(multiprocessing.Process):
6 changes: 4 additions & 2 deletions salt/engines/__init__.py
@@ -61,7 +61,8 @@ def start_engines(opts, proc_mgr, proxy=None):
runners,
proxy
),
name=name
name=name,
kwargs={'_opts': opts}
)


@@ -94,7 +95,8 @@ def __setstate__(self, state):
state['runners'],
state['proxy'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
47 changes: 27 additions & 20 deletions salt/master.py
@@ -162,7 +162,8 @@ def __setstate__(self, state):
self.__init__(
state['opts'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
@@ -370,12 +371,12 @@ def __setstate__(self, state):
self.__init__(
state['opts'],
log_queue=state['log_queue'],
_opts=state['opts']
)

def __getstate__(self):
return {'opts': self.opts,
'log_queue': self.log_queue,
}
'log_queue': self.log_queue,}

def fill_buckets(self):
'''
@@ -672,13 +673,19 @@ def start(self):
pub_channels = []
log.info('Creating master publisher process')
log_queue = salt.log.setup.get_multiprocessing_logging_queue()

kwargs = {'_opts': self.opts}
if salt.utils.platform.is_windows():
kwargs['log_queue'] = salt.log.setup.get_multiprocessing_logging_queue()
kwargs['log_queue_level'] = salt.log.setup.get_multiprocessing_logging_level()

for transport, opts in iter_transport_opts(self.opts):
chan = salt.transport.server.PubServerChannel.factory(opts)
chan.pre_fork(self.process_manager, kwargs={'log_queue': log_queue})
chan.pre_fork(self.process_manager, kwargs=kwargs)
pub_channels.append(chan)

log.info('Creating master event publisher process')
self.process_manager.add_process(salt.utils.event.EventPublisher, args=(self.opts,))
self.process_manager.add_process(salt.utils.event.EventPublisher, args=(self.opts,), kwargs=kwargs)

if self.opts.get('reactor'):
if isinstance(self.opts['engines'], list):
@@ -698,11 +705,11 @@ def start(self):

# must be after channels
log.info('Creating master maintenance process')
self.process_manager.add_process(Maintenance, args=(self.opts,))
self.process_manager.add_process(Maintenance, args=(self.opts,), kwargs=kwargs)

if self.opts.get('event_return'):
log.info('Creating master event return process')
self.process_manager.add_process(salt.utils.event.EventReturn, args=(self.opts,))
self.process_manager.add_process(salt.utils.event.EventReturn, args=(self.opts,), kwargs=kwargs)

ext_procs = self.opts.get('ext_processes', [])
for proc in ext_procs:
@@ -712,45 +719,42 @@ def start(self):
cls = proc.split('.')[-1]
_tmp = __import__(mod, globals(), locals(), [cls], -1)
cls = _tmp.__getattribute__(cls)
self.process_manager.add_process(cls, args=(self.opts,))
self.process_manager.add_process(cls, args=(self.opts,), kwargs=kwargs)
except Exception:
log.error('Error creating ext_processes process: %s', proc)

if HAS_HALITE and 'halite' in self.opts:
log.info('Creating master halite process')
self.process_manager.add_process(Halite, args=(self.opts['halite'],))
self.process_manager.add_process(Halite, args=(self.opts['halite'],), kwargs=kwargs)

# TODO: remove, or at least push into the transport stuff (pre-fork probably makes sense there)
if self.opts['con_cache']:
log.info('Creating master concache process')
self.process_manager.add_process(salt.utils.master.ConnectedCache, args=(self.opts,))
self.process_manager.add_process(salt.utils.master.ConnectedCache, args=(self.opts,), kwargs=kwargs)
# workaround for issue #16315, race condition
log.debug('Sleeping for two seconds to let concache rest')
time.sleep(2)

log.info('Creating master request server process')
kwargs = {}
if salt.utils.platform.is_windows():
kwargs['log_queue'] = log_queue
kwargs['secrets'] = SMaster.secrets

self.process_manager.add_process(
ReqServer,
args=(self.opts, self.key, self.master_key),
args=(self.opts, self.key, self.master_key, SMaster.secrets),
kwargs=kwargs,
name='ReqServer')

self.process_manager.add_process(
FileserverUpdate,
args=(self.opts,))
args=(self.opts,), kwargs=kwargs)

# Fire up SSDP discovery publisher
if self.opts['discovery']:
if salt.utils.ssdp.SSDPDiscoveryServer.is_available():
self.process_manager.add_process(salt.utils.ssdp.SSDPDiscoveryServer(
port=self.opts['discovery']['port'],
listen_ip=self.opts['interface'],
answer={'mapping': self.opts['discovery'].get('mapping', {})}).run)
answer={'mapping': self.opts['discovery'].get('mapping', {})}).run,
kwargs=kwargs)
else:
log.error('Unable to load SSDP: asynchronous IO is not available.')
if sys.version_info.major == 2:
@@ -798,7 +802,8 @@ def __setstate__(self, state):
self.__init__(
state['hopts'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['hopts']
)

def __getstate__(self):
@@ -850,7 +855,8 @@ def __setstate__(self, state):
state['mkey'],
secrets=state['secrets'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
@@ -991,7 +997,8 @@ def __setstate__(self, state):
self._is_child = True
super(MWorker, self).__init__(
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)
self.opts = state['opts']
self.req_channels = state['req_channels']
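
All of the salt/master.py changes above follow one pattern: build a single kwargs dict up front that carries '_opts' (plus the multiprocessing logging queue and level on Windows, where children are spawned rather than forked) and forward it to every add_process and pre_fork call. A condensed sketch of that pattern follows, with DummyChild as a hypothetical stand-in for the real targets (EventPublisher, Maintenance, EventReturn, and so on):

    import multiprocessing

    import salt.log.setup
    import salt.utils.platform
    import salt.utils.process


    class DummyChild(multiprocessing.Process):
        '''
        Hypothetical stand-in for the child classes the master forks.
        '''
        def __init__(self, opts, **kwargs):
            super(DummyChild, self).__init__()
            self.opts = opts

        def run(self):
            pass


    def start_children(opts):
        # One kwargs dict, reused for every child the master starts.
        kwargs = {'_opts': opts}
        if salt.utils.platform.is_windows():
            # Spawned children cannot inherit the logging setup, so pass it along.
            kwargs['log_queue'] = salt.log.setup.get_multiprocessing_logging_queue()
            kwargs['log_queue_level'] = salt.log.setup.get_multiprocessing_logging_level()

        process_manager = salt.utils.process.ProcessManager(name='ExampleMaster')
        process_manager.add_process(DummyChild, args=(opts,), kwargs=kwargs)
        return process_manager
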
9 changes: 6 additions & 3 deletions salt/transport/tcp.py
@@ -163,7 +163,8 @@ def __setstate__(self, state):
state['opts'],
state['socket_queue'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
@@ -598,8 +599,10 @@ def pre_fork(self, process_manager):
if USE_LOAD_BALANCER:
self.socket_queue = multiprocessing.Queue()
process_manager.add_process(
LoadBalancerServer, args=(self.opts, self.socket_queue)
)
LoadBalancerServer,
args=(self.opts, self.socket_queue),
kwargs={'_opts': self.opts}
)
elif not salt.utils.platform.is_windows():
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
3 changes: 2 additions & 1 deletion salt/transport/zeromq.py
@@ -520,6 +520,7 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin,

def __init__(self, opts):
salt.transport.server.ReqServerChannel.__init__(self, opts)
self.opts = opts
self._closing = False

def zmq_device(self):
@@ -598,7 +599,7 @@ def pre_fork(self, process_manager):
:param func process_manager: An instance of salt.utils.process.ProcessManager
'''
salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager)
process_manager.add_process(self.zmq_device)
process_manager.add_process(self.zmq_device, kwargs={'_opts': self.opts})

def _start_zmq_monitor(self):
'''
6 changes: 4 additions & 2 deletions salt/utils/event.py
@@ -1088,7 +1088,8 @@ def __setstate__(self, state):
self.__init__(
state['opts'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
@@ -1220,7 +1221,8 @@ def __setstate__(self, state):
self.__init__(
state['opts'],
log_queue=state['log_queue'],
log_queue_level=state['log_queue_level']
log_queue_level=state['log_queue_level'],
_opts=state['opts']
)

def __getstate__(self):
57 changes: 54 additions & 3 deletions salt/utils/process.py
@@ -363,6 +363,36 @@ def __init__(self, name=None, wait_for_kill=1):
self._sigterm_handler = signal.getsignal(signal.SIGTERM)
self._restart_processes = True


@staticmethod
def run_function(fun, opts, *args, **kwargs):
'''
Run a specified function in a new process, while providing the required setup
'''
ProcessManager._setup_process(opts)
return fun(*args, **kwargs)

@staticmethod
def _setup_process(opts):
'''
This function is supposed to set up the newly spawned process
'''
if not salt.utils.platform.is_windows():
return
if opts:
max_open_files = opts.get('max_open_files')
if max_open_files:
try:
if max_open_files > 8192:
max_open_files = 8192
log.warning('max_open_files adjusted to 8192, since that is the maximum in the C runtime.')
import win32file
count = win32file._setmaxstdio(max_open_files) # pylint: disable=W0212
if count != max_open_files:
log.error('Failed to set \'max_open_files\' on the process')
except ImportError:
log.error('Failed to set \'max_open_files\' on the process')

def add_process(self, tgt, args=None, kwargs=None, name=None):
'''
Create a processes and args + kwargs
@@ -415,10 +445,18 @@ def add_process(self, tgt, args=None, kwargs=None, name=None):
tgt.__name__,
)

_opts = kwargs.get('_opts', {})
if type(multiprocessing.Process) is type(tgt) and issubclass(tgt, multiprocessing.Process):
process = tgt(*args, **kwargs)
else:
process = multiprocessing.Process(target=tgt, args=args, kwargs=kwargs, name=name)
kwargs.pop('_opts', {})
args = (tgt, _opts) if len(args) == 0 else (tgt, _opts) + args
process = multiprocessing.Process(
target=ProcessManager.run_function,
args=args,
kwargs=kwargs,
name=name
)

if isinstance(process, SignalHandlingMultiprocessingProcess):
with default_signals(signal.SIGINT, signal.SIGTERM):
Expand All @@ -429,7 +467,8 @@ def add_process(self, tgt, args=None, kwargs=None, name=None):
self._process_map[process.pid] = {'tgt': tgt,
'args': args,
'kwargs': kwargs,
'Process': process}
'Process': process,
'_opts': _opts}
return process

def restart_process(self, pid):
@@ -447,9 +486,16 @@ def restart_process(self, pid):
# don't block, the process is already dead
self._process_map[pid]['Process'].join(1)

kwargs = self._process_map[pid]['kwargs']
_opts = self._process_map[pid]['_opts']
if _opts:
if not kwargs:
kwargs = {}
kwargs['_opts'] = _opts

self.add_process(self._process_map[pid]['tgt'],
self._process_map[pid]['args'],
self._process_map[pid]['kwargs'])
kwargs=kwargs)

del self._process_map[pid]

@@ -696,6 +742,8 @@ def __init__(self, *args, **kwargs):
# salt.log.setup.get_multiprocessing_logging_queue().
salt.log.setup.set_multiprocessing_logging_queue(self.log_queue)

self._opts = kwargs.pop('_opts', {})

self.log_queue_level = kwargs.pop('log_queue_level', None)
if self.log_queue_level is None:
self.log_queue_level = salt.log.setup.get_multiprocessing_logging_level()
@@ -750,6 +798,8 @@ def __getstate__(self):
kwargs['log_queue'] = self.log_queue
if 'log_queue_level' not in kwargs:
kwargs['log_queue_level'] = self.log_queue_level
if '_opts' not in kwargs:
kwargs['_opts'] = self._opts
# Remove the version of these in the parent process since
# they are no longer needed.
del self._args_for_getstate
@@ -762,6 +812,7 @@ def __setup_process_logging(self):

def _run(self):
try:
ProcessManager._setup_process(self._opts)
return self._original_run()
except SystemExit:
# These are handled by multiprocessing.Process._bootstrap()
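
Putting the salt/utils/process.py pieces together: when the target is a plain function, add_process now prepends the target and its '_opts' to the positional args and points the child at ProcessManager.run_function, which performs the per-process setup before invoking the real target; MultiprocessingProcess subclasses instead receive '_opts' through kwargs and apply it at the top of _run. A rough, self-contained sketch of the wrapper idea, with illustrative names and a no-op _setup_process:

    import multiprocessing


    def _setup_process(opts):
        # Stand-in for the Windows-only max_open_files adjustment shown above.
        pass


    def run_function(fun, opts, *args, **kwargs):
        # Runs inside the child: per-process setup first, then the real target.
        _setup_process(opts)
        return fun(*args, **kwargs)


    def add_process(tgt, args=None, kwargs=None, name=None):
        args = tuple(args or ())
        kwargs = dict(kwargs or {})
        # Pull '_opts' out of kwargs and prepend it, together with the real
        # target, to the positional args so run_function can unpack them.
        _opts = kwargs.pop('_opts', {})
        process = multiprocessing.Process(target=run_function,
                                          args=(tgt, _opts) + args,
                                          kwargs=kwargs,
                                          name=name)
        process.start()
        return process


    if __name__ == '__main__':
        add_process(print, args=('hello from the child',),
                    kwargs={'_opts': {'max_open_files': 1024}})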