diff --git a/docs/global_cache.rst b/docs/global_cache.rst
new file mode 100644
index 00000000..80c384d6
--- /dev/null
+++ b/docs/global_cache.rst
@@ -0,0 +1,7 @@
+###########
+GlobalCache
+###########
+
+.. automodule:: google.cloud.ndb.global_cache
+    :members:
+    :show-inheritance:
diff --git a/docs/index.rst b/docs/index.rst
index 7bec793e..ff5ec5fc 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -8,6 +8,7 @@
     client
     context
+    global_cache
     key
     model
     query
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 0efb69b5..8c3b400d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -65,6 +65,7 @@
 protobuf
 proxied
 QueryOptions
 reimplemented
+Redis
 RequestHandler
 runtime
 schemas
diff --git a/src/google/cloud/ndb/__init__.py b/src/google/cloud/ndb/__init__.py
index 04390df5..4c839eb8 100644
--- a/src/google/cloud/ndb/__init__.py
+++ b/src/google/cloud/ndb/__init__.py
@@ -52,6 +52,7 @@
     "get_indexes_async",
     "get_multi",
     "get_multi_async",
+    "GlobalCache",
     "in_transaction",
     "Index",
     "IndexProperty",
@@ -135,6 +136,7 @@
 from google.cloud.ndb._datastore_api import STRONG
 from google.cloud.ndb._datastore_query import Cursor
 from google.cloud.ndb._datastore_query import QueryIterator
+from google.cloud.ndb.global_cache import GlobalCache
 from google.cloud.ndb.key import Key
 from google.cloud.ndb.model import BlobKey
 from google.cloud.ndb.model import BlobKeyProperty
diff --git a/src/google/cloud/ndb/_batch.py b/src/google/cloud/ndb/_batch.py
new file mode 100644
index 00000000..b0dacbe5
--- /dev/null
+++ b/src/google/cloud/ndb/_batch.py
@@ -0,0 +1,66 @@
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Support for batching operations."""
+
+from google.cloud.ndb import context as context_module
+from google.cloud.ndb import _eventloop
+
+
+def get_batch(batch_cls, options=None):
+    """Gets a data structure for storing batched calls to Datastore or the
+    global cache.
+
+    The batch data structure is stored in the current context. If there is
+    not already a batch started, a new structure is created and an idle
+    callback is added to the current event loop which will eventually
+    perform the batch lookup.
+
+    Args:
+        batch_cls (type): Class representing the kind of operation being
+            batched.
+        options (_options.ReadOptions): The options for the request. Calls
+            with different options will be placed in different batches.
+
+    Returns:
+        batch_cls: An instance of the batch class.
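+
+    Example (illustrative sketch; ``_LookupBatch`` is one such batch class,
+    as used by ``_datastore_api.lookup``)::
+
+        batch = get_batch(_LookupBatch, options)
+        future = batch.add(key)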
+ """ + context = context_module.get_context() + batches = context.batches.get(batch_cls) + if batches is None: + context.batches[batch_cls] = batches = {} + + if options is not None: + options_key = tuple( + sorted( + ( + (key, value) + for key, value in options.items() + if value is not None + ) + ) + ) + else: + options_key = () + + batch = batches.get(options_key) + if batch is not None: + return batch + + def idle(): + batch = batches.pop(options_key) + batch.idle_callback() + + batches[options_key] = batch = batch_cls(options) + _eventloop.add_idle(idle) + return batch diff --git a/src/google/cloud/ndb/_cache.py b/src/google/cloud/ndb/_cache.py new file mode 100644 index 00000000..10e42c1b --- /dev/null +++ b/src/google/cloud/ndb/_cache.py @@ -0,0 +1,367 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import itertools + +from google.cloud.ndb import _batch +from google.cloud.ndb import context as context_module +from google.cloud.ndb import tasklets + +_LOCKED = b"0" +_LOCK_TIME = 32 +_PREFIX = b"NDB30" + + +class ContextCache(collections.UserDict): + """A per-context in-memory entity cache. + + This cache verifies the fetched entity has the correct key before + returning a result, in order to handle cases where the entity's key was + modified but the cache's key was not updated. + """ + + def get_and_validate(self, key): + """Verify that the entity's key has not changed since it was added + to the cache. If it has changed, consider this a cache miss. + See issue 13. http://goo.gl/jxjOP""" + entity = self.data[key] # May be None, meaning "doesn't exist". + if entity is None or entity._key == key: + return entity + else: + del self.data[key] + raise KeyError(key) + + +def _future_result(result): + """Returns a completed Future with the given result. + + For conforming to the asynchronous interface even if we've gotten the + result synchronously. + """ + future = tasklets.Future() + future.set_result(result) + return future + + +class _GlobalCacheBatch: + """Abstract base for classes used to batch operations for the global cache. + """ + + def idle_callback(self): + """Call the cache operation. + + Also, schedule a callback for the completed operation. + """ + cache_call = self.make_call() + if not isinstance(cache_call, tasklets.Future): + cache_call = _future_result(cache_call) + cache_call.add_done_callback(self.done_callback) + + def done_callback(self, cache_call): + """Process results of call to global cache. + + If there is an exception for the cache call, distribute that to waiting + futures, otherwise set the result for all waiting futures to ``None``. + """ + exception = cache_call.exception() + if exception: + for future in self.futures: + future.set_exception(exception) + + else: + for future in self.futures: + future.set_result(None) + + def make_call(self): + """Make the actual call to the global cache. To be overridden.""" + raise NotImplementedError + + def future_info(self, key): + """Generate info string for Future. 
To be overridden."""
+        raise NotImplementedError
+
+
+def global_get(key):
+    """Get entity from global cache.
+
+    Args:
+        key (bytes): The key to get.
+
+    Returns:
+        tasklets.Future: Eventual result will be the entity (``bytes``) or
+            ``None``.
+    """
+    batch = _batch.get_batch(_GlobalCacheGetBatch)
+    return batch.add(key)
+
+
+class _GlobalCacheGetBatch(_GlobalCacheBatch):
+    """Batch for global cache get requests.
+
+    Attributes:
+        todo (Dict[bytes, List[Future]]): Mapping of keys to futures that
+            are waiting on them.
+
+    Arguments:
+        ignore_options (Any): Ignored.
+    """
+
+    def __init__(self, ignore_options):
+        self.todo = {}
+        self.keys = []
+
+    def add(self, key):
+        """Add a key to get from the cache.
+
+        Arguments:
+            key (bytes): The key to get from the cache.
+
+        Returns:
+            tasklets.Future: Eventual result will be the entity retrieved
+                from the cache (``bytes``) or ``None``.
+        """
+        future = tasklets.Future(info=self.future_info(key))
+        futures = self.todo.get(key)
+        if futures is None:
+            self.todo[key] = futures = []
+            self.keys.append(key)
+        futures.append(future)
+        return future
+
+    def done_callback(self, cache_call):
+        """Process results of call to global cache.
+
+        If there is an exception for the cache call, distribute that to
+        waiting futures, otherwise distribute cache hits or misses to their
+        respective waiting futures.
+        """
+        exception = cache_call.exception()
+        if exception:
+            for future in itertools.chain(*self.todo.values()):
+                future.set_exception(exception)
+
+            return
+
+        results = cache_call.result()
+        for key, result in zip(self.keys, results):
+            futures = self.todo[key]
+            for future in futures:
+                future.set_result(result)
+
+    def make_call(self):
+        """Call :meth:`GlobalCache.get`."""
+        cache = context_module.get_context().global_cache
+        return cache.get(self.todo.keys())
+
+    def future_info(self, key):
+        """Generate info string for Future."""
+        return "GlobalCache.get({})".format(key)
+
+
+def global_set(key, value, expires=None):
+    """Store entity in the global cache.
+
+    Args:
+        key (bytes): The key to save.
+        value (bytes): The entity to save.
+        expires (Optional[float]): Number of seconds until value expires.
+
+    Returns:
+        tasklets.Future: Eventual result will be ``None``.
+    """
+    options = {}
+    if expires:
+        options = {"expires": expires}
+
+    batch = _batch.get_batch(_GlobalCacheSetBatch, options)
+    return batch.add(key, value)
+
+
+class _GlobalCacheSetBatch(_GlobalCacheBatch):
+    """Batch for global cache set requests."""
+
+    def __init__(self, options):
+        self.expires = options.get("expires")
+        self.todo = {}
+        self.futures = []
+
+    def add(self, key, value):
+        """Add a key, value pair to store in the cache.
+
+        Arguments:
+            key (bytes): The key to store in the cache.
+            value (bytes): The value to store in the cache.
+
+        Returns:
+            tasklets.Future: Eventual result will be ``None``.
+        """
+        future = tasklets.Future(info=self.future_info(key, value))
+        self.todo[key] = value
+        self.futures.append(future)
+        return future
+
+    def make_call(self):
+        """Call :meth:`GlobalCache.set`."""
+        cache = context_module.get_context().global_cache
+        return cache.set(self.todo, expires=self.expires)
+
+    def future_info(self, key, value):
+        """Generate info string for Future."""
+        return "GlobalCache.set({}, {})".format(key, value)
+
+
+def global_delete(key):
+    """Delete an entity from the global cache.
+
+    Args:
+        key (bytes): The key to delete.
+
+    Returns:
+        tasklets.Future: Eventual result will be ``None``.
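+
+    Example (illustrative; this mirrors how the write path in
+    ``_datastore_api`` evicts a cached entity)::
+
+        cache_key = global_cache_key(key)
+        yield global_delete(cache_key)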
+    """
+    batch = _batch.get_batch(_GlobalCacheDeleteBatch)
+    return batch.add(key)
+
+
+class _GlobalCacheDeleteBatch(_GlobalCacheBatch):
+    """Batch for global cache delete requests."""
+
+    def __init__(self, ignore_options):
+        self.keys = []
+        self.futures = []
+
+    def add(self, key):
+        """Add a key to delete from the cache.
+
+        Arguments:
+            key (bytes): The key to delete.
+
+        Returns:
+            tasklets.Future: Eventual result will be ``None``.
+        """
+        future = tasklets.Future(info=self.future_info(key))
+        self.keys.append(key)
+        self.futures.append(future)
+        return future
+
+    def make_call(self):
+        """Call :meth:`GlobalCache.delete`."""
+        cache = context_module.get_context().global_cache
+        return cache.delete(self.keys)
+
+    def future_info(self, key):
+        """Generate info string for Future."""
+        return "GlobalCache.delete({})".format(key)
+
+
+def global_watch(key):
+    """Start optimistic transaction with global cache.
+
+    A future call to :func:`global_compare_and_swap` will only set the value
+    if the value hasn't changed in the cache since the call to this
+    function.
+
+    Args:
+        key (bytes): The key to watch.
+
+    Returns:
+        tasklets.Future: Eventual result will be ``None``.
+    """
+    batch = _batch.get_batch(_GlobalCacheWatchBatch)
+    return batch.add(key)
+
+
+class _GlobalCacheWatchBatch(_GlobalCacheDeleteBatch):
+    """Batch for global cache watch requests."""
+
+    def __init__(self, ignore_options):
+        self.keys = []
+        self.futures = []
+
+    def make_call(self):
+        """Call :meth:`GlobalCache.watch`."""
+        cache = context_module.get_context().global_cache
+        return cache.watch(self.keys)
+
+    def future_info(self, key):
+        """Generate info string for Future."""
+        return "GlobalCache.watch({})".format(key)
+
+
+def global_compare_and_swap(key, value, expires=None):
+    """Like :func:`global_set` but using an optimistic transaction.
+
+    Value will only be set for the given key if the value in the cache
+    hasn't changed since a preceding call to :func:`global_watch`.
+
+    Args:
+        key (bytes): The key to save.
+        value (bytes): The entity to save.
+        expires (Optional[float]): Number of seconds until value expires.
+
+    Returns:
+        tasklets.Future: Eventual result will be ``None``.
+    """
+    options = {}
+    if expires:
+        options["expires"] = expires
+
+    batch = _batch.get_batch(_GlobalCacheCompareAndSwapBatch, options)
+    return batch.add(key, value)
+
+
+class _GlobalCacheCompareAndSwapBatch(_GlobalCacheSetBatch):
+    """Batch for global cache compare and swap requests."""
+
+    def make_call(self):
+        """Call :meth:`GlobalCache.compare_and_swap`."""
+        cache = context_module.get_context().global_cache
+        return cache.compare_and_swap(self.todo, expires=self.expires)
+
+    def future_info(self, key, value):
+        """Generate info string for Future."""
+        return "GlobalCache.compare_and_swap({}, {})".format(key, value)
+
+
+def global_lock(key):
+    """Lock a key by setting a special value.
+
+    Args:
+        key (bytes): The key to lock.
+
+    Returns:
+        tasklets.Future: Eventual result will be ``None``.
+    """
+    return global_set(key, _LOCKED, expires=_LOCK_TIME)
+
+
+def is_locked_value(value):
+    """Check if the given value is the special reserved value for key lock.
+
+    Args:
+        value (bytes): The value to check.
+
+    Returns:
+        bool: Whether the value is the special reserved value for key lock.
+    """
+    return value == _LOCKED
+
+
+def global_cache_key(key):
+    """Convert a Datastore key to ``bytes`` to use as a global cache key.
+
+    Args:
+        key (datastore.Key): The Datastore key.
+
+    Returns:
+        bytes: The cache key.
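+
+    Example (illustrative; the serialized bytes vary with the key)::
+
+        cache_key = global_cache_key(key)
+        assert cache_key.startswith(_PREFIX)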
+ """ + return _PREFIX + key.to_protobuf().SerializeToString() diff --git a/src/google/cloud/ndb/_datastore_api.py b/src/google/cloud/ndb/_datastore_api.py index c90d786a..6ceab240 100644 --- a/src/google/cloud/ndb/_datastore_api.py +++ b/src/google/cloud/ndb/_datastore_api.py @@ -21,11 +21,14 @@ from google.cloud import _helpers from google.cloud import _http +from google.cloud.datastore import helpers from google.cloud.datastore_v1.proto import datastore_pb2 from google.cloud.datastore_v1.proto import datastore_pb2_grpc from google.cloud.datastore_v1.proto import entity_pb2 from google.cloud.ndb import context as context_module +from google.cloud.ndb import _batch +from google.cloud.ndb import _cache from google.cloud.ndb import _eventloop from google.cloud.ndb import _options from google.cloud.ndb import _remote @@ -116,12 +119,12 @@ def rpc_call(): return rpc_call() +@tasklets.tasklet def lookup(key, options): """Look up a Datastore entity. - Gets an entity from Datastore, asynchronously. Actually adds the request to - a batch and fires off a Datastore Lookup call as soon as some code asks for - the result of one of the batched requests. + Gets an entity from Datastore, asynchronously. Checks the global cache, + first, if appropriate. Uses batching. Args: key (~datastore.Key): The key for the entity to retrieve. @@ -132,52 +135,37 @@ def lookup(key, options): :class:`~tasklets.Future`: If not an exception, future's result will be either an entity protocol buffer or _NOT_FOUND. """ - batch = _get_batch(_LookupBatch, options) - return batch.add(key) - - -def _get_batch(batch_cls, options): - """Gets a data structure for storing batched calls to Datastore Lookup. + context = context_module.get_context() + use_global_cache = context._use_global_cache(key, options) - The batch data structure is stored in the current context. If there is - not already a batch started, a new structure is created and an idle - callback is added to the current event loop which will eventually perform - the batch look up. + entity_pb = None + key_locked = False - Args: - batch_cls (type): Class representing the kind of operation being - batched. - options (_options.ReadOptions): The options for the request. Calls with - different options will be placed in different batches. + if use_global_cache: + cache_key = _cache.global_cache_key(key) + result = yield _cache.global_get(cache_key) + key_locked = _cache.is_locked_value(result) + if not key_locked: + if result is not None: + entity_pb = entity_pb2.Entity() + entity_pb.MergeFromString(result) - Returns: - batch_cls: An instance of the batch class. 
- """ - context = context_module.get_context() - batches = context.batches.get(batch_cls) - if batches is None: - context.batches[batch_cls] = batches = {} - - options_key = tuple( - sorted( - ( - (key, value) - for key, value in options.items() - if value is not None - ) + else: + yield _cache.global_lock(cache_key) + yield _cache.global_watch(cache_key) + + if entity_pb is None: + batch = _batch.get_batch(_LookupBatch, options) + entity_pb = yield batch.add(key) + + if use_global_cache and not key_locked and entity_pb is not _NOT_FOUND: + expires = context._global_cache_timeout(key, options) + serialized = entity_pb.SerializeToString() + yield _cache.global_compare_and_swap( + cache_key, serialized, expires=expires ) - ) - batch = batches.get(options_key) - if batch is not None: - return batch - - def idle(): - batch = batches.pop(options_key) - batch.idle_callback() - batches[options_key] = batch = batch_cls(options) - _eventloop.add_idle(idle) - return batch + return entity_pb class _LookupBatch: @@ -256,7 +244,7 @@ def lookup_callback(self, rpc): # For all deferred keys, batch them up again with their original # futures if results.deferred: - next_batch = _get_batch(type(self), self.options) + next_batch = _batch.get_batch(type(self), self.options) for key in results.deferred: todo_key = key.SerializeToString() next_batch.todo.setdefault(todo_key, []).extend( @@ -363,29 +351,47 @@ def _get_transaction(options): return transaction -def put(entity_pb, options): +@tasklets.tasklet +def put(entity, options): """Store an entity in datastore. The entity can be a new entity to be saved for the first time or an existing entity that has been updated. Args: - entity_pb (datastore_v1.types.Entity): The entity to be stored. + entity_pb (datastore.Entity): The entity to be stored. options (_options.Options): Options for this request. Returns: tasklets.Future: Result will be completed datastore key - (entity_pb2.Key) for the entity. + (datastore.Key) for the entity. """ + context = context_module.get_context() + use_global_cache = context._use_global_cache(entity.key, options) + cache_key = _cache.global_cache_key(entity.key) + if use_global_cache and not entity.key.is_partial: + yield _cache.global_lock(cache_key) + transaction = _get_transaction(options) if transaction: batch = _get_commit_batch(transaction, options) else: - batch = _get_batch(_NonTransactionalCommitBatch, options) + batch = _batch.get_batch(_NonTransactionalCommitBatch, options) - return batch.put(entity_pb) + entity_pb = helpers.entity_to_protobuf(entity) + key_pb = yield batch.put(entity_pb) + if key_pb: + key = helpers.key_from_protobuf(key_pb) + else: + key = None + if use_global_cache: + yield _cache.global_delete(cache_key) + return key + + +@tasklets.tasklet def delete(key, options): """Delete an entity from Datastore. @@ -400,13 +406,23 @@ def delete(key, options): tasklets.Future: Will be finished when entity is deleted. Result will always be :data:`None`. 
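+
+    When a global cache is configured, this is roughly the order of
+    operations (an illustrative sketch of the body below)::
+
+        yield _cache.global_lock(cache_key)    # lock before the commit
+        yield batch.delete(key)                # the Datastore delete
+        yield _cache.global_delete(cache_key)  # evict after the commit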
""" + context = context_module.get_context() + use_global_cache = context._use_global_cache(key, options) + + if use_global_cache: + cache_key = _cache.global_cache_key(key) + yield _cache.global_lock(cache_key) + transaction = _get_transaction(options) if transaction: batch = _get_commit_batch(transaction, options) else: - batch = _get_batch(_NonTransactionalCommitBatch, options) + batch = _batch.get_batch(_NonTransactionalCommitBatch, options) - return batch.delete(key) + yield batch.delete(key) + + if use_global_cache: + yield _cache.global_delete(cache_key) class _NonTransactionalCommitBatch: @@ -747,8 +763,10 @@ def _complete(key_pb): A new key may be left incomplete so that the id can be allocated by the database. A key is considered incomplete if the last element of the path has neither a ``name`` or an ``id``. + Args: key_pb (entity_pb2.Key): The key to check. + Returns: boolean: :data:`True` if key is incomplete, otherwise :data:`False`. """ @@ -805,7 +823,7 @@ def allocate(keys, options): Returns: tasklets.Future: A future for the key completed with the allocated id. """ - batch = _get_batch(_AllocateIdsBatch, options) + batch = _batch.get_batch(_AllocateIdsBatch, options) return batch.add(keys) diff --git a/src/google/cloud/ndb/_options.py b/src/google/cloud/ndb/_options.py index 3ae496bb..c12fc375 100644 --- a/src/google/cloud/ndb/_options.py +++ b/src/google/cloud/ndb/_options.py @@ -30,13 +30,13 @@ class Options: "retries", "timeout", "use_cache", + "use_global_cache", + "global_cache_timeout", # Not yet implemented - "use_memcache", "use_datastore", - "memcache_timeout", - "max_memcache_items", # Might or might not implement "force_writes", + "max_memcache_items", # Deprecated "propagation", ) @@ -118,6 +118,25 @@ def __init__(self, config=None, **kwargs): raise TypeError("Can't specify both 'deadline' and 'timeout'") kwargs["timeout"] = deadline + memcache_timeout = kwargs.pop("memcache_timeout", None) + if memcache_timeout is not None: + global_cache_timeout = kwargs.get("global_cache_timeout") + if global_cache_timeout is not None: + raise TypeError( + "Can't specify both 'memcache_timeout' and " + "'global_cache_timeout'" + ) + kwargs["global_cache_timeout"] = memcache_timeout + + use_memcache = kwargs.pop("use_memcache", None) + if use_memcache is not None: + use_global_cache = kwargs.get("use_global_cache") + if use_global_cache is not None: + raise TypeError( + "Can't specify both 'use_memcache' and 'use_global_cache'" + ) + kwargs["use_global_cache"] = use_memcache + for key in self.slots(): default = getattr(config, key, None) if config else None setattr(self, key, kwargs.pop(key, default)) @@ -136,15 +155,9 @@ def __init__(self, config=None, **kwargs): ) ) - if self.use_memcache is not None: - raise NotImplementedError - if self.use_datastore is not None: raise NotImplementedError - if self.memcache_timeout is not None: - raise NotImplementedError - if self.max_memcache_items is not None: raise NotImplementedError diff --git a/src/google/cloud/ndb/_transaction.py b/src/google/cloud/ndb/_transaction.py index 2c6ed663..c14be894 100644 --- a/src/google/cloud/ndb/_transaction.py +++ b/src/google/cloud/ndb/_transaction.py @@ -99,7 +99,7 @@ def _transaction_async(context, callback, read_only=False): read_only, retries=0 ) - with context.new(transaction=transaction_id).use(): + with context.new(transaction=transaction_id).use() as tx_context: try: # Run the callback result = callback() @@ -114,6 +114,8 @@ def _transaction_async(context, callback, read_only=False): yield 
_datastore_api.rollback(transaction_id) raise + tx_context._clear_global_cache() + return result @@ -154,7 +156,7 @@ def transactional_async( retries=_retry._DEFAULT_RETRIES, read_only=False, xg=True, propagation=None ): """A decorator to run a function in an async transaction. - + Usage example: @transactional_async(retries=1, read_only=False) diff --git a/src/google/cloud/ndb/client.py b/src/google/cloud/ndb/client.py index 0f6ce268..5b195a48 100644 --- a/src/google/cloud/ndb/client.py +++ b/src/google/cloud/ndb/client.py @@ -108,7 +108,13 @@ def __init__(self, project=None, namespace=None, credentials=None): ) @contextlib.contextmanager - def context(self, cache_policy=None): + def context( + self, + cache_policy=None, + global_cache=None, + global_cache_policy=None, + global_cache_timeout_policy=None, + ): """Establish a context for a set of NDB calls. This method provides a context manager which establishes the runtime @@ -142,8 +148,23 @@ def context(self, cache_policy=None): cache_policy (Optional[Callable[[key.Key], bool]]): The cache policy to use in this context. See: :meth:`~google.cloud.ndb.context.Context.set_cache_policy`. + global_cache (Optional[global_cache.GlobalCache]): + The global cache for this context. See: + :class:`~google.cloud.ndb.global_cache.GlobalCache`. + global_cache_policy (Optional[Callable[[key.Key], bool]]): The + global cache policy to use in this context. See: + :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy`. + global_cache_timeout_policy (Optional[Callable[[key.Key], int]]): + The global cache timeout to use in this context. See: + :meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy`. """ - context = context_module.Context(self, cache_policy=cache_policy) + context = context_module.Context( + self, + cache_policy=cache_policy, + global_cache=global_cache, + global_cache_policy=global_cache_policy, + global_cache_timeout_policy=global_cache_timeout_policy, + ) with context.use(): yield context diff --git a/src/google/cloud/ndb/context.py b/src/google/cloud/ndb/context.py index 740ed6b6..0e406b17 100644 --- a/src/google/cloud/ndb/context.py +++ b/src/google/cloud/ndb/context.py @@ -18,10 +18,12 @@ import contextlib import threading +from google.cloud.ndb import _cache from google.cloud.ndb import _datastore_api from google.cloud.ndb import _eventloop from google.cloud.ndb import exceptions from google.cloud.ndb import model +from google.cloud.ndb import tasklets __all__ = [ @@ -63,25 +65,6 @@ def get_context(): raise exceptions.ContextError() -class _Cache(collections.UserDict): - """An in-memory entity cache. - - This cache verifies the fetched entity has the correct key before - returning a result, in order to handle cases where the entity's key was - modified but the cache's key was not updated.""" - - def get_and_validate(self, key): - """Verify that the entity's key has not changed since it was added - to the cache. If it has changed, consider this a cache miss. - See issue 13. http://goo.gl/jxjOP""" - entity = self.data[key] # May be None, meaning "doesn't exist". - if entity is None or entity._key == key: - return entity - else: - del self.data[key] - raise KeyError(key) - - def _default_cache_policy(key): """The default cache policy. @@ -103,6 +86,47 @@ def _default_cache_policy(key): return flag +def _default_global_cache_policy(key): + """The default global cache policy. + + Defers to ``_use_global_cache`` on the Model class for the key's kind. 
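+
+    For example, a model class can opt out of the global cache with a class
+    attribute, the way ``metadata._BaseMetadata`` does elsewhere in this
+    change::
+
+        class SomeKind(model.Model):
+            _use_global_cache = False
+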
+ See: :meth:`~google.cloud.ndb.context.Context.set_global_cache_policy` + """ + flag = None + if key is not None: + modelclass = model.Model._kind_map.get(key.kind) + if modelclass is not None: + policy = getattr(modelclass, "_use_global_cache", None) + if policy is not None: + if isinstance(policy, bool): + flag = policy + else: + flag = policy(key) + + return flag + + +def _default_global_cache_timeout_policy(key): + """The default global cache timeout policy. + + Defers to ``_global_cache_timeout`` on the Model class for the key's kind. + See: + :meth:`~google.cloud.ndb.context.Context.set_global_cache_timeout_policy` + """ + timeout = None + if key is not None: + modelclass = model.Model._kind_map.get(key.kind) + if modelclass is not None: + policy = getattr(modelclass, "_global_cache_timeout", None) + if policy is not None: + if isinstance(policy, int): + timeout = policy + else: + timeout = policy(key) + + return timeout + + _ContextTuple = collections.namedtuple( "_ContextTuple", [ @@ -113,6 +137,7 @@ def _default_cache_policy(key): "commit_batches", "transaction", "cache", + "global_cache", ], ) @@ -144,6 +169,9 @@ def __new__( transaction=None, cache=None, cache_policy=None, + global_cache=None, + global_cache_policy=None, + global_cache_timeout_policy=None, ): if eventloop is None: eventloop = _eventloop.EventLoop() @@ -159,12 +187,9 @@ def __new__( # Create a cache and, if an existing cache was passed into this # method, duplicate its entries. + new_cache = _cache.ContextCache() if cache: - new_cache = _Cache() new_cache.update(cache) - cache = new_cache - else: - cache = _Cache() context = super(_Context, cls).__new__( cls, @@ -174,10 +199,13 @@ def __new__( batches=batches, commit_batches=commit_batches, transaction=transaction, - cache=cache, + cache=new_cache, + global_cache=global_cache, ) context.set_cache_policy(cache_policy) + context.set_global_cache_policy(global_cache_policy) + context.set_global_cache_timeout_policy(global_cache_timeout_policy) return context @@ -187,7 +215,8 @@ def new(self, **kwargs): New context will be the same as context except values from ``kwargs`` will be substituted. """ - state = {name: getattr(self, name) for name in self._fields} + fields = self._fields + tuple(self.__dict__.keys()) + state = {name: getattr(self, name) for name in fields} state.update(kwargs) return type(self)(**state) @@ -208,6 +237,52 @@ def use(self): prev_context.cache.update(self.cache) _state.context = prev_context + @tasklets.tasklet + def _clear_global_cache(self): + """Clears the global cache. + + Clears keys from the global cache that appear in the local context + cache. In this way, only keys that were touched in the current context + are affected. 
+        """
+        keys = [
+            _cache.global_cache_key(key._key)
+            for key in self.cache
+            if self._use_global_cache(key)
+        ]
+        if keys:
+            yield [_cache.global_delete(key) for key in keys]
+
+    def _use_cache(self, key, options):
+        """Return whether to use the context cache for this key."""
+        flag = options.use_cache
+        if flag is None:
+            flag = self.cache_policy(key)
+        if flag is None:
+            flag = True
+        return flag
+
+    def _use_global_cache(self, key, options=None):
+        """Return whether to use the global cache for this key."""
+        if self.global_cache is None:
+            return False
+
+        flag = options.use_global_cache if options else None
+        if flag is None:
+            flag = self.global_cache_policy(key)
+        if flag is None:
+            flag = True
+        return flag
+
+    def _global_cache_timeout(self, key, options):
+        """Return global cache timeout (expiration) for this key."""
+        timeout = None
+        if options:
+            timeout = options.global_cache_timeout
+        if timeout is None:
+            timeout = self.global_cache_timeout_policy(key)
+        return timeout
+
 
 class Context(_Context):
     """User management of cache and other policy."""
@@ -215,7 +290,7 @@
     def clear_cache(self):
         """Clears the in-memory cache.
 
-        This does not affect memcache.
+        This does not affect the global cache.
         """
         self.cache.clear()
@@ -245,7 +320,7 @@
         """
         raise NotImplementedError
 
-    def get_memcache_policy(self):
+    def get_global_cache_policy(self):
         """Return the current memcache policy function.
 
         Returns:
             Optional[Callable[[key.Key], bool]]: The policy function, if
                 set. The policy function takes a key as its single
                 positional argument and returns a ``bool`` indicating if it
                 should be cached. May be :data:`None`.
         """
-        raise NotImplementedError
+        return self.global_cache_policy
 
-    def get_memcache_timeout_policy(self):
+    get_memcache_policy = get_global_cache_policy  # backwards compatibility
+
+    def get_global_cache_timeout_policy(self):
         """Return the current policy function memcache timeout (expiration).
 
         Returns:
             Optional[Callable[[key.Key], int]]: The policy function, if
                 set. The policy function takes a key as its single
                 positional argument and returns the desired
                 timeout, in seconds, for the key. ``0`` implies the default
                 timeout. May be :data:`None`.
         """
-        raise NotImplementedError
+        return self.global_cache_timeout_policy
+
+    get_memcache_timeout_policy = get_global_cache_timeout_policy
 
     def set_cache_policy(self, policy):
         """Set the context cache policy function.
@@ -299,7 +378,7 @@
         """
         raise NotImplementedError
 
-    def set_memcache_policy(self, policy):
+    def set_global_cache_policy(self, policy):
         """Set the memcache policy function.
 
         Args:
             policy (Optional[Callable[[key.Key], bool]]): The policy
                 function. The policy function takes a key as its single
                 positional argument and returns a ``bool`` indicating if it
                 should be cached. May be :data:`None`.
         """
-        raise NotImplementedError
+        if policy is None:
+            policy = _default_global_cache_policy
+
+        elif isinstance(policy, bool):
+            flag = policy
+
+            def policy(key):
+                return flag
+
+        self.global_cache_policy = policy
 
-    def set_memcache_timeout_policy(self, policy):
+    set_memcache_policy = set_global_cache_policy  # backwards compatibility
+
+    def set_global_cache_timeout_policy(self, policy):
         """Set the policy function for memcache timeout (expiration).
 
         Args:
             policy (Optional[Callable[[key.Key], int]]): The policy
                 function. The policy function takes a key as its single
                 positional argument and returns the desired
                 timeout, in seconds, for the key. ``0`` implies the default
                 timeout. May be :data:`None`.
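+
+        For example, passing an ``int`` sets the same timeout for every
+        cached key (a convenience handled by this method)::
+
+            context.set_global_cache_timeout_policy(600)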
""" - raise NotImplementedError + if policy is None: + policy = _default_global_cache_timeout_policy + + elif isinstance(policy, int): + timeout = policy + + def policy(key): + return timeout + + self.global_cache_timeout_policy = policy + + set_memcache_timeout_policy = set_global_cache_timeout_policy def call_on_commit(self, callback): """Call a callback upon successful commit of a transaction. @@ -367,83 +468,45 @@ def default_datastore_policy(key): """ raise NotImplementedError - @staticmethod - def default_memcache_policy(key): - """Default memcache policy. - - This defers to ``Model._use_memcache``. - - Args: - key (google.cloud.ndb.key.Key): The key. - - Returns: - Union[bool, None]: Whether to cache the key. - """ - raise NotImplementedError - - @staticmethod - def default_memcache_timeout_policy(key): - """Default memcache timeout policy. - - This defers to ``Model._memcache_timeout``. - - Args: - key (google.cloud.ndb.key.Key): The key. - - Returns: - Union[int, None]: Memcache timeout to use. - """ - raise NotImplementedError - def memcache_add(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_cas(self, *args, **kwargs): """Direct pass-through to memcache client.""" - - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_decr(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_delete(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_get(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_gets(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_incr(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_replace(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def memcache_set(self, *args, **kwargs): """Direct pass-through to memcache client.""" - raise NotImplementedError + raise exceptions.NoLongerImplementedError() def urlfetch(self, *args, **kwargs): """Fetch a resource using HTTP.""" - raise NotImplementedError - - def _use_cache(self, key, options): - """Return whether to use the context cache for this key.""" - flag = options.use_cache - if flag is None: - flag = self.cache_policy(key) - if flag is None: - flag = True - return flag + raise exceptions.NoLongerImplementedError() class ContextOptions: diff --git a/src/google/cloud/ndb/global_cache.py b/src/google/cloud/ndb/global_cache.py new file mode 100644 index 00000000..987b35b8 --- /dev/null +++ b/src/google/cloud/ndb/global_cache.py @@ -0,0 +1,162 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""GlobalCache interface and its implementations."""
+
+import abc
+import time
+
+
+class GlobalCache(abc.ABC):
+    """Abstract base class for a global entity cache.
+
+    A global entity cache is shared across contexts, sessions, and possibly
+    even servers. A concrete implementation is available which uses Redis.
+
+    Essentially, this class models a simple key/value store where keys and
+    values are arbitrary ``bytes`` instances. "Compare and swap", aka
+    "optimistic transactions", should also be supported.
+
+    Concrete implementations can be either synchronous or asynchronous.
+    Asynchronous implementations should return
+    :class:`~google.cloud.ndb.tasklets.Future` instances whose eventual
+    results match the return value described for each method. Because
+    coordinating with the single threaded event model used by ``NDB`` can
+    be tricky with remote services, it's not recommended that casual users
+    write asynchronous implementations, as some specialized knowledge is
+    required.
+    """
+
+    @abc.abstractmethod
+    def get(self, keys):
+        """Retrieve entities from the cache.
+
+        Arguments:
+            keys (List[bytes]): The keys to get.
+
+        Returns:
+            List[Union[bytes, None]]: Serialized entities, or :data:`None`,
+                for each key.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def set(self, items, expires=None):
+        """Store entities in the cache.
+
+        Arguments:
+            items (Dict[bytes, Union[bytes, None]]): Mapping of keys to
+                serialized entities.
+            expires (Optional[float]): Number of seconds until value
+                expires.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def delete(self, keys):
+        """Remove entities from the cache.
+
+        Arguments:
+            keys (List[bytes]): The keys to remove.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def watch(self, keys):
+        """Begin an optimistic transaction for the given keys.
+
+        A future call to :meth:`compare_and_swap` will only set values for
+        keys whose values haven't changed since the call to this method.
+
+        Arguments:
+            keys (List[bytes]): The keys to watch.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def compare_and_swap(self, items, expires=None):
+        """Like :meth:`set` but using an optimistic transaction.
+
+        Only keys whose values haven't changed since a preceding call to
+        :meth:`watch` will be changed.
+
+        Arguments:
+            items (Dict[bytes, Union[bytes, None]]): Mapping of keys to
+                serialized entities.
+            expires (Optional[float]): Number of seconds until value
+                expires.
+        """
+        raise NotImplementedError
+
+
+class _InProcessGlobalCache(GlobalCache):
+    """Reference implementation of :class:`GlobalCache`.
+
+    Not intended for production use. Uses a single process wide dictionary
+    to keep an in memory cache. For use in testing and to have an easily
+    grokkable reference implementation. Thread safety is potentially a
+    little sketchy.
+    """
+
+    cache = {}
+    """Dict: The cache.
+
+    Relies on atomicity of ``__setitem__`` for thread safety.
See: + http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm + """ + + def __init__(self): + self._watch_keys = {} + + def get(self, keys): + """Implements :meth:`GlobalCache.get`.""" + now = time.time() + results = [self.cache.get(key) for key in keys] + entity_pbs = [] + for result in results: + if result is not None: + entity_pb, expires = result + if expires and expires < now: + entity_pb = None + else: + entity_pb = None + + entity_pbs.append(entity_pb) + + return entity_pbs + + def set(self, items, expires=None): + """Implements :meth:`GlobalCache.set`.""" + if expires: + expires = time.time() + expires + + for key, value in items.items(): + self.cache[key] = (value, expires) # Supposedly threadsafe + + def delete(self, keys): + """Implements :meth:`GlobalCache.delete`.""" + for key in keys: + self.cache.pop(key, None) # Threadsafe? + + def watch(self, keys): + """Implements :meth:`GlobalCache.watch`.""" + for key in keys: + self._watch_keys[key] = self.cache.get(key) + + def compare_and_swap(self, items, expires=None): + """Implements :meth:`GlobalCache.compare_and_swap`.""" + if expires: + expires = time.time() + expires + + for key, new_value in items.items(): + watch_value = self._watch_keys.get(key) + current_value = self.cache.get(key) + if watch_value == current_value: + self.cache[key] = (new_value, expires) diff --git a/src/google/cloud/ndb/metadata.py b/src/google/cloud/ndb/metadata.py index 7b5b1cb7..43bbafbc 100644 --- a/src/google/cloud/ndb/metadata.py +++ b/src/google/cloud/ndb/metadata.py @@ -62,7 +62,7 @@ class _BaseMetadata(model.Model): __slots__ = () _use_cache = False - _use_memcache = False + _use_global_cache = False KIND_NAME = "" diff --git a/src/google/cloud/ndb/model.py b/src/google/cloud/ndb/model.py index 25d8fd8f..e2ba3de2 100644 --- a/src/google/cloud/ndb/model.py +++ b/src/google/cloud/ndb/model.py @@ -4856,15 +4856,14 @@ def _put_async( @tasklets.tasklet def put(self): - entity_pb = _entity_to_protobuf(self) - key_pb = yield _datastore_api.put(entity_pb, _options) - if key_pb: - ds_key = helpers.key_from_protobuf(key_pb) + ds_entity = _entity_to_ds_entity(self) + ds_key = yield _datastore_api.put(ds_entity, _options) + if ds_key: self._key = key_module.Key._from_ds_key(ds_key) - context = context_module.get_context() - if context._use_cache(self._key, _options): - context.cache[self._key] = self + context = context_module.get_context() + if context._use_cache(self._key, _options): + context.cache[self._key] = self return self._key diff --git a/src/google/cloud/ndb/tasklets.py b/src/google/cloud/ndb/tasklets.py index ff5313ee..b6380c28 100644 --- a/src/google/cloud/ndb/tasklets.py +++ b/src/google/cloud/ndb/tasklets.py @@ -285,7 +285,9 @@ def _advance_tasklet(self, send_value=None, error=None): with self.context.use(): # Send the next value or exception into the generator if error: - self.generator.throw(type(error), error) + self.generator.throw( + type(error), error, error.__traceback__ + ) # send_value will be None if this is the first time yielded = self.generator.send(send_value) diff --git a/tests/conftest.py b/tests/conftest.py index 82c3e850..f4f9a5b1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,7 @@ from google.cloud import environment_vars from google.cloud.ndb import context as context_module from google.cloud.ndb import _eventloop +from google.cloud.ndb import global_cache as global_cache_module from google.cloud.ndb import model import pytest @@ -51,6 +52,7 @@ def reset_state(environ): yield 
model.Property._FIND_METHODS_CACHE.clear() model.Model._kind_map.clear() + global_cache_module._InProcessGlobalCache.cache.clear() @pytest.fixture @@ -95,3 +97,14 @@ def in_context(context): with context.use(): yield context assert not context_module._state.context + + +@pytest.fixture +def global_cache(context): + assert not context_module._state.context + + cache = global_cache_module._InProcessGlobalCache() + with context.new(global_cache=cache).use(): + yield cache + + assert not context_module._state.context diff --git a/tests/system/test_crud.py b/tests/system/test_crud.py index 9e524cda..0816b62e 100644 --- a/tests/system/test_crud.py +++ b/tests/system/test_crud.py @@ -20,11 +20,15 @@ import operator import threading +from unittest import mock + import pytest import test_utils.system from google.cloud import ndb +from google.cloud.ndb import _cache +from google.cloud.ndb import global_cache as global_cache_module from tests.system import KIND, eventually @@ -72,6 +76,40 @@ class SomeKind(ndb.Model): assert key.get() is entity +def test_retrieve_entity_with_global_cache(ds_entity, client_context): + entity_id = test_utils.system.unique_resource_id() + ds_entity(KIND, entity_id, foo=42, bar="none", baz=b"night") + + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + bar = ndb.StringProperty() + baz = ndb.StringProperty() + + global_cache = global_cache_module._InProcessGlobalCache() + cache_dict = global_cache_module._InProcessGlobalCache.cache + with client_context.new(global_cache=global_cache).use() as context: + context.set_global_cache_policy(None) # Use default + + key = ndb.Key(KIND, entity_id) + entity = key.get() + assert isinstance(entity, SomeKind) + assert entity.foo == 42 + assert entity.bar == "none" + assert entity.baz == "night" + + cache_key = _cache.global_cache_key(key._key) + assert cache_key in cache_dict + + patch = mock.patch("google.cloud.ndb._datastore_api._LookupBatch.add") + patch.side_effect = Exception("Shouldn't call this") + with patch: + entity = key.get() + assert isinstance(entity, SomeKind) + assert entity.foo == 42 + assert entity.bar == "none" + assert entity.baz == "night" + + @pytest.mark.usefixtures("client_context") def test_retrieve_entity_not_found(ds_entity): entity_id = test_utils.system.unique_resource_id() @@ -247,6 +285,37 @@ class SomeKind(ndb.Model): assert retrieved.bar == "none" +def test_insert_entity_with_global_cache(dispose_of, client_context): + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + bar = ndb.StringProperty() + + global_cache = global_cache_module._InProcessGlobalCache() + cache_dict = global_cache_module._InProcessGlobalCache.cache + with client_context.new(global_cache=global_cache).use() as context: + context.set_global_cache_policy(None) # Use default + + entity = SomeKind(foo=42, bar="none") + key = entity.put() + cache_key = _cache.global_cache_key(key._key) + assert not cache_dict + + retrieved = key.get() + assert retrieved.foo == 42 + assert retrieved.bar == "none" + + assert cache_key in cache_dict + + entity.foo = 43 + entity.put() + + # This is py27 behavior. I can see a case being made for caching the + # entity on write rather than waiting for a subsequent lookup. 
+ assert cache_key not in cache_dict + + dispose_of(key._key) + + @pytest.mark.usefixtures("client_context") def test_update_entity(ds_entity): entity_id = test_utils.system.unique_resource_id() @@ -359,6 +428,31 @@ class SomeKind(ndb.Model): assert key.delete() is None +def test_delete_entity_with_global_cache(ds_entity, client_context): + entity_id = test_utils.system.unique_resource_id() + ds_entity(KIND, entity_id, foo=42) + + class SomeKind(ndb.Model): + foo = ndb.IntegerProperty() + + key = ndb.Key(KIND, entity_id) + cache_key = _cache.global_cache_key(key._key) + global_cache = global_cache_module._InProcessGlobalCache() + cache_dict = global_cache_module._InProcessGlobalCache.cache + + with client_context.new(global_cache=global_cache).use(): + assert key.get().foo == 42 + assert cache_key in cache_dict + + assert key.delete() is None + assert cache_key not in cache_dict + + # This is py27 behavior. Not entirely sold on leaving _LOCKED value for + # Datastore misses. + assert key.get() is None + assert cache_dict[cache_key][0] == b"0" + + @pytest.mark.usefixtures("client_context") def test_delete_entity_in_transaction(ds_entity): entity_id = test_utils.system.unique_resource_id() diff --git a/tests/unit/test__batch.py b/tests/unit/test__batch.py new file mode 100644 index 00000000..67a8460e --- /dev/null +++ b/tests/unit/test__batch.py @@ -0,0 +1,46 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.cloud.ndb import _batch +from google.cloud.ndb import _eventloop + + +@pytest.mark.usefixtures("in_context") +class Test_get_batch: + def test_it(self): + options = {"foo": "bar"} + batch = _batch.get_batch(MockBatch, options) + assert batch.options is options + assert not batch.idle_called + + different_options = {"food": "barn"} + assert _batch.get_batch(MockBatch, different_options) is not batch + + assert _batch.get_batch(MockBatch) is not batch + + assert _batch.get_batch(MockBatch, options) is batch + + _eventloop.run() + assert batch.idle_called + + +class MockBatch: + def __init__(self, options): + self.options = options + self.idle_called = False + + def idle_callback(self): + self.idle_called = True diff --git a/tests/unit/test__cache.py b/tests/unit/test__cache.py new file mode 100644 index 00000000..7d891bf5 --- /dev/null +++ b/tests/unit/test__cache.py @@ -0,0 +1,363 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from unittest import mock + +import pytest + +from google.cloud.ndb import _cache +from google.cloud.ndb import tasklets + + +def future_result(result): + future = tasklets.Future() + future.set_result(result) + return future + + +class TestContextCache: + @staticmethod + def test_get_and_validate_valid(): + cache = _cache.ContextCache() + test_entity = mock.Mock(_key="test") + cache["test"] = test_entity + assert cache.get_and_validate("test") is test_entity + + @staticmethod + def test_get_and_validate_invalid(): + cache = _cache.ContextCache() + test_entity = mock.Mock(_key="test") + cache["test"] = test_entity + test_entity._key = "changed_key" + with pytest.raises(KeyError): + cache.get_and_validate("test") + + @staticmethod + def test_get_and_validate_none(): + cache = _cache.ContextCache() + cache["test"] = None + assert cache.get_and_validate("test") is None + + @staticmethod + def test_get_and_validate_miss(): + cache = _cache.ContextCache() + with pytest.raises(KeyError): + cache.get_and_validate("nonexistent_key") + + +class Test_GlobalCacheBatch: + @staticmethod + def test_make_call(): + batch = _cache._GlobalCacheBatch() + with pytest.raises(NotImplementedError): + batch.make_call() + + @staticmethod + def test_future_info(): + batch = _cache._GlobalCacheBatch() + with pytest.raises(NotImplementedError): + batch.future_info(None) + + +@mock.patch("google.cloud.ndb._cache._batch") +def test_global_get(_batch): + batch = _batch.get_batch.return_value + assert _cache.global_get(b"foo") is batch.add.return_value + _batch.get_batch.assert_called_once_with(_cache._GlobalCacheGetBatch) + batch.add.assert_called_once_with(b"foo") + + +class Test_GlobalCacheGetBatch: + @staticmethod + def test_add_and_idle_and_done_callbacks(in_context): + cache = mock.Mock() + cache.get.return_value = future_result([b"one", b"two"]) + + batch = _cache._GlobalCacheGetBatch(None) + future1 = batch.add(b"foo") + future2 = batch.add(b"bar") + future3 = batch.add(b"foo") + + assert set(batch.todo.keys()) == {b"foo", b"bar"} + assert batch.keys == [b"foo", b"bar"] + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.get.assert_called_once_with(batch.todo.keys()) + assert future1.result() == b"one" + assert future2.result() == b"two" + assert future3.result() == b"one" + + @staticmethod + def test_add_and_idle_and_done_callbacks_synchronous(in_context): + cache = mock.Mock() + cache.get.return_value = [b"one", b"two"] + + batch = _cache._GlobalCacheGetBatch(None) + future1 = batch.add(b"foo") + future2 = batch.add(b"bar") + + assert set(batch.todo.keys()) == {b"foo", b"bar"} + assert batch.keys == [b"foo", b"bar"] + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.get.assert_called_once_with(batch.todo.keys()) + assert future1.result() == b"one" + assert future2.result() == b"two" + + @staticmethod + def test_add_and_idle_and_done_callbacks_w_error(in_context): + error = Exception("spurious error") + cache = mock.Mock() + cache.get.return_value = tasklets.Future() + cache.get.return_value.set_exception(error) + + batch = _cache._GlobalCacheGetBatch(None) + future1 = batch.add(b"foo") + future2 = batch.add(b"bar") + + assert set(batch.todo.keys()) == {b"foo", b"bar"} + assert batch.keys == [b"foo", b"bar"] + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.get.assert_called_once_with(batch.todo.keys()) + assert future1.exception() is error + assert future2.exception() is error + + +class Test_global_set: + 
@staticmethod + @mock.patch("google.cloud.ndb._cache._batch") + def test_without_expires(_batch): + batch = _batch.get_batch.return_value + assert _cache.global_set(b"key", b"value") is batch.add.return_value + _batch.get_batch.assert_called_once_with( + _cache._GlobalCacheSetBatch, {} + ) + batch.add.assert_called_once_with(b"key", b"value") + + @staticmethod + @mock.patch("google.cloud.ndb._cache._batch") + def test_with_expires(_batch): + batch = _batch.get_batch.return_value + future = _cache.global_set(b"key", b"value", expires=5) + assert future is batch.add.return_value + _batch.get_batch.assert_called_once_with( + _cache._GlobalCacheSetBatch, {"expires": 5} + ) + batch.add.assert_called_once_with(b"key", b"value") + + +class Test_GlobalCacheSetBatch: + @staticmethod + def test_add_and_idle_and_done_callbacks(in_context): + cache = mock.Mock() + + batch = _cache._GlobalCacheSetBatch({}) + future1 = batch.add(b"foo", b"one") + future2 = batch.add(b"bar", b"two") + + assert batch.expires is None + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.set.assert_called_once_with( + {b"foo": b"one", b"bar": b"two"}, expires=None + ) + assert future1.result() is None + assert future2.result() is None + + @staticmethod + def test_add_and_idle_and_done_callbacks_with_expires(in_context): + cache = mock.Mock() + + batch = _cache._GlobalCacheSetBatch({"expires": 5}) + future1 = batch.add(b"foo", b"one") + future2 = batch.add(b"bar", b"two") + + assert batch.expires == 5 + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.set.assert_called_once_with( + {b"foo": b"one", b"bar": b"two"}, expires=5 + ) + assert future1.result() is None + assert future2.result() is None + + @staticmethod + def test_add_and_idle_and_done_callbacks_w_error(in_context): + error = Exception("spurious error") + cache = mock.Mock() + cache.set.return_value = tasklets.Future() + cache.set.return_value.set_exception(error) + + batch = _cache._GlobalCacheSetBatch({}) + future1 = batch.add(b"foo", b"one") + future2 = batch.add(b"bar", b"two") + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.set.assert_called_once_with( + {b"foo": b"one", b"bar": b"two"}, expires=None + ) + assert future1.exception() is error + assert future2.exception() is error + + +@mock.patch("google.cloud.ndb._cache._batch") +def test_global_delete(_batch): + batch = _batch.get_batch.return_value + assert _cache.global_delete(b"key") is batch.add.return_value + _batch.get_batch.assert_called_once_with(_cache._GlobalCacheDeleteBatch) + batch.add.assert_called_once_with(b"key") + + +class Test_GlobalCacheDeleteBatch: + @staticmethod + def test_add_and_idle_and_done_callbacks(in_context): + cache = mock.Mock() + + batch = _cache._GlobalCacheDeleteBatch({}) + future1 = batch.add(b"foo") + future2 = batch.add(b"bar") + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.delete.assert_called_once_with([b"foo", b"bar"]) + assert future1.result() is None + assert future2.result() is None + + +@mock.patch("google.cloud.ndb._cache._batch") +def test_global_watch(_batch): + batch = _batch.get_batch.return_value + assert _cache.global_watch(b"key") is batch.add.return_value + _batch.get_batch.assert_called_once_with(_cache._GlobalCacheWatchBatch) + batch.add.assert_called_once_with(b"key") + + +class Test_GlobalCacheWatchBatch: + @staticmethod + def test_add_and_idle_and_done_callbacks(in_context): + cache = mock.Mock() + + batch = 
_cache._GlobalCacheWatchBatch({}) + future1 = batch.add(b"foo") + future2 = batch.add(b"bar") + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.watch.assert_called_once_with([b"foo", b"bar"]) + assert future1.result() is None + assert future2.result() is None + + +class Test_global_compare_and_swap: + @staticmethod + @mock.patch("google.cloud.ndb._cache._batch") + def test_without_expires(_batch): + batch = _batch.get_batch.return_value + assert ( + _cache.global_compare_and_swap(b"key", b"value") + is batch.add.return_value + ) + _batch.get_batch.assert_called_once_with( + _cache._GlobalCacheCompareAndSwapBatch, {} + ) + batch.add.assert_called_once_with(b"key", b"value") + + @staticmethod + @mock.patch("google.cloud.ndb._cache._batch") + def test_with_expires(_batch): + batch = _batch.get_batch.return_value + future = _cache.global_compare_and_swap(b"key", b"value", expires=5) + assert future is batch.add.return_value + _batch.get_batch.assert_called_once_with( + _cache._GlobalCacheCompareAndSwapBatch, {"expires": 5} + ) + batch.add.assert_called_once_with(b"key", b"value") + + +class Test_GlobalCacheCompareAndSwapBatch: + @staticmethod + def test_add_and_idle_and_done_callbacks(in_context): + cache = mock.Mock() + + batch = _cache._GlobalCacheCompareAndSwapBatch({}) + future1 = batch.add(b"foo", b"one") + future2 = batch.add(b"bar", b"two") + + assert batch.expires is None + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.compare_and_swap.assert_called_once_with( + {b"foo": b"one", b"bar": b"two"}, expires=None + ) + assert future1.result() is None + assert future2.result() is None + + @staticmethod + def test_add_and_idle_and_done_callbacks_with_expires(in_context): + cache = mock.Mock() + + batch = _cache._GlobalCacheCompareAndSwapBatch({"expires": 5}) + future1 = batch.add(b"foo", b"one") + future2 = batch.add(b"bar", b"two") + + assert batch.expires == 5 + + with in_context.new(global_cache=cache).use(): + batch.idle_callback() + + cache.compare_and_swap.assert_called_once_with( + {b"foo": b"one", b"bar": b"two"}, expires=5 + ) + assert future1.result() is None + assert future2.result() is None + + +@mock.patch("google.cloud.ndb._cache._batch") +def test_global_lock(_batch): + batch = _batch.get_batch.return_value + assert _cache.global_lock(b"key") is batch.add.return_value + _batch.get_batch.assert_called_once_with( + _cache._GlobalCacheSetBatch, {"expires": _cache._LOCK_TIME} + ) + batch.add.assert_called_once_with(b"key", _cache._LOCKED) + + +def test_is_locked_value(): + assert _cache.is_locked_value(_cache._LOCKED) + assert not _cache.is_locked_value("new db, who dis?") + + +def test_global_cache_key(): + key = mock.Mock() + key.to_protobuf.return_value.SerializeToString.return_value = b"himom!" + assert _cache.global_cache_key(key) == _cache._PREFIX + b"himom!" 
+ key.to_protobuf.assert_called_once_with() + key.to_protobuf.return_value.SerializeToString.assert_called_once_with() diff --git a/tests/unit/test__datastore_api.py b/tests/unit/test__datastore_api.py index aeb9d8be..ee077d93 100644 --- a/tests/unit/test__datastore_api.py +++ b/tests/unit/test__datastore_api.py @@ -17,17 +17,29 @@ import pytest from google.cloud import _http +from google.cloud.datastore import entity +from google.cloud.datastore import helpers +from google.cloud.datastore import key as ds_key_module from google.cloud.datastore_v1.proto import datastore_pb2 from google.cloud.datastore_v1.proto import entity_pb2 +from google.cloud.ndb import _batch +from google.cloud.ndb import _cache from google.cloud.ndb import context as context_module from google.cloud.ndb import _datastore_api as _api from google.cloud.ndb import key as key_module +from google.cloud.ndb import model from google.cloud.ndb import _options from google.cloud.ndb import tasklets from tests.unit import utils +def future_result(result): + future = tasklets.Future() + future.set_result(result) + return future + + class TestStub: @staticmethod @mock.patch("google.cloud.ndb._datastore_api._helpers") @@ -146,57 +158,125 @@ def _mock_key(key_str): return key -class TestLookup: +class Test_lookup: @staticmethod def test_it(context): eventloop = mock.Mock(spec=("add_idle", "run")) with context.new(eventloop=eventloop).use() as context: - future1 = _api.lookup(_mock_key("foo"), _options.ReadOptions()) - future2 = _api.lookup(_mock_key("foo"), _options.ReadOptions()) - future3 = _api.lookup(_mock_key("bar"), _options.ReadOptions()) + _api.lookup(_mock_key("foo"), _options.ReadOptions()) + _api.lookup(_mock_key("foo"), _options.ReadOptions()) + _api.lookup(_mock_key("bar"), _options.ReadOptions()) batch = context.batches[_api._LookupBatch][()] - assert batch.todo["foo"] == [future1, future2] - assert batch.todo["bar"] == [future3] + assert len(batch.todo["foo"]) == 2 + assert len(batch.todo["bar"]) == 1 assert context.eventloop.add_idle.call_count == 1 @staticmethod def test_it_with_options(context): eventloop = mock.Mock(spec=("add_idle", "run")) with context.new(eventloop=eventloop).use() as context: - future1 = _api.lookup(_mock_key("foo"), _options.ReadOptions()) - future2 = _api.lookup( + _api.lookup(_mock_key("foo"), _options.ReadOptions()) + _api.lookup( _mock_key("foo"), _options.ReadOptions(read_consistency=_api.EVENTUAL), ) - future3 = _api.lookup(_mock_key("bar"), _options.ReadOptions()) + _api.lookup(_mock_key("bar"), _options.ReadOptions()) batches = context.batches[_api._LookupBatch] batch1 = batches[()] - assert batch1.todo["foo"] == [future1] - assert batch1.todo["bar"] == [future3] + assert len(batch1.todo["foo"]) == 1 + assert len(batch1.todo["bar"]) == 1 batch2 = batches[(("read_consistency", _api.EVENTUAL),)] - assert batch2.todo == {"foo": [future2]} + assert len(batch2.todo) == 1 + assert len(batch2.todo["foo"]) == 1 add_idle = context.eventloop.add_idle assert add_idle.call_count == 2 + +class Test_lookup_WithGlobalCache: @staticmethod - def test_idle_callback(context): - eventloop = mock.Mock(spec=("add_idle", "run")) - with context.new(eventloop=eventloop).use() as context: - future = _api.lookup(_mock_key("foo"), _options.ReadOptions()) + @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") + def test_cache_miss(_LookupBatch, global_cache): + class SomeKind(model.Model): + pass - batches = context.batches[_api._LookupBatch] - batch = batches[()] - assert batch.todo["foo"] == [future] + 
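+ # Cache miss: the lookup should fall through to the Datastore batch and then write the fetched entity back into the global cache (asserted at the end of this test).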
key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + entity_pb = model._entity_to_protobuf(entity) + cache_value = entity_pb.SerializeToString() + + batch = _LookupBatch.return_value + batch.add.return_value = future_result(entity_pb) + + future = _api.lookup(key._key, _options.ReadOptions()) + assert future.result() == entity_pb + + assert global_cache.get([cache_key]) == [cache_value] + + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") + def test_cache_hit(_LookupBatch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + entity_pb = model._entity_to_protobuf(entity) + cache_value = entity_pb.SerializeToString() + + global_cache.set({cache_key: cache_value}) + + batch = _LookupBatch.return_value + batch.add.side_effect = Exception("Shouldn't get called.") + + future = _api.lookup(key._key, _options.ReadOptions()) + assert future.result() == entity_pb + + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") + def test_cache_locked(_LookupBatch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + entity_pb = model._entity_to_protobuf(entity) - idle = context.eventloop.add_idle.call_args[0][0] - batch.idle_callback = mock.Mock() - idle() - batch.idle_callback.assert_called_once_with() - assert () not in batches + global_cache.set({cache_key: _cache._LOCKED}) + + batch = _LookupBatch.return_value + batch.add.return_value = future_result(entity_pb) + + future = _api.lookup(key._key, _options.ReadOptions()) + assert future.result() == entity_pb + + assert global_cache.get([cache_key]) == [_cache._LOCKED] + + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._LookupBatch") + def test_cache_not_found(_LookupBatch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + batch = _LookupBatch.return_value + batch.add.return_value = future_result(_api._NOT_FOUND) + + future = _api.lookup(key._key, _options.ReadOptions()) + assert future.result() is _api._NOT_FOUND + + assert global_cache.get([cache_key]) == [_cache._LOCKED] class Test_LookupBatch: @@ -453,24 +533,29 @@ def __init__(self, upsert=None): self.upsert = upsert def __eq__(self, other): - return self.upsert is other.upsert + return self.upsert == other.upsert - eventloop = mock.Mock(spec=("add_idle", "run")) - with in_context.new(eventloop=eventloop).use() as context: - datastore_pb2.Mutation = Mutation + def MockEntity(*path): + key = ds_key_module.Key(*path, project="testing") + return entity.Entity(key=key) - entity1, entity2, entity3 = object(), object(), object() - future1 = _api.put(entity1, _options.Options()) - future2 = _api.put(entity2, _options.Options()) - future3 = _api.put(entity3, _options.Options()) + datastore_pb2.Mutation = Mutation - batch = context.batches[_api._NonTransactionalCommitBatch][()] - assert batch.mutations == [ - Mutation(upsert=entity1), - Mutation(upsert=entity2), - Mutation(upsert=entity3), - ] - assert batch.futures == [future1, future2, future3] + entity1 = MockEntity("a", "1") + _api.put(entity1, _options.Options()) + + entity2 = MockEntity("a") + _api.put(entity2, _options.Options()) + + entity3 = MockEntity("b") + _api.put(entity3, 
_options.Options()) + + batch = in_context.batches[_api._NonTransactionalCommitBatch][()] + assert batch.mutations == [ + Mutation(upsert=helpers.entity_to_protobuf(entity1)), + Mutation(upsert=helpers.entity_to_protobuf(entity2)), + Mutation(upsert=helpers.entity_to_protobuf(entity3)), + ] @staticmethod @mock.patch("google.cloud.ndb._datastore_api.datastore_pb2") @@ -480,45 +565,78 @@ def __init__(self, upsert=None): self.upsert = upsert def __eq__(self, other): - return self.upsert is other.upsert - - class PathElement: - id = None - - def __init__(self, name): - self.name = name + return self.upsert == other.upsert def MockEntity(*path): - path = [PathElement(name) for name in path] - return mock.Mock(key=mock.Mock(path=path)) + key = ds_key_module.Key(*path, project="testing") + return entity.Entity(key=key) - eventloop = mock.Mock(spec=("add_idle", "run")) - context = in_context.new(eventloop=eventloop, transaction=b"123") - with context.use() as context: + with in_context.new(transaction=b"123").use() as context: datastore_pb2.Mutation = Mutation entity1 = MockEntity("a", "1") - future1 = _api.put(entity1, _options.Options()) + _api.put(entity1, _options.Options()) - entity2 = MockEntity("a", None) - future2 = _api.put(entity2, _options.Options()) + entity2 = MockEntity("a") + _api.put(entity2, _options.Options()) - entity3 = MockEntity() - future3 = _api.put(entity3, _options.Options()) + entity3 = MockEntity("b") + _api.put(entity3, _options.Options()) batch = context.commit_batches[b"123"] assert batch.mutations == [ - Mutation(upsert=entity1), - Mutation(upsert=entity2), - Mutation(upsert=entity3), + Mutation(upsert=helpers.entity_to_protobuf(entity1)), + Mutation(upsert=helpers.entity_to_protobuf(entity2)), + Mutation(upsert=helpers.entity_to_protobuf(entity3)), ] - assert batch.futures == [future1, future2, future3] assert batch.transaction == b"123" assert batch.incomplete_mutations == [ - Mutation(upsert=entity2), - Mutation(upsert=entity3), + Mutation(upsert=helpers.entity_to_protobuf(entity2)), + Mutation(upsert=helpers.entity_to_protobuf(entity3)), ] - assert batch.incomplete_futures == [future2, future3] + + +class Test_put_WithGlobalCache: + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_no_key_returned(Batch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + batch = Batch.return_value + batch.put.return_value = future_result(None) + + future = _api.put( + model._entity_to_ds_entity(entity), _options.Options() + ) + assert future.result() is None + + assert global_cache.get([cache_key]) == [None] + + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_key_returned(Batch, global_cache): + class SomeKind(model.Model): + pass + + key = key_module.Key("SomeKind", 1) + key_pb = key._key.to_protobuf() + cache_key = _cache.global_cache_key(key._key) + + entity = SomeKind(key=key) + batch = Batch.return_value + batch.put.return_value = future_result(key_pb) + + future = _api.put( + model._entity_to_ds_entity(entity), _options.Options() + ) + assert future.result() == key._key + + assert global_cache.get([cache_key]) == [None] class Test_delete: @@ -532,24 +650,21 @@ def __init__(self, delete=None): def __eq__(self, other): return self.delete == other.delete - eventloop = mock.Mock(spec=("add_idle", "run")) - with 
in_context.new(eventloop=eventloop).use() as context: - datastore_pb2.Mutation = Mutation + datastore_pb2.Mutation = Mutation - key1 = key_module.Key("SomeKind", 1)._key - key2 = key_module.Key("SomeKind", 2)._key - key3 = key_module.Key("SomeKind", 3)._key - future1 = _api.delete(key1, _options.Options()) - future2 = _api.delete(key2, _options.Options()) - future3 = _api.delete(key3, _options.Options()) + key1 = key_module.Key("SomeKind", 1)._key + key2 = key_module.Key("SomeKind", 2)._key + key3 = key_module.Key("SomeKind", 3)._key + _api.delete(key1, _options.Options()) + _api.delete(key2, _options.Options()) + _api.delete(key3, _options.Options()) - batch = context.batches[_api._NonTransactionalCommitBatch][()] - assert batch.mutations == [ - Mutation(delete=key1.to_protobuf()), - Mutation(delete=key2.to_protobuf()), - Mutation(delete=key3.to_protobuf()), - ] - assert batch.futures == [future1, future2, future3] + batch = in_context.batches[_api._NonTransactionalCommitBatch][()] + assert batch.mutations == [ + Mutation(delete=key1.to_protobuf()), + Mutation(delete=key2.to_protobuf()), + Mutation(delete=key3.to_protobuf()), + ] @staticmethod @mock.patch("google.cloud.ndb._datastore_api.datastore_pb2") @@ -570,9 +685,9 @@ def __eq__(self, other): key1 = key_module.Key("SomeKind", 1)._key key2 = key_module.Key("SomeKind", 2)._key key3 = key_module.Key("SomeKind", 3)._key - future1 = _api.delete(key1, _options.Options()) - future2 = _api.delete(key2, _options.Options()) - future3 = _api.delete(key3, _options.Options()) + _api.delete(key1, _options.Options()) + _api.delete(key2, _options.Options()) + _api.delete(key3, _options.Options()) batch = context.commit_batches[b"tx123"] assert batch.mutations == [ @@ -580,7 +695,38 @@ def __eq__(self, other): Mutation(delete=key2.to_protobuf()), Mutation(delete=key3.to_protobuf()), ] - assert batch.futures == [future1, future2, future3] + + +class Test_delete_WithGlobalCache: + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_cache_enabled(Batch, global_cache): + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + batch = Batch.return_value + batch.delete.return_value = future_result(None) + + future = _api.delete(key._key, _options.Options()) + assert future.result() is None + + assert global_cache.get([cache_key]) == [None] + + @staticmethod + @mock.patch("google.cloud.ndb._datastore_api._NonTransactionalCommitBatch") + def test_cache_disabled(Batch, global_cache): + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + + batch = Batch.return_value + batch.delete.return_value = future_result(None) + + future = _api.delete( + key._key, _options.Options(use_global_cache=False) + ) + assert future.result() is None + + assert global_cache.get([cache_key]) == [None] class Test_NonTransactionalCommitBatch: @@ -926,7 +1072,7 @@ def test_w_transaction(stub, datastore_pb2): def test_allocate(): options = _options.Options() future = _api.allocate(["one", "two"], options) - batch = _api._get_batch(_api._AllocateIdsBatch, options) + batch = _batch.get_batch(_api._AllocateIdsBatch, options) assert batch.keys == ["one", "two"] assert batch.futures == future._dependencies @@ -1109,3 +1255,15 @@ def test__datastore_rollback(stub, datastore_pb2): request = datastore_pb2.RollbackRequest.return_value assert api.Rollback.future.called_once_with(request) + + +def test__complete(): + class MockElement: + def __init__(self, id=None, name=None): + 
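+ # Minimal stand-in for a protobuf key path element; _complete treats a key as complete only when its last path element carries an id or a name.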
self.id = id + self.name = name + + assert not _api._complete(mock.Mock(path=[])) + assert not _api._complete(mock.Mock(path=[MockElement()])) + assert _api._complete(mock.Mock(path=[MockElement(id=1)])) + assert _api._complete(mock.Mock(path=[MockElement(name="himom")])) diff --git a/tests/unit/test__options.py b/tests/unit/test__options.py index d8188bcd..36c676fb 100644 --- a/tests/unit/test__options.py +++ b/tests/unit/test__options.py @@ -40,8 +40,18 @@ def test_constructor_w_deadline_and_timeout(): @staticmethod def test_constructor_w_use_memcache(): - with pytest.raises(NotImplementedError): - MyOptions(use_memcache=20) + options = MyOptions(use_memcache=True) + assert options.use_global_cache is True + + @staticmethod + def test_constructor_w_use_global_cache(): + options = MyOptions(use_global_cache=True) + assert options.use_global_cache is True + + @staticmethod + def test_constructor_w_use_memcache_and_global_cache(): + with pytest.raises(TypeError): + MyOptions(use_global_cache=True, use_memcache=False) @staticmethod def test_constructor_w_use_datastore(): @@ -55,8 +65,18 @@ def test_constructor_w_use_cache(): @staticmethod def test_constructor_w_memcache_timeout(): - with pytest.raises(NotImplementedError): - MyOptions(memcache_timeout=20) + options = MyOptions(memcache_timeout=20) + assert options.global_cache_timeout == 20 + + @staticmethod + def test_constructor_w_global_cache_timeout(): + options = MyOptions(global_cache_timeout=20) + assert options.global_cache_timeout == 20 + + @staticmethod + def test_constructor_w_memcache_and_global_cache_timeout(): + with pytest.raises(TypeError): + MyOptions(memcache_timeout=20, global_cache_timeout=20) @staticmethod def test_constructor_w_max_memcache_items(): diff --git a/tests/unit/test_context.py b/tests/unit/test_context.py index ddda5b1a..7c9a7ee1 100644 --- a/tests/unit/test_context.py +++ b/tests/unit/test_context.py @@ -15,11 +15,14 @@ import pytest from unittest import mock +from google.cloud.ndb import _cache from google.cloud.ndb import context as context_module from google.cloud.ndb import _eventloop from google.cloud.ndb import exceptions +from google.cloud.ndb import global_cache from google.cloud.ndb import key as key_module from google.cloud.ndb import model +from google.cloud.ndb import _options import tests.unit.utils @@ -28,10 +31,12 @@ def test___all__(): class TestContext: - def _make_one(self): - client = mock.Mock(spec=()) + def _make_one(self, **kwargs): + client = mock.Mock( + namespace=None, project="testing", spec=("namespace", "project") + ) stub = mock.Mock(spec=()) - return context_module.Context(client, stub=stub) + return context_module.Context(client, stub=stub, **kwargs) @mock.patch("google.cloud.ndb._datastore_api.make_stub") def test_constructor_defaults(self, make_stub): @@ -63,6 +68,13 @@ def test_new_transaction(self): assert new_context.transaction == "tx123" assert context.transaction is None + def test_new_with_cache(self): + context = self._make_one() + context.cache["foo"] = "bar" + new_context = context.new() + assert context.cache is not new_context.cache + assert context.cache == new_context.cache + def test_use(self): context = self._make_one() with context.use(): @@ -70,12 +82,49 @@ def test_use(self): with pytest.raises(exceptions.ContextError): context_module.get_context() + def test_use_nested(self): + context = self._make_one() + with context.use(): + assert context_module.get_context() is context + next_context = context.new() + with next_context.use(): + assert 
context_module.get_context() is next_context + + assert context_module.get_context() is context + + with pytest.raises(exceptions.ContextError): + context_module.get_context() + def test_clear_cache(self): context = self._make_one() context.cache["testkey"] = "testdata" context.clear_cache() assert not context.cache + def test__clear_global_cache(self): + context = self._make_one( + global_cache=global_cache._InProcessGlobalCache() + ) + with context.use(): + key = key_module.Key("SomeKind", 1) + cache_key = _cache.global_cache_key(key._key) + context.cache[key] = "testdata" + context.global_cache.cache[cache_key] = "testdata" + context.global_cache.cache["anotherkey"] = "otherdata" + context._clear_global_cache().result() + + assert context.global_cache.cache == {"anotherkey": "otherdata"} + + def test__clear_global_cache_nothing_to_do(self): + context = self._make_one( + global_cache=global_cache._InProcessGlobalCache() + ) + with context.use(): + context.global_cache.cache["anotherkey"] = "otherdata" + context._clear_global_cache().result() + + assert context.global_cache.cache == {"anotherkey": "otherdata"} + def test_flush(self): context = self._make_one() with pytest.raises(NotImplementedError): @@ -94,13 +143,33 @@ def test_get_datastore_policy(self): def test_get_memcache_policy(self): context = self._make_one() - with pytest.raises(NotImplementedError): + context.get_memcache_policy() + assert ( context.get_memcache_policy() + is context_module._default_global_cache_policy + ) + + def test_get_global_cache_policy(self): + context = self._make_one() + context.get_global_cache_policy() + assert ( + context.get_global_cache_policy() + is context_module._default_global_cache_policy + ) def test_get_memcache_timeout_policy(self): context = self._make_one() - with pytest.raises(NotImplementedError): + assert ( context.get_memcache_timeout_policy() + is context_module._default_global_cache_timeout_policy + ) + + def test_get_global_cache_timeout_policy(self): + context = self._make_one() + assert ( + context.get_global_cache_timeout_policy() + is context_module._default_global_cache_timeout_policy + ) def test_set_cache_policy(self): policy = object() @@ -120,6 +189,26 @@ def test_set_cache_policy_with_bool(self): context.set_cache_policy(False) assert context.get_cache_policy()(None) is False + def test__use_cache_default_policy(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = key_module.Key("SomeKind", 1) + options = _options.Options() + assert context._use_cache(key, options) is True + + def test__use_cache_from_options(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = "whocares" + options = _options.Options(use_cache=False) + assert context._use_cache(key, options) is False + def test_set_datastore_policy(self): context = self._make_one() with pytest.raises(NotImplementedError): @@ -127,13 +216,88 @@ def test_set_datastore_policy(self): def test_set_memcache_policy(self): context = self._make_one() - with pytest.raises(NotImplementedError): - context.set_memcache_policy(None) + context.set_memcache_policy(None) + assert ( + context.global_cache_policy + is context_module._default_global_cache_policy + ) + + def test_set_global_cache_policy(self): + context = self._make_one() + context.set_global_cache_policy(None) + assert ( + context.global_cache_policy + is context_module._default_global_cache_policy + ) + + def test_set_global_cache_policy_as_bool(self): + context = 
self._make_one() + context.set_global_cache_policy(True) + assert context.global_cache_policy("whatever") is True + + def test__use_global_cache_no_global_cache(self): + context = self._make_one() + assert context._use_global_cache("key") is False + + def test__use_global_cache_default_policy(self): + class SomeKind(model.Model): + pass + + context = self._make_one(global_cache="yes, there is one") + with context.use(): + key = key_module.Key("SomeKind", 1) + assert context._use_global_cache(key._key) is True + + def test__use_global_cache_from_options(self): + class SomeKind(model.Model): + pass + + context = self._make_one(global_cache="yes, there is one") + with context.use(): + key = "whocares" + options = _options.Options(use_global_cache=False) + assert context._use_global_cache(key, options=options) is False def test_set_memcache_timeout_policy(self): context = self._make_one() - with pytest.raises(NotImplementedError): - context.set_memcache_timeout_policy(None) + context.set_memcache_timeout_policy(None) + assert ( + context.global_cache_timeout_policy + is context_module._default_global_cache_timeout_policy + ) + + def test_set_global_cache_timeout_policy(self): + context = self._make_one() + context.set_global_cache_timeout_policy(None) + assert ( + context.global_cache_timeout_policy + is context_module._default_global_cache_timeout_policy + ) + + def test_set_global_cache_timeout_policy_as_int(self): + context = self._make_one() + context.set_global_cache_timeout_policy(14) + assert context.global_cache_timeout_policy("whatever") == 14 + + def test__global_cache_timeout_default_policy(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = key_module.Key("SomeKind", 1) + timeout = context._global_cache_timeout(key._key, None) + assert timeout is None + + def test__global_cache_timeout_from_options(self): + class SomeKind(model.Model): + pass + + context = self._make_one() + with context.use(): + key = "whocares" + options = _options.Options(global_cache_timeout=49) + assert context._global_cache_timeout(key, options) == 49 def test_call_on_commit(self): context = self._make_one() @@ -149,16 +313,6 @@ def test_default_datastore_policy(self): with pytest.raises(NotImplementedError): context.default_datastore_policy(None) - def test_default_memcache_policy(self): - context = self._make_one() - with pytest.raises(NotImplementedError): - context.default_memcache_policy(None) - - def test_default_memcache_timeout_policy(self): - context = self._make_one() - with pytest.raises(NotImplementedError): - context.default_memcache_timeout_policy(None) - def test_memcache_add(self): context = self._make_one() with pytest.raises(NotImplementedError): @@ -273,31 +427,92 @@ class ThisKind(model.Model): assert context_module._default_cache_policy(key) is False -class TestCache: +class Test_default_global_cache_policy: + @staticmethod + def test_key_is_None(): + assert context_module._default_global_cache_policy(None) is None + + @staticmethod + def test_no_model_class(): + key = mock.Mock(kind="nokind", spec=("kind",)) + assert context_module._default_global_cache_policy(key) is None + + @staticmethod + @pytest.mark.usefixtures("in_context") + def test_standard_model(): + class ThisKind(model.Model): + pass + + key = key_module.Key("ThisKind", 0) + assert context_module._default_global_cache_policy(key._key) is None + + @staticmethod + @pytest.mark.usefixtures("in_context") + def test_standard_model_defines_policy(): + flag = object() + + class 
ThisKind(model.Model): + @classmethod + def _use_global_cache(cls, key): + return flag + + key = key_module.Key("ThisKind", 0) + assert context_module._default_global_cache_policy(key._key) is flag + @staticmethod - def test_get_and_validate_valid(): - cache = context_module._Cache() - test_entity = mock.Mock(_key="test") - cache["test"] = test_entity - assert cache.get_and_validate("test") is test_entity + @pytest.mark.usefixtures("in_context") + def test_standard_model_defines_policy_as_bool(): + class ThisKind(model.Model): + _use_global_cache = False + + key = key_module.Key("ThisKind", 0) + assert context_module._default_global_cache_policy(key._key) is False + +class Test_default_global_cache_timeout_policy: @staticmethod - def test_get_and_validate_invalid(): - cache = context_module._Cache() - test_entity = mock.Mock(_key="test") - cache["test"] = test_entity - test_entity._key = "changed_key" - with pytest.raises(KeyError): - cache.get_and_validate("test") + def test_key_is_None(): + assert ( + context_module._default_global_cache_timeout_policy(None) is None + ) @staticmethod - def test_get_and_validate_none(): - cache = context_module._Cache() - cache["test"] = None - assert cache.get_and_validate("test") is None + def test_no_model_class(): + key = mock.Mock(kind="nokind", spec=("kind",)) + assert context_module._default_global_cache_timeout_policy(key) is None @staticmethod - def test_get_and_validate_miss(): - cache = context_module._Cache() - with pytest.raises(KeyError): - cache.get_and_validate("nonexistent_key") + @pytest.mark.usefixtures("in_context") + def test_standard_model(): + class ThisKind(model.Model): + pass + + key = key_module.Key("ThisKind", 0) + assert ( + context_module._default_global_cache_timeout_policy(key._key) + is None + ) + + @staticmethod + @pytest.mark.usefixtures("in_context") + def test_standard_model_defines_policy(): + class ThisKind(model.Model): + @classmethod + def _global_cache_timeout(cls, key): + return 13 + + key = key_module.Key("ThisKind", 0) + assert ( + context_module._default_global_cache_timeout_policy(key._key) == 13 + ) + + @staticmethod + @pytest.mark.usefixtures("in_context") + def test_standard_model_defines_policy_as_int(): + class ThisKind(model.Model): + _global_cache_timeout = 12 + + key = key_module.Key("ThisKind", 0) + assert ( + context_module._default_global_cache_timeout_policy(key._key) == 12 + ) diff --git a/tests/unit/test_global_cache.py b/tests/unit/test_global_cache.py new file mode 100644 index 00000000..ffd6409a --- /dev/null +++ b/tests/unit/test_global_cache.py @@ -0,0 +1,146 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
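+
+# GlobalCache is an abstract interface (the base-class methods all raise
+# NotImplementedError), while _InProcessGlobalCache keeps its entries in
+# storage shared across instances: test_set_get_delete below constructs a
+# second instance and still reads values written through the first.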
+ +from unittest import mock + +import pytest + +from google.cloud.ndb import global_cache + + +class TestGlobalCache: + def make_one(self): + class MockImpl(global_cache.GlobalCache): + def get(self, keys): + return super(MockImpl, self).get(keys) + + def set(self, items, expires=None): + return super(MockImpl, self).set(items, expires=expires) + + def delete(self, keys): + return super(MockImpl, self).delete(keys) + + def watch(self, keys): + return super(MockImpl, self).watch(keys) + + def compare_and_swap(self, items, expires=None): + return super(MockImpl, self).compare_and_swap( + items, expires=expires + ) + + return MockImpl() + + def test_get(self): + cache = self.make_one() + with pytest.raises(NotImplementedError): + cache.get(b"foo") + + def test_set(self): + cache = self.make_one() + with pytest.raises(NotImplementedError): + cache.set({b"foo": "bar"}) + + def test_delete(self): + cache = self.make_one() + with pytest.raises(NotImplementedError): + cache.delete(b"foo") + + def test_watch(self): + cache = self.make_one() + with pytest.raises(NotImplementedError): + cache.watch(b"foo") + + def test_compare_and_swap(self): + cache = self.make_one() + with pytest.raises(NotImplementedError): + cache.compare_and_swap({b"foo": "bar"}) + + +class TestInProcessGlobalCache: + @staticmethod + def test_set_get_delete(): + cache = global_cache._InProcessGlobalCache() + result = cache.set({b"one": b"foo", b"two": b"bar", b"three": b"baz"}) + assert result is None + + result = cache.get([b"two", b"three", b"one"]) + assert result == [b"bar", b"baz", b"foo"] + + cache = global_cache._InProcessGlobalCache() + result = cache.get([b"two", b"three", b"one"]) + assert result == [b"bar", b"baz", b"foo"] + + result = cache.delete([b"one", b"two", b"three"]) + assert result is None + + result = cache.get([b"two", b"three", b"one"]) + assert result == [None, None, None] + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.time") + def test_set_get_delete_w_expires(time): + time.time.return_value = 0 + + cache = global_cache._InProcessGlobalCache() + result = cache.set( + {b"one": b"foo", b"two": b"bar", b"three": b"baz"}, expires=5 + ) + assert result is None + + result = cache.get([b"two", b"three", b"one"]) + assert result == [b"bar", b"baz", b"foo"] + + time.time.return_value = 10 + result = cache.get([b"two", b"three", b"one"]) + assert result == [None, None, None] + + @staticmethod + def test_watch_compare_and_swap(): + cache = global_cache._InProcessGlobalCache() + result = cache.watch([b"one", b"two", b"three"]) + assert result is None + + cache.cache[b"two"] = (b"hamburgers", None) + + result = cache.compare_and_swap( + {b"one": b"foo", b"two": b"bar", b"three": b"baz"} + ) + assert result is None + + result = cache.get([b"one", b"two", b"three"]) + assert result == [b"foo", b"hamburgers", b"baz"] + + @staticmethod + @mock.patch("google.cloud.ndb.global_cache.time") + def test_watch_compare_and_swap_with_expires(time): + time.time.return_value = 0 + + cache = global_cache._InProcessGlobalCache() + result = cache.watch([b"one", b"two", b"three"]) + assert result is None + + cache.cache[b"two"] = (b"hamburgers", None) + + result = cache.compare_and_swap( + {b"one": b"foo", b"two": b"bar", b"three": b"baz"}, expires=5 + ) + assert result is None + + result = cache.get([b"one", b"two", b"three"]) + assert result == [b"foo", b"hamburgers", b"baz"] + + time.time.return_value = 10 + + result = cache.get([b"one", b"two", b"three"]) + assert result == [None, b"hamburgers", None] diff --git 
a/tests/unit/test_model.py b/tests/unit/test_model.py index 42f6dd2f..f51f43e2 100644 --- a/tests/unit/test_model.py +++ b/tests/unit/test_model.py @@ -3610,11 +3610,19 @@ def test__put_no_key(_datastore_api): _datastore_api.put.return_value = future = tasklets.Future() future.set_result(None) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) assert entity._put() == entity.key - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options() - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. + assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options() @staticmethod @pytest.mark.usefixtures("in_context") @@ -3624,14 +3632,22 @@ def test__put_w_key_no_cache(_datastore_api, in_context): _datastore_api.put.return_value = future = tasklets.Future() key = key_module.Key("SomeKind", 123) - future.set_result(key._key.to_protobuf()) + future.set_result(key._key) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) assert entity._put(use_cache=False) == key assert not in_context.cache - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options(use_cache=False) - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. + assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options(use_cache=False) @staticmethod @pytest.mark.usefixtures("in_context") @@ -3641,15 +3657,23 @@ def test__put_w_key_with_cache(_datastore_api, in_context): _datastore_api.put.return_value = future = tasklets.Future() key = key_module.Key("SomeKind", 123) - future.set_result(key._key.to_protobuf()) + future.set_result(key._key) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) assert entity._put(use_cache=True) == key assert in_context.cache[key] == entity assert in_context.cache.get_and_validate(key) == entity - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options(use_cache=True) - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. 
+ assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options(use_cache=True) @staticmethod @pytest.mark.usefixtures("in_context") @@ -3659,13 +3683,21 @@ def test__put_w_key(_datastore_api): _datastore_api.put.return_value = future = tasklets.Future() key = key_module.Key("SomeKind", 123) - future.set_result(key._key.to_protobuf()) + future.set_result(key._key) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) assert entity._put() == key - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options() - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. + assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options() @staticmethod @pytest.mark.usefixtures("in_context") @@ -3675,14 +3707,22 @@ def test__put_async(_datastore_api): _datastore_api.put.return_value = future = tasklets.Future() key = key_module.Key("SomeKind", 123) - future.set_result(key._key.to_protobuf()) + future.set_result(key._key) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) tasklet_future = entity._put_async() assert tasklet_future.result() == key - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options() - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. + assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options() @staticmethod @pytest.mark.usefixtures("in_context") @@ -3718,11 +3758,19 @@ def _post_put_hook(self, future, *args, **kwargs): _datastore_api.put.return_value = future = tasklets.Future() future.set_result(None) - entity_pb = model._entity_to_protobuf(entity) + ds_entity = model._entity_to_ds_entity(entity) assert entity._put() == entity.key - _datastore_api.put.assert_called_once_with( - entity_pb, _options.Options() - ) + + # Can't do a simple "assert_called_once_with" here because entities' + # keys will fail test for equality because Datastore's Key.__eq__ + # method returns False if either key is partial, regardless of whether + # they're effectively equal or not. Have to do this more complicated + # unpacking instead. + assert _datastore_api.put.call_count == 1 + call_ds_entity, call_options = _datastore_api.put.call_args[0] + assert call_ds_entity.key.path == ds_entity.key.path + assert call_ds_entity.items() == ds_entity.items() + assert call_options == _options.Options() assert entity.pre_put_calls == [((), {})] assert entity.post_put_calls == [((), {})]
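
Taken together, the Test_lookup_WithGlobalCache cases above (miss, hit, locked, not found) pin down a read-through protocol for the global cache. The sketch below is a minimal, synchronous restatement of that protocol for orientation only: the read_through function name, the datastore_lookup callable, and the not_found sentinel are hypothetical stand-ins, and the real lookup tasklet in _datastore_api is yield-based and additionally consults the context's cache policies.

from google.cloud.datastore_v1.proto import entity_pb2

from google.cloud.ndb import _cache


def read_through(ds_key, global_cache, datastore_lookup, not_found):
    """Hypothetical sketch of the lookup behavior the tests assert."""
    cache_key = _cache.global_cache_key(ds_key)
    value = global_cache.get([cache_key])[0]

    if _cache.is_locked_value(value):
        # test_cache_locked: another writer holds the lock, so bypass the
        # cache entirely and leave the lock in place.
        return datastore_lookup(ds_key)

    if value is not None:
        # test_cache_hit: deserialize the cached entity; Datastore is
        # never consulted.
        entity_pb = entity_pb2.Entity()
        entity_pb.MergeFromString(value)
        return entity_pb

    # test_cache_miss: lock the key so concurrent writers back off, then
    # fall through to the Datastore lookup batch.
    global_cache.set({cache_key: _cache._LOCKED}, expires=_cache._LOCK_TIME)
    entity_pb = datastore_lookup(ds_key)
    if entity_pb is not not_found:
        global_cache.set({cache_key: entity_pb.SerializeToString()})
    # test_cache_not_found: when Datastore also misses, the lock is simply
    # left to expire rather than being overwritten.
    return entity_pb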