Skip to content

Commit

Permalink
storage/callback: asyncio implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
3hhh committed Jul 16, 2020
1 parent 170e5f5 commit 889c923
Showing 1 changed file with 144 additions and 56 deletions.
200 changes: 144 additions & 56 deletions qubes/storage/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,17 @@
import logging
import subprocess
import json
import asyncio
import threading
from shlex import quote
from qubes.utils import coro_maybe

import qubes.storage

class UnhandledSignalException(qubes.storage.StoragePoolException):
    ''' Storage pool error stating that a pool failed to handle a callback signal.

    :param pool: Pool that failed to handle the signal; its `name` is put into the message.
    :param signal: The signal that was left unhandled.
    '''

    def __init__(self, pool, signal):
        message = 'The pool %s failed to handle the signal %s, likely because it was run from synchronous code.' % (pool.name, signal)
        super().__init__(message)

class CallbackPool(qubes.storage.Pool):
''' Proxy storage pool driver adding callback functionality to other pool drivers.
Expand All @@ -37,6 +44,12 @@ class CallbackPool(qubes.storage.Pool):
- custom pool mounts
- encryption
- debugging
- run synchronous pool drivers asynchronously
A word of caution:
This implementation runs asynchronously every method that `qubes.storage.Pool` allows to be asynchronous. If a backend pool driver does
not support running a particular method asynchronously, this may cause issues. In short, always prefer the original backend
driver over this one unless a particular use case requires the functionality of this driver.
**Integration tests**:
Expand All @@ -58,7 +71,7 @@ class CallbackPool(qubes.storage.Pool):
ls /mnt/test01
qvm-pool -r test && sudo rm -rf /mnt/test01
echo '#!/bin/bash'$'\n''i=0 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs
echo '#!/bin/bash'$'\n''i=1 ; for arg in "$@" ; do echo "$i: $arg" >> /tmp/callback.log ; (( i++)) ; done ; exit 0' > /usr/bin/testCbLogArgs && chmod +x /usr/bin/testCbLogArgs
rm -f /tmp/callback.log
qvm-pool -o conf_id=testing-succ-file-02 -a test callback
qvm-pool
Expand Down Expand Up @@ -141,7 +154,7 @@ class CallbackPool(qubes.storage.Pool):
systemctl restart qubesd
qvm-start test-eph2 (trigger storage re-init)
md5sum /mnt/ram/teph.key (same as in (2))
qvm-shutdown test-eph2
qvm-shutdown --wait test-eph2
sudo umount /mnt/test_eph
qvm-create -l red -P teph test-eph-fail (must fail with error in journalctl)
ls /mnt/test_eph/ (should be empty)
Expand All @@ -150,7 +163,7 @@ class CallbackPool(qubes.storage.Pool):
qvm-create -l red -P teph test-eph3
md5sum /mnt/ram/teph.key (same as in (2))
sudo mount|grep -E 'ram|test'
ls /mnt/test_eph/appvms/test_eph3
ls /mnt/test_eph/appvms/test-eph3
qvm-remove test-eph3
qvm-ls | grep test-eph
qvm-pool -r teph
Expand All @@ -172,13 +185,14 @@ def __init__(self, *, name, conf_id):
may be called from an untrusted VM (not by default though). In those cases it may be security-relevant
not to pick easily guessable `conf_id` values for your configuration as untrusted VMs may otherwise
execute callbacks meant for other pools.
:raise StoragePoolException: For user configuration issues.
'''
#NOTE: attribute names **must** start with `_cb_` unless they are meant to be stored as self._cb_impl attributes
self._cb_ctor_done = False #: Boolean to indicate whether or not `__init__` successfully ran through.
self._cb_log = logging.getLogger('qubes.storage.callback') #: Logger instance.
if not isinstance(conf_id, str):
raise qubes.storage.StoragePoolException('conf_id is no String. VM attack?!')
self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__`.
self._cb_conf_id = conf_id #: Configuration ID as passed to `__init__()`.

with open(CallbackPool.config_path) as json_file:
conf_all = json.load(json_file)
Expand Down Expand Up @@ -207,12 +221,13 @@ def __init__(self, *, name, conf_id):
raise qubes.storage.StoragePoolException('The class %s must be a subclass of qubes.storage.Pool.' % cls)

self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
self._cb_init_lock = threading.Lock() #: Lock ensuring that late storage initialization is only run exactly once. Currently a `threading.Lock()` to make it accessible from synchronous code as well.
bdriver_args = self._cb_conf.get('bdriver_args', {})
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.

super().__init__(name=name, revisions_to_keep=int(bdriver_args.get('revisions_to_keep', 1)))
self._cb_ctor_done = True
self._callback('on_ctor')
self._callback_nocoro('on_ctor')

def _check_init(self):
''' Whether or not this object requires late storage initialization via callback. '''
Expand All @@ -221,24 +236,38 @@ def _check_init(self):
cmd = self._cb_conf.get('cmd')
return bool(cmd and cmd != '-')

@asyncio.coroutine
def _init(self, callback=True):
''' Late storage initialization on first use for e.g. decryption on first usage request.
:param callback: Whether to trigger the `on_sinit` callback or not.
'''
#maybe TODO: if this function is meant to be run in parallel (are Pool operations asynchronous?), a function lock is required!
if callback:
self._callback('on_sinit')
self._cb_requires_init = False

with self._cb_init_lock:
if self._cb_requires_init:
if callback:
yield from self._callback('on_sinit')
self._cb_requires_init = False

def _init_nocoro(self, callback=True):
    ''' Synchronous counterpart of `_init()`: late storage initialization from non-coroutine code.

    :param callback: Whether to trigger the `on_sinit` callback or not.
    '''
    with self._cb_init_lock:
        if not self._cb_requires_init:
            return  # nothing (left) to initialize
        if callback:
            self._callback_nocoro('on_sinit')
        # only reached if the callback (if any) did not raise
        self._cb_requires_init = False

@asyncio.coroutine
def _assert_initialized(self, **kwargs):
if self._cb_requires_init:
self._init(**kwargs)
yield from self._init(**kwargs)

def _callback(self, cb, cb_args=None):
'''Run a callback.
def _callback_nocoro(self, cb, cb_args=None, handle_signals=True):
'''Run a callback (variant that can be used outside of coroutines / from synchronous code).
:param cb: Callback identifier string.
:param cb_args: Optional list of arguments to pass to the command as last arguments.
Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks.
:param handle_signals: Whether to handle signals locally in synchronous code.
May raise an exception if a callback signal cannot be handled locally.
:return: The callback's stdout (which may contain unhandled signals) if `signal_back` is enabled and `handle_signals` is `False`; `None` otherwise.
'''
if self._cb_ctor_done:
cmd = self._cb_conf.get(cb)
Expand All @@ -249,7 +278,6 @@ def _callback(self, cb, cb_args=None):
cmd = self._cb_conf.get('cmd')
args = [self.name, self._cb_conf['bdriver'], cb, self._cb_cmd_arg, *cb_args]
if cmd and cmd != '-':
args = filter(None, args)
args = ' '.join(quote(str(a)) for a in args)
cmd = ' '.join(filter(None, [cmd, args]))
self._cb_log.info('callback driver executing (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, cmd)
Expand All @@ -258,8 +286,24 @@ def _callback(self, cb, cb_args=None):
self._cb_log.debug('callback driver stdout (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stdout)
self._cb_log.debug('callback driver stderr (%s, %s %s): %s', self._cb_conf_id, cb, cb_args, res.stderr)
if self._cb_conf.get('signal_back', False) is True:
self._process_signals(res.stdout)
if handle_signals:
self._process_signals_nocoro(res.stdout)
else:
return res.stdout
return None

@asyncio.coroutine
def _callback(self, cb, cb_args=None):
    '''Run a callback from within a coroutine.

    The command is executed via `_callback_nocoro()` with local signal handling
    disabled; whatever output it hands back is then passed to `_process_signals()`.

    :param cb: Callback identifier string.
    :param cb_args: Optional list of arguments to pass to the command as last arguments.
                    Only passed on for the generic command specified as `cmd`, not for `on_xyz` callbacks.
    '''
    pending = self._callback_nocoro(cb, cb_args=cb_args, handle_signals=False)
    if not pending:
        return
    yield from self._process_signals(pending)

@asyncio.coroutine
def _process_signals(self, out):
'''Process any signals found inside a string.
:param out: String to check for signals. Each signal must be on a dedicated line.
Expand All @@ -268,7 +312,18 @@ def _process_signals(self, out):
for line in out.splitlines():
if line == 'SIGNAL_setup':
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
self._setup_cb(callback=False)
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
yield from coro_maybe(self._cb_impl.setup())

def _process_signals_nocoro(self, out):
    '''Variant of `_process_signals()` to be used with synchronous code.

    :param out: String to check for signals. Each signal must be on a dedicated line.
    :raise UnhandledSignalException: If `out` contains a `SIGNAL_setup` line, as that signal
                                     cannot be handled here / in synchronous code.
    '''
    signal = 'SIGNAL_setup'
    if signal in out.splitlines():
        raise UnhandledSignalException(self, signal)

@property
def config(self):
Expand All @@ -278,23 +333,21 @@ def config(self):
'conf_id': self._cb_conf_id,
}

@asyncio.coroutine
def destroy(self):
self._assert_initialized()
ret = self._cb_impl.destroy()
self._callback('on_destroy')
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.destroy())
yield from self._callback('on_destroy')
return ret

def init_volume(self, vm, volume_config):
    ''' Let the backend driver initialize the volume and wrap the result in a `CallbackVolume` bound to this pool.

    :param vm: Passed through unchanged to the backend driver's `init_volume()`.
    :param volume_config: Passed through unchanged to the backend driver's `init_volume()`.
    :return: `CallbackVolume` wrapping the backend volume.
    '''
    backend_volume = self._cb_impl.init_volume(vm, volume_config)
    return CallbackVolume(self, backend_volume)

def _setup_cb(self, callback=True):
if callback:
self._callback('on_setup')
self._assert_initialized(callback=False) #setup is assumed to include storage initialization
return self._cb_impl.setup()

@asyncio.coroutine
def setup(self):
return self._setup_cb()
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
yield from self._callback('on_setup')
return (yield from coro_maybe(self._cb_impl.setup()))

@property
def volumes(self):
Expand Down Expand Up @@ -365,72 +418,107 @@ def __init__(self, pool, impl):
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
self._cb_impl = impl #: Backend volume implementation instance.

@asyncio.coroutine
def _assert_initialized(self, **kwargs):
return self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access

@asyncio.coroutine
def _callback(self, cb, cb_args=None, **kwargs):
if cb_args is None:
cb_args = []
vol_args = [self.name, self.vid, *cb_args]
return self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access
yield from self._cb_pool._callback(cb, cb_args=vol_args, **kwargs) # pylint: disable=protected-access

@asyncio.coroutine
def create(self):
self._assert_initialized()
self._callback('on_volume_create')
return self._cb_impl.create()
yield from self._assert_initialized()
yield from self._callback('on_volume_create')
return (yield from coro_maybe(self._cb_impl.create()))

@asyncio.coroutine
def remove(self):
self._assert_initialized()
ret = self._cb_impl.remove()
self._callback('on_volume_remove')
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.remove())
yield from self._callback('on_volume_remove')
return ret

@asyncio.coroutine
def resize(self, size):
self._assert_initialized()
self._callback('on_volume_resize', cb_args=[size])
return self._cb_impl.resize(size)
yield from self._assert_initialized()
yield from self._callback('on_volume_resize', cb_args=[size])
return (yield from coro_maybe(self._cb_impl.resize(size)))

@asyncio.coroutine
def start(self):
self._assert_initialized()
self._callback('on_volume_start')
return self._cb_impl.start()
yield from self._assert_initialized()
yield from self._callback('on_volume_start')
return (yield from coro_maybe(self._cb_impl.start()))

@asyncio.coroutine
def stop(self):
self._assert_initialized()
ret = self._cb_impl.stop()
self._callback('on_volume_stop')
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.stop())
yield from self._callback('on_volume_stop')
return ret

@asyncio.coroutine
def import_data(self):
self._assert_initialized()
self._callback('on_volume_import_data')
return self._cb_impl.import_data()
yield from self._assert_initialized()
yield from self._callback('on_volume_import_data')
return (yield from coro_maybe(self._cb_impl.import_data()))

@asyncio.coroutine
def import_data_end(self, success):
self._assert_initialized()
ret = self._cb_impl.import_data_end(success)
self._callback('on_volume_import_data_end', cb_args=[success])
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.import_data_end(success))
yield from self._callback('on_volume_import_data_end', cb_args=[success])
return ret

@asyncio.coroutine
def import_volume(self, src_volume):
self._assert_initialized()
self._callback('on_volume_import', cb_args=[src_volume.vid])
return self._cb_impl.import_volume(src_volume)
yield from self._assert_initialized()
yield from self._callback('on_volume_import', cb_args=[src_volume.vid])
return (yield from coro_maybe(self._cb_impl.import_volume(src_volume)))

def is_dirty(self):
if self._cb_pool._cb_requires_init: # pylint: disable=protected-access
# pylint: disable=protected-access
if self._cb_pool._cb_requires_init:
return False
return self._cb_impl.is_dirty()

def is_outdated(self):
if self._cb_pool._cb_requires_init: # pylint: disable=protected-access
# pylint: disable=protected-access
if self._cb_pool._cb_requires_init:
return False
return self._cb_impl.is_outdated()

def block_device(self):
    ''' Return the backend volume's block device, or `None` while the pool still requires its late storage initialization. '''
    # pylint: disable=protected-access
    pool_ready = not self._cb_pool._cb_requires_init
    # usually Volume.start() is called beforehand
    # --> we should be initialized in 99% of cases
    return self._cb_impl.block_device() if pool_ready else None

def export(self, volume):
    ''' Export via the backend volume, running the pool's late storage initialization synchronously first if it is still pending.

    :param volume: Forwarded unchanged to the backend volume's `export()`.
    :return: Whatever the backend volume's `export()` returns.
    '''
    # pylint: disable=protected-access
    #TODO: once this becomes a coroutine in the Volume class, avoid the below blocking & potentially exception-throwing code; maybe also add a callback
    # NOTE(review): confirm that the backend's export() really accepts this argument.
    pool = self._cb_pool
    if pool._cb_requires_init:
        pool._init_nocoro()
    return self._cb_impl.export(volume)

@asyncio.coroutine
def verify(self):
    ''' Make sure the pool is initialized, then delegate to the backend volume's `verify()` and return its result. '''
    yield from self._assert_initialized()
    outcome = yield from coro_maybe(self._cb_impl.verify())
    return outcome

@asyncio.coroutine
def revert(self, revision=None):
    ''' Make sure the pool is initialized, then delegate to the backend volume's `revert()` and return its result.

    :param revision: Forwarded to the backend volume as the `revision` keyword argument.
    '''
    yield from self._assert_initialized()
    outcome = yield from coro_maybe(self._cb_impl.revert(revision=revision))
    return outcome

#remaining method & attribute delegation
def __getattr__(self, name):
if name in ['block_device', 'verify', 'revert', 'export']:
self._assert_initialized()
return getattr(self._cb_impl, name)

def __setattr__(self, name, value):
Expand Down

0 comments on commit 889c923

Please sign in to comment.