Skip to content

Commit

Permalink
storage/callback: do not run sync code async
Browse files Browse the repository at this point in the history
  • Loading branch information
3hhh committed Jul 18, 2020
1 parent a53781b commit 287a4a0
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions qubes/storage/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import json
import asyncio
from shlex import quote
from qubes.utils import coro_maybe

import qubes.storage

Expand All @@ -45,13 +44,6 @@ class CallbackPool(qubes.storage.Pool):
- custom pool mounts
- encryption
- debugging
- run synchronous pool drivers asynchronously
A word of caution:
This implementation runs all methods that `qubes.storage.Pool` allows to be asynchronous asynchronously. So if a backend pool driver does
not support a particular method to be run asynchronously, there may be issues. In short, it is always preferable to use the original backend
driver over this one unless the functionality of this driver is required for a particular use case.
**Integration tests**:
(all of these tests assume the `qubes_callback.json.example` configuration)
Expand Down Expand Up @@ -234,13 +226,21 @@ def __init__(self, *, name, conf_id):

self._cb_requires_init = self._check_init() #: Boolean indicating whether late storage initialization yet has to be done or not.
self._cb_init_lock = asyncio.Lock() #: Lock ensuring that late storage initialization is only run exactly once.
self._cb_sync_lock = asyncio.Lock() #: Lock to prevent sync code from running async.
bdriver_args = self._cb_conf.get('bdriver_args', {})
self._cb_impl = cls(name=name, **bdriver_args) #: Instance of the backend pool driver.

super().__init__(name=name, revisions_to_keep=int(bdriver_args.get('revisions_to_keep', 1)))
self._cb_ctor_done = True
self._callback_nocoro('post_ctor')

@asyncio.coroutine
def _coro_maybe(self, value):
if asyncio.iscoroutine(value):
return (yield from value)
with (yield from self._cb_sync_lock):
return value

def _check_init(self):
''' Whether or not this object requires late storage initialization via callback. '''
cmd = self._cb_conf.get('pre_sinit')
Expand Down Expand Up @@ -317,7 +317,7 @@ def _process_signals(self, out):
if line == 'SIGNAL_setup':
self._cb_log.info('callback driver processing SIGNAL_setup for %s', self._cb_conf_id)
#NOTE: calling our own methods may lead to a deadlock / qubesd freeze due to `self._assert_initialized()` / `self._cb_init_lock`
yield from coro_maybe(self._cb_impl.setup())
yield from self._coro_maybe(self._cb_impl.setup())

def _process_signals_nocoro(self, out):
'''Variant of `process_signals` to be used with synchronous code.
Expand Down Expand Up @@ -347,7 +347,7 @@ def config(self):
@asyncio.coroutine
def destroy(self):
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.destroy())
ret = yield from self._coro_maybe(self._cb_impl.destroy())
yield from self._callback('post_destroy')
return ret

Expand All @@ -360,7 +360,7 @@ def init_volume(self, vm, volume_config):
def setup(self):
yield from self._assert_initialized(callback=False) #setup is assumed to include storage initialization
yield from self._callback('pre_setup')
return (yield from coro_maybe(self._cb_impl.setup()))
return (yield from self._coro_maybe(self._cb_impl.setup()))

@property
def volumes(self):
Expand Down Expand Up @@ -455,6 +455,10 @@ def __init__(self, pool, impl):
self._cb_pool = pool #: CallbackPool instance the Volume belongs to.
self._cb_impl = impl #: Backend volume implementation instance.

@asyncio.coroutine
def _coro_maybe(self, value):
return (yield from self._cb_pool._coro_maybe(value)) # pylint: disable=protected-access

@asyncio.coroutine
def _assert_initialized(self, **kwargs):
yield from self._cb_pool._assert_initialized(**kwargs) # pylint: disable=protected-access
Expand All @@ -477,54 +481,54 @@ def backend_class(self):
def create(self):
yield from self._assert_initialized()
yield from self._callback('pre_volume_create')
return (yield from coro_maybe(self._cb_impl.create()))
return (yield from self._coro_maybe(self._cb_impl.create()))

@asyncio.coroutine
def remove(self):
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.remove())
ret = yield from self._coro_maybe(self._cb_impl.remove())
yield from self._callback('post_volume_remove')
return ret

@asyncio.coroutine
def resize(self, size):
yield from self._assert_initialized()
yield from self._callback('pre_volume_resize', cb_args=[size])
return (yield from coro_maybe(self._cb_impl.resize(size)))
return (yield from self._coro_maybe(self._cb_impl.resize(size)))

@asyncio.coroutine
def start(self):
yield from self._assert_initialized()
yield from self._callback('pre_volume_start')
ret = yield from coro_maybe(self._cb_impl.start())
ret = yield from self._coro_maybe(self._cb_impl.start())
yield from self._callback('post_volume_start')
return ret

@asyncio.coroutine
def stop(self):
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.stop())
ret = yield from self._coro_maybe(self._cb_impl.stop())
yield from self._callback('post_volume_stop')
return ret

@asyncio.coroutine
def import_data(self, size):
yield from self._assert_initialized()
yield from self._callback('pre_volume_import_data', cb_args=[size])
return (yield from coro_maybe(self._cb_impl.import_data(size)))
return (yield from self._coro_maybe(self._cb_impl.import_data(size)))

@asyncio.coroutine
def import_data_end(self, success):
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.import_data_end(success))
ret = yield from self._coro_maybe(self._cb_impl.import_data_end(success))
yield from self._callback('post_volume_import_data_end', cb_args=[success])
return ret

@asyncio.coroutine
def import_volume(self, src_volume):
yield from self._assert_initialized()
yield from self._callback('pre_volume_import', cb_args=[src_volume.vid])
return (yield from coro_maybe(self._cb_impl.import_volume(src_volume)))
return (yield from self._coro_maybe(self._cb_impl.import_volume(src_volume)))

def is_dirty(self):
# pylint: disable=protected-access
Expand Down Expand Up @@ -566,24 +570,24 @@ def block_device(self):
def export(self):
yield from self._assert_initialized()
yield from self._callback('pre_volume_export')
return (yield from coro_maybe(self._cb_impl.export()))
return (yield from self._coro_maybe(self._cb_impl.export()))

@asyncio.coroutine
def export_end(self, path):
yield from self._assert_initialized()
ret = yield from coro_maybe(self._cb_impl.export_end(path))
ret = yield from self._coro_maybe(self._cb_impl.export_end(path))
yield from self._callback('post_volume_export_end', cb_args=[path])
return ret

@asyncio.coroutine
def verify(self):
yield from self._assert_initialized()
return (yield from coro_maybe(self._cb_impl.verify()))
return (yield from self._coro_maybe(self._cb_impl.verify()))

@asyncio.coroutine
def revert(self, revision=None):
yield from self._assert_initialized()
return (yield from coro_maybe(self._cb_impl.revert(revision=revision)))
return (yield from self._coro_maybe(self._cb_impl.revert(revision=revision)))

#shadow all qubes.storage.Volume class attributes as instance properties
#NOTE: this will cause a subtle difference to using an actual _cb_impl instance: CallbackVolume.devtype will return a property object, Volume.devtype the actual value
Expand Down

0 comments on commit 287a4a0

Please sign in to comment.