Skip to content

Commit

Permalink
Initial addition of ContextDict
Browse files Browse the repository at this point in the history
Inspired by saltstack#23373

The basic issue we ran into is that the loader is injecting globals directly into the global namespace. This means that these injected values are not thread or coroutine safe-- meaning we can never do more than one thing at a time. Instead of multiprocessing everything to death-- we can simply use a stack_context to nicely handle this down in the core. As far as the module authors/users are concerned nothing has changed-- but the storage behind the scenes is now per-JID. This same set of classes can easily be used to store additional data (next candidates are reactors, master MWorker tasks, etc.).
  • Loading branch information
jacksontj committed Oct 23, 2015
1 parent 604b9bb commit 6838a95
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 43 deletions.
59 changes: 23 additions & 36 deletions salt/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from salt.exceptions import LoaderError
from salt.template import check_render_pipe_str
from salt.utils.decorators import Depends
from salt.utils import context
import salt.utils.context
import salt.utils.lazy
import salt.utils.event
import salt.utils.odict
Expand Down Expand Up @@ -191,13 +191,6 @@ def minion_mods(
__salt__['test.ping']()
'''
# TODO Publish documentation for module whitelisting
if context is None:
context = {}
if utils is None:
utils = {}
if proxy is None:
proxy = {}

if not whitelist:
whitelist = opts.get('whitelist_modules', None)
ret = LazyLoader(_module_dirs(opts, 'modules', 'module'),
Expand Down Expand Up @@ -293,8 +286,6 @@ def returners(opts, functions, whitelist=None, context=None):
'''
Returns the returner modules
'''
if context is None:
context = {}
return LazyLoader(_module_dirs(opts, 'returners', 'returner'),
opts,
tag='returner',
Expand All @@ -307,8 +298,6 @@ def utils(opts, whitelist=None, context=None):
'''
Returns the utility modules
'''
if context is None:
context = {}
return LazyLoader(_module_dirs(opts, 'utils', 'utils', ext_type_dirs='utils_dirs'),
opts,
tag='utils',
Expand All @@ -320,8 +309,6 @@ def pillars(opts, functions, context=None):
'''
Returns the pillars modules
'''
if context is None:
context = {}
ret = LazyLoader(_module_dirs(opts, 'pillar', 'pillar'),
opts,
tag='pillar',
Expand Down Expand Up @@ -449,8 +436,6 @@ def beacons(opts, functions, context=None):
:param dict functions: A dictionary of minion modules, with module names as
keys and funcs as values.
'''
if context is None:
context = {}
return LazyLoader(_module_dirs(opts, 'beacons', 'beacons'),
opts,
tag='beacons',
Expand Down Expand Up @@ -496,10 +481,6 @@ def ssh_wrapper(opts, functions=None, context=None):
'''
Returns the custom logging handler modules
'''
if context is None:
context = {}
if functions is None:
functions = {}
return LazyLoader(_module_dirs(opts,
'wrapper',
'wrapper',
Expand Down Expand Up @@ -888,20 +869,30 @@ def __init__(self,
virtual_enable=True,
static_modules=None
): # pylint: disable=W0231
'''
In pack, if any of the values are None they will be replaced with an
empty context-specific dict
'''

self.inject_globals = {}
self.pack = {} if pack is None else pack
if opts is None:
opts = {}
self.context_dict = salt.utils.context.ContextDict()
self.opts = self.__prep_mod_opts(opts)

self.module_dirs = module_dirs
if opts is None:
opts = {}
self.tag = tag
self.loaded_base_name = loaded_base_name or LOADED_BASE_NAME
self.mod_type_check = mod_type_check or _mod_type

self.pack = {} if pack is None else pack
if '__context__' not in self.pack:
self.pack['__context__'] = {}
self.pack['__context__'] = None

for k, v in self.pack.iteritems():
if v is None: # if the value of a pack is None, lets make an empty dict
self.context_dict.setdefault(k, {})
self.pack[k] = salt.utils.context.NamespacedDictWrapper(self.context_dict, k)

self.whitelist = whitelist
self.virtual_enable = virtual_enable
Expand Down Expand Up @@ -1079,14 +1070,13 @@ def __prep_mod_opts(self, opts):
'''
Strip out of the opts any logger instance
'''
if 'grains' in opts:
self._grains = opts['grains']
else:
self._grains = {}
if 'pillar' in opts:
self._pillar = opts['pillar']
else:
self._pillar = {}
if '__grains__' not in self.pack:
self.context_dict['grains'] = opts.get('grains', {})
self.pack['__grains__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'grains')

if '__pillar__' not in self.pack:
self.context_dict['pillar'] = opts.get('pillar', {})
self.pack['__pillar__'] = salt.utils.context.NamespacedDictWrapper(self.context_dict, 'pillar')

mod_opts = {}
for key, val in list(opts.items()):
Expand Down Expand Up @@ -1204,9 +1194,6 @@ def _load_module(self, name):
else:
mod.__opts__ = self.opts

mod.__grains__ = self._grains
mod.__pillar__ = self._pillar

# pack whatever other globals we were asked to
for p_name, p_value in six.iteritems(self.pack):
setattr(mod, p_name, p_value)
Expand Down Expand Up @@ -1528,7 +1515,7 @@ def global_injector_decorator(inject_globals):
def inner_decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
with context.func_globals_inject(f, **inject_globals):
with salt.utils.context.func_globals_inject(f, **inject_globals):
return f(*args, **kwargs)
return wrapper
return inner_decorator
22 changes: 15 additions & 7 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import salt.payload
import salt.syspaths
import salt.utils
import salt.utils.context
import salt.utils.jid
import salt.pillar
import salt.utils.args
Expand Down Expand Up @@ -960,10 +961,6 @@ def _handle_decoded_payload(self, data):
self.functions, self.returners, self.function_errors, self.executors = self._load_modules()
self.schedule.functions = self.functions
self.schedule.returners = self.returners
if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
target = Minion._thread_multi_return
else:
target = Minion._thread_return
# We stash an instance references to allow for the socket
# communication in Windows. You can't pickle functions, and thus
# python needs to be able to reconstruct the reference on the other
Expand All @@ -975,20 +972,31 @@ def _handle_decoded_payload(self, data):
# running on windows
instance = None
process = multiprocessing.Process(
target=target, args=(instance, self.opts, data)
target=self._target, args=(instance, self.opts, data)
)
else:
process = threading.Thread(
target=target,
target=self._target,
args=(instance, self.opts, data),
name=data['jid']
)
process.start()
if not sys.platform.startswith('win'):
# TODO: remove the windows specific check?
if not sys.platform.startswith('win') and self.opts['multiprocessing']:
# we only want to join() immediately if we are daemonizing a process
process.join()
else:
self.win_proc.append(process)

@classmethod
def _target(cls, minion_instance, opts, data):
# TODO: clone all contexts? Should be one per loader :/
with tornado.stack_context.StackContext(minion_instance.functions.context_dict.clone):
if isinstance(data['fun'], tuple) or isinstance(data['fun'], list):
Minion._thread_multi_return(minion_instance, opts, data)
else:
Minion._thread_return(minion_instance, opts, data)

@classmethod
def _thread_return(cls, minion_instance, opts, data):
'''
Expand Down
141 changes: 141 additions & 0 deletions salt/utils/context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
'''
:codeauthor: :email:`Pedro Algarvio ([email protected])`
:codeauthor: :email:`Thomas Jackson ([email protected])`
salt.utils.context
Expand All @@ -11,8 +12,13 @@
from __future__ import absolute_import

# Import python libs
import copy
import threading
import collections
from contextlib import contextmanager

import salt.ext.six


@contextmanager
def func_globals_inject(func, **overrides):
Expand Down Expand Up @@ -49,3 +55,138 @@ def func_globals_inject(func, **overrides):
# Remove any entry injected in the function globals
for injected in injected_func_globals:
del func_globals[injected]


class ContextDict(collections.MutableMapping):
"""A context manager that saves some per-thread state globally.
Intended for use with Tornado's StackContext.
Provide arbitrary data as kwargs upon creation,
then allow any children to override the values of the parent.
"""

def __init__(self, **data):
# state should be thread local, so this object can be threadsafe
self._state = threading.local()
# variable for the overriden data
self._state.data = None
self.global_data = {}

@property
def active(self):
'''Determine if this ContextDict is currently overriden
Since the ContextDict can be overriden in each thread, we check whether
the _state.data is set or not.
'''
try:
return self._state.data is not None
except AttributeError:
return False

# TODO: rename?
def clone(self, **kwargs):
'''
Clone this context, and return the ChildContextDict
'''
child = ChildContextDict(parent=self, overrides=kwargs)
return child

def __setitem__(self, key, val):
if self.active:
self._state.data[key] = val
else:
self.global_data[key] = val

def __delitem__(self, key):
if self.active:
del self._state.data[key]
else:
del self.global_data[key]

def __getitem__(self, key):
if self.active:
return self._state.data[key]
else:
return self.global_data[key]

def __len__(self):
if self.active:
return len(self._state.data)
else:
return len(self.global_data)

def __iter__(self):
if self.active:
return iter(self._state.data)
else:
return iter(self.global_data)


class ChildContextDict(collections.MutableMapping):
'''An overrideable child of ContextDict
'''
def __init__(self, parent, overrides=None):
self.parent = parent
self._data = {} if overrides is None else overrides

# merge self.global_data into self._data
for k, v in self.parent.global_data.iteritems():
if k not in self._data:
self._data[k] = copy.deepcopy(v)

def __setitem__(self, key, val):
self._data[key] = val

def __delitem__(self, key):
del self._data[key]

def __getitem__(self, key):
return self._data[key]

def __len__(self):
return len(self._data)

def __iter__(self):
return iter(self._data)

def __enter__(self):
self.parent._state.data = self._data

def __exit__(self, *exc):
self.parent._state.data = None


class NamespacedDictWrapper(collections.MutableMapping, dict):
'''
Create a dict which wraps another dict with a specific prefix of key(s)
MUST inherit from dict to serialize through msgpack correctly
'''
def __init__(self, d, pre_keys):
self.__dict = d
if isinstance(pre_keys, salt.ext.six.string_types):
self.pre_keys = (pre_keys,)
else:
self.pre_keys = pre_keys

def _dict(self):
r = self.__dict
for k in self.pre_keys:
r = r[k]
return r

def __setitem__(self, key, val):
self._dict()[key] = val

def __delitem__(self, key):
del self._dict()[key]

def __getitem__(self, key):
return self._dict()[key]

def __len__(self):
return len(self._dict())

def __iter__(self):
return iter(self._dict())
Loading

0 comments on commit 6838a95

Please sign in to comment.