From e7e2700d2922cc23b4ef4496f1f112bdc4a8f2f4 Mon Sep 17 00:00:00 2001 From: Edward J M Easton Date: Tue, 1 Sep 2015 14:43:08 +0100 Subject: [PATCH 1/2] When deleting symbols, we assert that the symbol is actually deleted using has_symbol(). This assertion can fail when reading from a secondary cluser member. This commit adds the allow_secondary argument to the has_symbol() method, and a little refactoring of the code to determine the mongo read preference. --- arctic/store/version_store.py | 39 +++++++++++++++++++----- tests/unit/store/test_version_store.py | 41 +++++++++++++++++++------- 2 files changed, 63 insertions(+), 17 deletions(-) diff --git a/arctic/store/version_store.py b/arctic/store/version_store.py index c48cce66e004a..0d27c1cb2ccdd 100644 --- a/arctic/store/version_store.py +++ b/arctic/store/version_store.py @@ -105,6 +105,12 @@ def __str__(self): def __repr__(self): return str(self) + + def _read_preference(self, allow_secondary): + """ Return the mongo read preference given an 'allow_secondary' argument + """ + allow_secondary = self._allow_secondary if allow_secondary is None else allow_secondary + return ReadPreference.NEAREST if allow_secondary else ReadPreference.PRIMARY @mongo_retry def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs): @@ -166,7 +172,7 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs): return sorted([x['symbol'] for x in results]) @mongo_retry - def has_symbol(self, symbol, as_of=None): + def has_symbol(self, symbol, as_of=None, allow_secondary=None): """ Return True if the 'symbol' exists in this library AND the symbol isn't deleted in the specified as_of. @@ -177,9 +183,19 @@ def has_symbol(self, symbol, as_of=None): ---------- symbol : `str` symbol name for the item + as_of : `str` or int or `datetime.datetime` + Return the data as it was as_of the point in time. + `int` : specific version number + `str` : snapshot name which contains the version + `datetime.datetime` : the version of the data that existed as_of the requested point in time + allow_secondary : `bool` or `None` + Override the default behavior for allowing reads from secondary members of a cluster: + `None` : use the settings from the top-level `Arctic` object used to query this version store. + `True` : allow reads from secondary members + `False` : only allow reads from primary members """ try: - self._read_metadata(symbol, as_of=as_of) + self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary)) return True except NoDataFoundException: return False @@ -287,14 +303,18 @@ def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kw `int` : specific version number `str` : snapshot name which contains the version `datetime.datetime` : the version of the data that existed as_of the requested point in time + allow_secondary : `bool` or `None` + Override the default behavior for allowing reads from secondary members of a cluster: + `None` : use the settings from the top-level `Arctic` object used to query this version store. + `True` : allow reads from secondary members + `False` : only allow reads from primary members Returns ------- VersionedItem namedtuple which contains a .data and .metadata element """ - allow_secondary = self._allow_secondary if allow_secondary is None else allow_secondary try: - read_preference = ReadPreference.NEAREST if allow_secondary else ReadPreference.PRIMARY + read_preference = self._read_preference(allow_secondary) _version = self._read_metadata(symbol, as_of=as_of, read_preference=read_preference) return self._do_read(symbol, _version, from_version, read_preference=read_preference, **kwargs) except (OperationFailure, AutoReconnect) as e: @@ -347,7 +367,7 @@ def _do_read(self, symbol, version, from_version=None, **kwargs): _do_read_retry = mongo_retry(_do_read) @mongo_retry - def read_metadata(self, symbol, as_of=None): + def read_metadata(self, symbol, as_of=None, allow_secondary=None): """ Return the metadata saved for a symbol. This method is fast as it doesn't actually load the data. @@ -361,8 +381,13 @@ def read_metadata(self, symbol, as_of=None): `int` : specific version number `str` : snapshot name which contains the version `datetime.datetime` : the version of the data that existed as_of the requested point in time + allow_secondary : `bool` or `None` + Override the default behavior for allowing reads from secondary members of a cluster: + `None` : use the settings from the top-level `Arctic` object used to query this version store. + `True` : allow reads from secondary members + `False` : only allow reads from primary members """ - _version = self._read_metadata(symbol, as_of=as_of) + _version = self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary)) return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=_version['version'], metadata=_version.pop('metadata', None), data=None) @@ -642,7 +667,7 @@ def delete(self, symbol): 'metadata.deleted': {'$ne': True}}) if not snapped_version: self._delete_version(symbol, sentinel.version) - assert not self.has_symbol(symbol) + assert not self.has_symbol(symbol, allow_secondary=False) def _write_audit(self, user, message, changed_version): """ diff --git a/tests/unit/store/test_version_store.py b/tests/unit/store/test_version_store.py index 42fa888f9227d..4c978b91849f9 100644 --- a/tests/unit/store/test_version_store.py +++ b/tests/unit/store/test_version_store.py @@ -44,31 +44,52 @@ def test_list_versions_localTime(): 'snapshots': 'snap'} +def test__read_preference__allow_secondary_true(): + self = create_autospec(VersionStore) + assert VersionStore._read_preference(self, True) == ReadPreference.NEAREST + + +def test__read_preference__allow_secondary_false(): + self = create_autospec(VersionStore) + assert VersionStore._read_preference(self, False) == ReadPreference.PRIMARY + + +def test__read_preference__default_true(): + self = create_autospec(VersionStore, _allow_secondary=True) + assert VersionStore._read_preference(self, None) == ReadPreference.NEAREST + + +def test__read_preference__default_false(): + self = create_autospec(VersionStore, _allow_secondary=False) + assert VersionStore._read_preference(self, None) == ReadPreference.PRIMARY + + def test_get_version_allow_secondary_True(): vs = create_autospec(VersionStore, instance=True, _versions=Mock()) - vs._allow_secondary = True + vs._read_preference.return_value = sentinel.read_preference vs._find_snapshots.return_value = 'snap' vs._versions.find.return_value = [{'_id': bson.ObjectId.from_datetime(dt(2013, 4, 1, 9, 0)), 'symbol': 's', 'version': 10}] VersionStore.read(vs, "symbol") - assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=ReadPreference.NEAREST)] - assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=ReadPreference.NEAREST)] + assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=sentinel.read_preference)] + assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=sentinel.read_preference)] def test_get_version_allow_secondary_user_override_False(): """Ensure user can override read preference when calling read""" vs = create_autospec(VersionStore, instance=True, _versions=Mock()) - vs._allow_secondary = True + vs._read_preference.return_value = sentinel.read_preference vs._find_snapshots.return_value = 'snap' vs._versions.find.return_value = [{'_id': bson.ObjectId.from_datetime(dt(2013, 4, 1, 9, 0)), 'symbol': 's', 'version': 10}] VersionStore.read(vs, "symbol", allow_secondary=False) - assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=ReadPreference.PRIMARY)] - assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=ReadPreference.PRIMARY)] + assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=sentinel.read_preference)] + assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=sentinel.read_preference)] + vs._read_preference.assert_called_once_with(False) def test_read_as_of_LondonTime(): @@ -176,17 +197,17 @@ def test_prune_previous_versions_0_timeout(): def test_read_handles_operation_failure(): - self = create_autospec(VersionStore, _versions=Mock(), _arctic_lib=Mock(), - _allow_secondary=True) + self = create_autospec(VersionStore, _versions=Mock(), _arctic_lib=Mock()) + self._read_preference.return_value = sentinel.read_preference self._collection = create_autospec(Collection) self._read_metadata.side_effect = [sentinel.meta1, sentinel.meta2] self._read_metadata.__name__ = 'name' self._do_read.__name__ = 'name' # feh: mongo_retry decorator cares about this self._do_read.side_effect = [OperationFailure('error'), sentinel.read] VersionStore.read(self, sentinel.symbol, sentinel.as_of, sentinel.from_version) - # Assert that, for the two read calls, the second uses the new metadata + # Assert that, for the two read calls, the second uses the new metadata and forces a read from primary assert self._do_read.call_args_list == [call(sentinel.symbol, sentinel.meta1, sentinel.from_version, - read_preference=ReadPreference.NEAREST)] + read_preference=sentinel.read_preference)] assert self._do_read_retry.call_args_list == [call(sentinel.symbol, sentinel.meta2, sentinel.from_version, read_preference=ReadPreference.PRIMARY)] From e20cc85e2f3ad4f8e3cae6a22c37b1760d7847c2 Mon Sep 17 00:00:00 2001 From: Edward J M Easton Date: Tue, 1 Sep 2015 16:52:07 +0100 Subject: [PATCH 2/2] ReviewFix: always use primary for has_symbol, it's safer. --- arctic/store/version_store.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/arctic/store/version_store.py b/arctic/store/version_store.py index 0d27c1cb2ccdd..b7e37c46616c9 100644 --- a/arctic/store/version_store.py +++ b/arctic/store/version_store.py @@ -172,7 +172,7 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs): return sorted([x['symbol'] for x in results]) @mongo_retry - def has_symbol(self, symbol, as_of=None, allow_secondary=None): + def has_symbol(self, symbol, as_of=None): """ Return True if the 'symbol' exists in this library AND the symbol isn't deleted in the specified as_of. @@ -188,14 +188,10 @@ def has_symbol(self, symbol, as_of=None, allow_secondary=None): `int` : specific version number `str` : snapshot name which contains the version `datetime.datetime` : the version of the data that existed as_of the requested point in time - allow_secondary : `bool` or `None` - Override the default behavior for allowing reads from secondary members of a cluster: - `None` : use the settings from the top-level `Arctic` object used to query this version store. - `True` : allow reads from secondary members - `False` : only allow reads from primary members """ try: - self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary)) + # Always use the primary for has_symbol, it's safer + self._read_metadata(symbol, as_of=as_of, read_preference=ReadPreference.PRIMARY) return True except NoDataFoundException: return False @@ -667,7 +663,7 @@ def delete(self, symbol): 'metadata.deleted': {'$ne': True}}) if not snapped_version: self._delete_version(symbol, sentinel.version) - assert not self.has_symbol(symbol, allow_secondary=False) + assert not self.has_symbol(symbol) def _write_audit(self, user, message, changed_version): """