Skip to content

Commit

Permalink
When deleting symbols, we assert that the symbol is actually deleted
Browse files Browse the repository at this point in the history
using has_symbol(). This assertion can fail when reading from a
secondary cluser member. This commit adds the allow_secondary argument
to the has_symbol() method, and a little refactoring of the code to
determine the mongo read preference.
  • Loading branch information
Edward J M Easton committed Sep 1, 2015
1 parent dfbaa0f commit e7e2700
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 17 deletions.
39 changes: 32 additions & 7 deletions arctic/store/version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ def __str__(self):

def __repr__(self):
return str(self)

def _read_preference(self, allow_secondary):
""" Return the mongo read preference given an 'allow_secondary' argument
"""
allow_secondary = self._allow_secondary if allow_secondary is None else allow_secondary
return ReadPreference.NEAREST if allow_secondary else ReadPreference.PRIMARY

@mongo_retry
def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
Expand Down Expand Up @@ -166,7 +172,7 @@ def list_symbols(self, all_symbols=False, snapshot=None, regex=None, **kwargs):
return sorted([x['symbol'] for x in results])

@mongo_retry
def has_symbol(self, symbol, as_of=None):
def has_symbol(self, symbol, as_of=None, allow_secondary=None):
"""
Return True if the 'symbol' exists in this library AND the symbol
isn't deleted in the specified as_of.
Expand All @@ -177,9 +183,19 @@ def has_symbol(self, symbol, as_of=None):
----------
symbol : `str`
symbol name for the item
as_of : `str` or int or `datetime.datetime`
Return the data as it was as_of the point in time.
`int` : specific version number
`str` : snapshot name which contains the version
`datetime.datetime` : the version of the data that existed as_of the requested point in time
allow_secondary : `bool` or `None`
Override the default behavior for allowing reads from secondary members of a cluster:
`None` : use the settings from the top-level `Arctic` object used to query this version store.
`True` : allow reads from secondary members
`False` : only allow reads from primary members
"""
try:
self._read_metadata(symbol, as_of=as_of)
self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary))
return True
except NoDataFoundException:
return False
Expand Down Expand Up @@ -287,14 +303,18 @@ def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kw
`int` : specific version number
`str` : snapshot name which contains the version
`datetime.datetime` : the version of the data that existed as_of the requested point in time
allow_secondary : `bool` or `None`
Override the default behavior for allowing reads from secondary members of a cluster:
`None` : use the settings from the top-level `Arctic` object used to query this version store.
`True` : allow reads from secondary members
`False` : only allow reads from primary members
Returns
-------
VersionedItem namedtuple which contains a .data and .metadata element
"""
allow_secondary = self._allow_secondary if allow_secondary is None else allow_secondary
try:
read_preference = ReadPreference.NEAREST if allow_secondary else ReadPreference.PRIMARY
read_preference = self._read_preference(allow_secondary)
_version = self._read_metadata(symbol, as_of=as_of, read_preference=read_preference)
return self._do_read(symbol, _version, from_version, read_preference=read_preference, **kwargs)
except (OperationFailure, AutoReconnect) as e:
Expand Down Expand Up @@ -347,7 +367,7 @@ def _do_read(self, symbol, version, from_version=None, **kwargs):
_do_read_retry = mongo_retry(_do_read)

@mongo_retry
def read_metadata(self, symbol, as_of=None):
def read_metadata(self, symbol, as_of=None, allow_secondary=None):
"""
Return the metadata saved for a symbol. This method is fast as it doesn't
actually load the data.
Expand All @@ -361,8 +381,13 @@ def read_metadata(self, symbol, as_of=None):
`int` : specific version number
`str` : snapshot name which contains the version
`datetime.datetime` : the version of the data that existed as_of the requested point in time
allow_secondary : `bool` or `None`
Override the default behavior for allowing reads from secondary members of a cluster:
`None` : use the settings from the top-level `Arctic` object used to query this version store.
`True` : allow reads from secondary members
`False` : only allow reads from primary members
"""
_version = self._read_metadata(symbol, as_of=as_of)
_version = self._read_metadata(symbol, as_of=as_of, read_preference=self._read_preference(allow_secondary))
return VersionedItem(symbol=symbol, library=self._arctic_lib.get_name(), version=_version['version'],
metadata=_version.pop('metadata', None), data=None)

Expand Down Expand Up @@ -642,7 +667,7 @@ def delete(self, symbol):
'metadata.deleted': {'$ne': True}})
if not snapped_version:
self._delete_version(symbol, sentinel.version)
assert not self.has_symbol(symbol)
assert not self.has_symbol(symbol, allow_secondary=False)

def _write_audit(self, user, message, changed_version):
"""
Expand Down
41 changes: 31 additions & 10 deletions tests/unit/store/test_version_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,52 @@ def test_list_versions_localTime():
'snapshots': 'snap'}


