From 17a3d4de8faae07e6aa7b71db573943bf45e0207 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Fri, 9 Aug 2019 12:49:05 -0400 Subject: [PATCH] Implement ``use_datastore`` flag. --- src/google/cloud/ndb/_datastore_api.py | 93 ++++++++++------ src/google/cloud/ndb/_options.py | 4 - src/google/cloud/ndb/context.py | 145 +++++++++++++------------ tests/conftest.py | 5 +- tests/system/test_crud.py | 30 +++++ tests/unit/test__datastore_api.py | 96 +++++++++++++++- tests/unit/test__options.py | 4 +- tests/unit/test_context.py | 37 +++++-- 8 files changed, 299 insertions(+), 115 deletions(-) diff --git a/src/google/cloud/ndb/_datastore_api.py b/src/google/cloud/ndb/_datastore_api.py index 6ceab240..703f60a7 100644 --- a/src/google/cloud/ndb/_datastore_api.py +++ b/src/google/cloud/ndb/_datastore_api.py @@ -136,9 +136,19 @@ def lookup(key, options): either an entity protocol buffer or _NOT_FOUND. """ context = context_module.get_context() - use_global_cache = context._use_global_cache(key, options) + use_datastore = context._use_datastore(key, options) + in_transaction = bool(_get_transaction(options)) + if use_datastore and in_transaction: + use_global_cache = False + else: + use_global_cache = context._use_global_cache(key, options) + + if not (use_global_cache or use_datastore): + raise TypeError( + "use_global_cache and use_datastore can't both be False" + ) - entity_pb = None + entity_pb = _NOT_FOUND key_locked = False if use_global_cache: @@ -150,20 +160,21 @@ def lookup(key, options): entity_pb = entity_pb2.Entity() entity_pb.MergeFromString(result) - else: + elif use_datastore: yield _cache.global_lock(cache_key) yield _cache.global_watch(cache_key) - if entity_pb is None: + if entity_pb is _NOT_FOUND and use_datastore: batch = _batch.get_batch(_LookupBatch, options) entity_pb = yield batch.add(key) - if use_global_cache and not key_locked and entity_pb is not _NOT_FOUND: - expires = context._global_cache_timeout(key, options) - serialized = entity_pb.SerializeToString() - yield _cache.global_compare_and_swap( - cache_key, serialized, expires=expires - ) + # Do not cache misses + if use_global_cache and not key_locked and entity_pb is not _NOT_FOUND: + expires = context._global_cache_timeout(key, options) + serialized = entity_pb.SerializeToString() + yield _cache.global_compare_and_swap( + cache_key, serialized, expires=expires + ) return entity_pb @@ -368,27 +379,39 @@ def put(entity, options): """ context = context_module.get_context() use_global_cache = context._use_global_cache(entity.key, options) + use_datastore = context._use_datastore(entity.key, options) + if not (use_global_cache or use_datastore): + raise TypeError( + "use_global_cache and use_datastore can't both be False" + ) + + entity_pb = helpers.entity_to_protobuf(entity) cache_key = _cache.global_cache_key(entity.key) if use_global_cache and not entity.key.is_partial: - yield _cache.global_lock(cache_key) - - transaction = _get_transaction(options) - if transaction: - batch = _get_commit_batch(transaction, options) - else: - batch = _batch.get_batch(_NonTransactionalCommitBatch, options) + if use_datastore: + yield _cache.global_lock(cache_key) + else: + expires = context._global_cache_timeout(entity.key, options) + cache_value = entity_pb.SerializeToString() + yield _cache.global_set(cache_key, cache_value, expires=expires) + + if use_datastore: + transaction = _get_transaction(options) + if transaction: + batch = _get_commit_batch(transaction, options) + else: + batch = _batch.get_batch(_NonTransactionalCommitBatch, options) - entity_pb = helpers.entity_to_protobuf(entity) - key_pb = yield batch.put(entity_pb) - if key_pb: - key = helpers.key_from_protobuf(key_pb) - else: - key = None + key_pb = yield batch.put(entity_pb) + if key_pb: + key = helpers.key_from_protobuf(key_pb) + else: + key = None - if use_global_cache: - yield _cache.global_delete(cache_key) + if use_global_cache: + yield _cache.global_delete(cache_key) - return key + return key @tasklets.tasklet @@ -408,18 +431,22 @@ def delete(key, options): """ context = context_module.get_context() use_global_cache = context._use_global_cache(key, options) + use_datastore = context._use_datastore(key, options) if use_global_cache: cache_key = _cache.global_cache_key(key) - yield _cache.global_lock(cache_key) - transaction = _get_transaction(options) - if transaction: - batch = _get_commit_batch(transaction, options) - else: - batch = _batch.get_batch(_NonTransactionalCommitBatch, options) + if use_datastore: + if use_global_cache: + yield _cache.global_lock(cache_key) + + transaction = _get_transaction(options) + if transaction: + batch = _get_commit_batch(transaction, options) + else: + batch = _batch.get_batch(_NonTransactionalCommitBatch, options) - yield batch.delete(key) + yield batch.delete(key) if use_global_cache: yield _cache.global_delete(cache_key) diff --git a/src/google/cloud/ndb/_options.py b/src/google/cloud/ndb/_options.py index c12fc375..4ce358dc 100644 --- a/src/google/cloud/ndb/_options.py +++ b/src/google/cloud/ndb/_options.py @@ -32,7 +32,6 @@ class Options: "use_cache", "use_global_cache", "global_cache_timeout", - # Not yet implemented "use_datastore", # Might or might not implement "force_writes", @@ -155,9 +154,6 @@ def __init__(self, config=None, **kwargs): ) ) - if self.use_datastore is not None: - raise NotImplementedError - if self.max_memcache_items is not None: raise NotImplementedError diff --git a/src/google/cloud/ndb/context.py b/src/google/cloud/ndb/context.py index 0bb8a42c..8275066a 100644 --- a/src/google/cloud/ndb/context.py +++ b/src/google/cloud/ndb/context.py @@ -65,66 +65,71 @@ def get_context(): raise exceptions.ContextError() -def _default_cache_policy(key): - """The default cache policy. +def _default_policy(attr_name, value_type): + """Factory for producing default policies. - Defers to ``_use_cache`` on the Model class for the key's kind. + Born of the observation that all default policies are more less the + same—they defer to some attribute on the model class for the key's kind and + expects the value to be either of a particular type or a callable. - See: :meth:`~google.cloud.ndb.context.Context.set_cache_policy` + Returns: + Callable[[key], value_type]: A policy function suitable for use as a + default policy. """ - flag = None - if key is not None: - modelclass = model.Model._kind_map.get(key.kind()) - if modelclass is not None: - policy = getattr(modelclass, "_use_cache", None) - if policy is not None: - if isinstance(policy, bool): - flag = policy - else: - flag = policy(key) - return flag + def policy(key): + value = None + if key is not None: + kind = key.kind + if callable(kind): + kind = kind() + modelclass = model.Model._kind_map.get(kind) + if modelclass is not None: + policy = getattr(modelclass, attr_name, None) + if policy is not None: + if isinstance(policy, value_type): + value = policy + else: + value = policy(key) + return value -def _default_global_cache_policy(key): - """The default global cache policy. + return policy - Defers to ``_use_global_cache`` on the Model class for the key's kind. - See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy` - """ - flag = None - if key is not None: - modelclass = model.Model._kind_map.get(key.kind) - if modelclass is not None: - policy = getattr(modelclass, "_use_global_cache", None) - if policy is not None: - if isinstance(policy, bool): - flag = policy - else: - flag = policy(key) - - return flag - - -def _default_global_cache_timeout_policy(key): - """The default global cache timeout policy. - - Defers to ``_global_cache_timeout`` on the Model class for the key's kind. - See: - :meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy` - """ - timeout = None - if key is not None: - modelclass = model.Model._kind_map.get(key.kind) - if modelclass is not None: - policy = getattr(modelclass, "_global_cache_timeout", None) - if policy is not None: - if isinstance(policy, int): - timeout = policy - else: - timeout = policy(key) - return timeout +_default_cache_policy = _default_policy("_use_cache", bool) +"""The default cache policy. + +Defers to ``_use_cache`` on the Model class for the key's kind. + +See: :meth:`~google.cloud.ndb.context.Context.set_cache_policy` +""" + +_default_global_cache_policy = _default_policy("_use_global_cache", bool) +"""The default global cache policy. + +Defers to ``_use_global_cache`` on the Model class for the key's kind. + +See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy` +""" + +_default_global_cache_timeout_policy = _default_policy( + "_global_cache_timeout", int +) +"""The default global cache timeout policy. + +Defers to ``_global_cache_timeout`` on the Model class for the key's kind. + +See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy` +""" + +_default_datastore_policy = _default_policy("_use_datastore", bool) +"""The default datastore policy. + +Defers to ``_use_datastore`` on the Model class for the key's kind. + +See: :meth:`~google.cloud.ndb.context.Context.set_datastore_policy` +""" _ContextTuple = collections.namedtuple( @@ -172,6 +177,7 @@ def __new__( global_cache=None, global_cache_policy=None, global_cache_timeout_policy=None, + datastore_policy=None, ): if eventloop is None: eventloop = _eventloop.EventLoop() @@ -206,6 +212,7 @@ def __new__( context.set_cache_policy(cache_policy) context.set_global_cache_policy(global_cache_policy) context.set_global_cache_timeout_policy(global_cache_timeout_policy) + context.set_datastore_policy(datastore_policy) return context @@ -283,6 +290,15 @@ def _global_cache_timeout(self, key, options): timeout = self.global_cache_timeout_policy(key) return timeout + def _use_datastore(self, key, options=None): + """Return whether to use the Datastore for this key.""" + flag = options.use_datastore if options else None + if flag is None: + flag = self.datastore_policy(key) + if flag is None: + flag = True + return flag + class Context(_Context): """User management of cache and other policy.""" @@ -376,7 +392,16 @@ def set_datastore_policy(self, policy): positional argument and returns a ``bool`` indicating if it should use the datastore. May be :data:`None`. """ - raise NotImplementedError + if policy is None: + policy = _default_datastore_policy + + elif isinstance(policy, bool): + flag = policy + + def policy(key): + return flag + + self.datastore_policy = policy def set_global_cache_policy(self, policy): """Set the memcache policy function. @@ -454,20 +479,6 @@ def in_transaction(self): """ return self.transaction is not None - @staticmethod - def default_datastore_policy(key): - """Default cache policy. - - This defers to ``Model._use_datastore``. - - Args: - key (google.cloud.ndb.key.Key): The key. - - Returns: - Union[bool, None]: Whether to use datastore. - """ - raise NotImplementedError - def memcache_add(self, *args, **kwargs): """Direct pass-through to memcache client.""" raise exceptions.NoLongerImplementedError() diff --git a/tests/conftest.py b/tests/conftest.py index f4f9a5b1..5d87b441 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -86,7 +86,10 @@ def context(): project="testing", namespace=None, spec=("project", "namespace") ) context = context_module.Context( - client, stub=mock.Mock(spec=()), eventloop=TestingEventLoop() + client, + stub=mock.Mock(spec=()), + eventloop=TestingEventLoop(), + datastore_policy=True, ) return context diff --git a/tests/system/test_crud.py b/tests/system/test_crud.py index cf1fe766..7d24e5db 100644 --- a/tests/system/test_crud.py +++ b/tests/system/test_crud.py @@ -798,3 +798,33 @@ class SomeKind(ndb.Model): with pytest.raises(ndb.exceptions.BadValueError): entity.put() + + +@mock.patch( + "google.cloud.ndb._datastore_api.make_call", + mock.Mock(side_effect=Exception("Datastore shouldn't get called.")), +) +def test_crud_without_datastore(ds_entity, client_context): + entity_id = test_utils.system.unique_resource_id() + + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + bar = ndb.StringProperty() + baz = ndb.StringProperty() + + global_cache = global_cache_module._InProcessGlobalCache() + with client_context.new(global_cache=global_cache).use() as context: + context.set_global_cache_policy(None) # Use default + context.set_datastore_policy(False) # Don't use Datastore + + key = ndb.Key(KIND, entity_id) + SomeKind(foo=42, bar="none", baz="night", _key=key).put() + + entity = key.get() + assert isinstance(entity, SomeKind) + assert entity.foo == 42 + assert entity.bar == "none" + assert entity.baz == "night" + + key.delete() + assert key.get() is None diff --git a/tests/unit/test__datastore_api.py b/tests/unit/test__datastore_api.py index ee077d93..c5a56b0b 100644 --- a/tests/unit/test__datastore_api.py +++ b/tests/unit/test__datastore_api.py @@ -150,7 +150,7 @@ def test_explicit_timeout(stub, _retry): def _mock_key(key_str): - key = mock.Mock(spec=("to_protobuf",)) + key = mock.Mock(kind="SomeKind", spec=("to_protobuf", "kind")) key.to_protobuf.return_value = protobuf = mock.Mock( spec=("SerializeToString",) ) @@ -195,6 +195,30 @@ def test_it_with_options(context): add_idle = context.eventloop.add_idle assert add_idle.call_count == 2 + @staticmethod + def test_it_with_transaction(context): + eventloop = mock.Mock(spec=("add_idle", "run")) + new_context = context.new(eventloop=eventloop, transaction=b"tx123") + with new_context.use(): + new_context._use_global_cache = mock.Mock( + side_effect=Exception("Shouldn't call _use_global_cache") + ) + _api.lookup(_mock_key("foo"), _options.ReadOptions()) + _api.lookup(_mock_key("foo"), _options.ReadOptions()) + _api.lookup(_mock_key("bar"), _options.ReadOptions()) + + batch = new_context.batches[_api._LookupBatch][()] + assert len(batch.todo["foo"]) == 2 + assert len(batch.todo["bar"]) == 1 + assert new_context.eventloop.add_idle.call_count == 1 + + @staticmethod + def test_it_no_global_cache_or_datastore(in_context): + with pytest.raises(TypeError): + _api.lookup( + _mock_key("foo"), _options.ReadOptions(use_datastore=False) + ).result() + class Test_lookup_WithGlobalCache: @staticmethod @@ -218,6 +242,25 @@ class SomeKind(model.Model): assert global_cache.get([cache_key]) == [cache_value] + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") + def test_cache_miss_no_datastore(_LookupBatch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + batch = _LookupBatch.return_value + batch.add.side_effect = Exception("Shouldn't use Datastore") + + future = _api.lookup( + key._key, _options.ReadOptions(use_datastore=False) + ) + assert future.result() is _api._NOT_FOUND + + assert global_cache.get([cache_key]) == [None] + @staticmethod @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") def test_cache_hit(_LookupBatch, global_cache): @@ -595,6 +638,19 @@ def MockEntity(*path): Mutation(upsert=helpers.entity_to_protobuf(entity3)), ] + @staticmethod + @pytest.mark.usefixtures("in_context") + def test_no_datastore_or_global_cache(): + def MockEntity(*path): + key = ds_key_module.Key(*path, project="testing") + return entity.Entity(key=key) + + mock_entity = MockEntity("what", "ever") + with pytest.raises(TypeError): + _api.put( + mock_entity, _options.Options(use_datastore=False) + ).result() + class Test_put_WithGlobalCache: @staticmethod @@ -638,6 +694,29 @@ class SomeKind(model.Model): assert global_cache.get([cache_key]) == [None] + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_no_datastore(Batch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + cache_value = model._entity_to_protobuf(entity).SerializeToString() + + batch = Batch.return_value + batch.put.return_value = future_result(None) + + future = _api.put( + model._entity_to_ds_entity(entity), + _options.Options(use_datastore=False), + ) + assert future.result() is None + + assert global_cache.get([cache_key]) == [cache_value] + class Test_delete: @staticmethod @@ -712,6 +791,21 @@ def test_cache_enabled(Batch, global_cache): assert global_cache.get([cache_key]) == [None] + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_without_datastore(Batch, global_cache): + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + global_cache.set({cache_key: b"foo"}) + + batch = Batch.return_value + batch.delete.side_effect = Exception("Shouldn't use Datastore") + + future = _api.delete(key._key, _options.Options(use_datastore=False)) + assert future.result() is None + + assert global_cache.get([cache_key]) == [None] + @staticmethod @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") def test_cache_disabled(Batch, global_cache): diff --git a/tests/unit/test__options.py b/tests/unit/test__options.py index 36c676fb..e302faa8 100644 --- a/tests/unit/test__options.py +++ b/tests/unit/test__options.py @@ -55,8 +55,8 @@ def test_constructor_w_use_memcache_and_global_cache(): @staticmethod def test_constructor_w_use_datastore(): - with pytest.raises(NotImplementedError): - MyOptions(use_datastore=20) + options = MyOptions(use_datastore=False) + assert options.use_datastore is False @staticmethod def test_constructor_w_use_cache(): diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index 7c9a7ee1..235b3ac3 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -141,6 +141,26 @@ def test_get_datastore_policy(self): with pytest.raises(NotImplementedError): context.get_datastore_policy() + def test__use_datastore_default_policy(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = key_module.Key("SomeKind", 1) + options = _options.Options() + assert context._use_datastore(key, options) is True + + def test__use_datastore_from_options(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = key_module.Key("SomeKind", 1) + options = _options.Options(use_datastore=False) + assert context._use_datastore(key, options) is False + def test_get_memcache_policy(self): context = self._make_one() context.get_memcache_policy() @@ -211,8 +231,16 @@ class SomeKind(model.Model): def test_set_datastore_policy(self): context = self._make_one() - with pytest.raises(NotImplementedError): - context.set_datastore_policy(None) + context.set_datastore_policy(None) + assert ( + context.datastore_policy + is context_module._default_datastore_policy + ) + + def test_set_datastore_policy_as_bool(self): + context = self._make_one() + context.set_datastore_policy(False) + context.datastore_policy(None) is False def test_set_memcache_policy(self): context = self._make_one() @@ -308,11 +336,6 @@ def test_in_transaction(self): context = self._make_one() assert context.in_transaction() is False - def test_default_datastore_policy(self): - context = self._make_one() - with pytest.raises(NotImplementedError): - context.default_datastore_policy(None) - def test_memcache_add(self): context = self._make_one() with pytest.raises(NotImplementedError):