def test__read_preference__allow_secondary_true():
self = create_autospec(VersionStore)
assert VersionStore._read_preference(self, True) == ReadPreference.NEAREST


def test__read_preference__allow_secondary_false():
self = create_autospec(VersionStore)
assert VersionStore._read_preference(self, False) == ReadPreference.PRIMARY


def test__read_preference__default_true():
self = create_autospec(VersionStore, _allow_secondary=True)
assert VersionStore._read_preference(self, None) == ReadPreference.NEAREST


def test__read_preference__default_false():
self = create_autospec(VersionStore, _allow_secondary=False)
assert VersionStore._read_preference(self, None) == ReadPreference.PRIMARY


def test_get_version_allow_secondary_True():
vs = create_autospec(VersionStore, instance=True,
_versions=Mock())
vs._allow_secondary = True
vs._read_preference.return_value = sentinel.read_preference
vs._find_snapshots.return_value = 'snap'
vs._versions.find.return_value = [{'_id': bson.ObjectId.from_datetime(dt(2013, 4, 1, 9, 0)),
'symbol': 's', 'version': 10}]

VersionStore.read(vs, "symbol")
assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=ReadPreference.NEAREST)]
assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=ReadPreference.NEAREST)]
assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=sentinel.read_preference)]
assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=sentinel.read_preference)]


def test_get_version_allow_secondary_user_override_False():
"""Ensure user can override read preference when calling read"""
vs = create_autospec(VersionStore, instance=True,
_versions=Mock())
vs._allow_secondary = True
vs._read_preference.return_value = sentinel.read_preference
vs._find_snapshots.return_value = 'snap'
vs._versions.find.return_value = [{'_id': bson.ObjectId.from_datetime(dt(2013, 4, 1, 9, 0)),
'symbol': 's', 'version': 10}]

VersionStore.read(vs, "symbol", allow_secondary=False)
assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=ReadPreference.PRIMARY)]
assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=ReadPreference.PRIMARY)]
assert vs._read_metadata.call_args_list == [call('symbol', as_of=None, read_preference=sentinel.read_preference)]
assert vs._do_read.call_args_list == [call('symbol', vs._read_metadata.return_value, None, read_preference=sentinel.read_preference)]
vs._read_preference.assert_called_once_with(False)


def test_read_as_of_LondonTime():
Expand Down Expand Up @@ -176,17 +197,17 @@ def test_prune_previous_versions_0_timeout():


def test_read_handles_operation_failure():
self = create_autospec(VersionStore, _versions=Mock(), _arctic_lib=Mock(),
_allow_secondary=True)
self = create_autospec(VersionStore, _versions=Mock(), _arctic_lib=Mock())
self._read_preference.return_value = sentinel.read_preference
self._collection = create_autospec(Collection)
self._read_metadata.side_effect = [sentinel.meta1, sentinel.meta2]
self._read_metadata.__name__ = 'name'
self._do_read.__name__ = 'name' # feh: mongo_retry decorator cares about this
self._do_read.side_effect = [OperationFailure('error'), sentinel.read]
VersionStore.read(self, sentinel.symbol, sentinel.as_of, sentinel.from_version)
# Assert that, for the two read calls, the second uses the new metadata
# Assert that, for the two read calls, the second uses the new metadata and forces a read from primary
assert self._do_read.call_args_list == [call(sentinel.symbol, sentinel.meta1, sentinel.from_version,
read_preference=ReadPreference.NEAREST)]
read_preference=sentinel.read_preference)]
assert self._do_read_retry.call_args_list == [call(sentinel.symbol, sentinel.meta2, sentinel.from_version,
read_preference=ReadPreference.PRIMARY)]

Expand Down

0 comments on commit e7e2700

Please sign in to comment.