Skip to content

Commit

Permalink
Tweak docs, minor re-organization.
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Rossi committed Aug 1, 2019
1 parent 27471ff commit 3de46e5
Show file tree
Hide file tree
Showing 13 changed files with 392 additions and 299 deletions.
7 changes: 7 additions & 0 deletions docs/global_cache.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#######
Context
#######

.. automodule:: google.cloud.ndb.global_cache
:members:
:show-inheritance:
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

client
context
global_cache
key
model
query
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ prefetch
protobuf
proxied
QueryOptions
Redis
RequestHandler
runtime
schemas
Expand Down
2 changes: 2 additions & 0 deletions src/google/cloud/ndb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"get_indexes_async",
"get_multi",
"get_multi_async",
"GlobalCache",
"in_transaction",
"Index",
"IndexProperty",
Expand Down Expand Up @@ -135,6 +136,7 @@
from google.cloud.ndb._datastore_api import STRONG
from google.cloud.ndb._datastore_query import Cursor
from google.cloud.ndb._datastore_query import QueryIterator
from google.cloud.ndb.global_cache import GlobalCache
from google.cloud.ndb.key import Key
from google.cloud.ndb.model import BlobKey
from google.cloud.ndb.model import BlobKeyProperty
Expand Down
191 changes: 36 additions & 155 deletions src/google/cloud/ndb/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import collections
import itertools
import time

from google.cloud.ndb import _batch
from google.cloud.ndb import context as context_module
Expand Down Expand Up @@ -46,73 +44,6 @@ def get_and_validate(self, key):
raise KeyError(key)


class GlobalCache(abc.ABC):
"""Abstract base class for a global entity cache.
A global entity cache is shared across contexts, sessions, and possibly
even servers. A concrete implementation is available which uses Redis.
"""

@abc.abstractmethod
def get(self, keys):
"""Retrieve entities from the cache.
Arguments:
keys (List[bytes]): The keys to get.
Returns:
List[Union[bytes, None]]]: Serialized entities, or :data:`None`,
for each key.
"""
raise NotImplementedError

@abc.abstractmethod
def set(self, items, expires=None):
"""Store entities in the cache.
Arguments:
items (Dict[bytes, Union[bytes, None]]): Mapping of keys to
serialized entities.
expires (Optional[float]): Number of seconds until value expires.
"""
raise NotImplementedError

@abc.abstractmethod
def delete(self, keys):
"""Remove entities from the cache.
Arguments:
keys (List[bytes]): The keys to remove.
"""
raise NotImplementedError

@abc.abstractmethod
def watch(self, keys):
"""Begin an optimistic transaction for the given keys.
A future call to :meth:`compare_and_swap` will only set values for keys
whose values haven't changed since the call to this method.
Arguments:
keys (List[bytes]): The keys to watch.
"""
raise NotImplementedError

@abc.abstractmethod
def compare_and_swap(self, items, expires=None):
"""Like :meth:`set` but using an optimistic transaction.
Only keys whose values haven't changed since a preceeding call to
:meth:`watch` will be changed.
Arguments:
items (Dict[bytes, Union[bytes, None]]): Mapping of keys to
serialized entities.
expires (Optional[float]): Number of seconds until value expires.
"""
raise NotImplementedError


def _future_result(result):
"""Returns a completed Future with the given result.
Expand All @@ -124,89 +55,26 @@ def _future_result(result):
return future


class _InProcessGlobalCache(GlobalCache):
"""Reference implementation of :class:`GlobalCache`.
Not intended for production use. Uses a single process wide dictionary to
keep an in memory cache. For use in testing and to have an easily grokkable
reference implementation. Thread safety is potentially a little sketchy.
"""

cache = {}
"""Dict: The cache.
Relies on atomicity of ``__setitem__`` for thread safety. See:
http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
class _GlobalCacheBatch:
"""Abstract base for classes used to batch operations for the global cache.
"""

def __init__(self):
self._watch_keys = {}

def get(self, keys):
"""Implements :meth:`GlobalCache.get`."""
now = time.time()
results = [self.cache.get(key) for key in keys]
entity_pbs = []
for result in results:
if result is not None:
entity_pb, expires = result
if expires and expires < now:
entity_pb = None
else:
entity_pb = None

entity_pbs.append(entity_pb)

return _future_result(entity_pbs)

def set(self, items, expires=None):
"""Implements :meth:`GlobalCache.set`."""
if expires:
expires = time.time() + expires

for key, value in items.items():
self.cache[key] = (value, expires) # Supposedly threadsafe

return _future_result(None)

def delete(self, keys):
"""Implements :meth:`GlobalCache.delete`."""
for key in keys:
self.cache.pop(key, None) # Threadsafe?

return _future_result(None)

def watch(self, keys):
"""Implements :meth:`GlobalCache.watch`."""
for key in keys:
self._watch_keys[key] = self.cache.get(key)

return _future_result(None)

def compare_and_swap(self, items, expires=None):
"""Implements :meth:`GlobalCache.compare_and_swap`."""
if expires:
expires = time.time() + expires

for key, new_value in items.items():
watch_value = self._watch_keys.get(key)
current_value = self.cache.get(key)
if watch_value == current_value:
self.cache[key] = (new_value, expires)

return _future_result(None)


class _GlobalCacheBatch:
def idle_callback(self):
"""Get keys from the global cache."""
"""Call the cache operation.
Also, schedule a callback for the completed operation.
"""
cache_call = self.make_call()
if not isinstance(cache_call, tasklets.Future):
cache_call = _future_result(cache_call)
cache_call.add_done_callback(self.done_callback)

def done_callback(self, cache_call):
"""Process results of call to global cache."""
"""Process results of call to global cache.
If there is an exception for the cache call, distribute that to waiting
futures, otherwise set the result for all waiting futures to ``None``.
"""
exception = cache_call.exception()
if exception:
for future in self.futures:
Expand All @@ -216,6 +84,14 @@ def done_callback(self, cache_call):
for future in self.futures:
future.set_result(None)

def make_call(self):
"""Make the actual call to the global cache. To be overridden."""
raise NotImplementedError

def future_info(self, key):
"""Generate info string for Future. To be overridden."""
raise NotImplementedError


def global_get(key):
"""Get entity from global cache.
Expand Down Expand Up @@ -265,7 +141,12 @@ def add(self, key):
return future

def done_callback(self, cache_call):
"""Process results of call to global cache."""
"""Process results of call to global cache.
If there is an exception for the cache call, distribute that to waiting
futures, otherwise distribute cache hits or misses to their respective
waiting futures.
"""
exception = cache_call.exception()
if exception:
for future in itertools.chain(*self.todo.values()):
Expand All @@ -280,12 +161,12 @@ def done_callback(self, cache_call):
future.set_result(result)

def make_call(self):
"""Make the actual call. To be overridden."""
"""Call :method:`GlobalCache.get`."""
cache = context_module.get_context().global_cache
return cache.get(self.todo.keys())

def future_info(self, key):
"""Generate info string for Future. To be overridden."""
"""Generate info string for Future."""
return "GlobalCache.get({})".format(key)


Expand Down Expand Up @@ -332,12 +213,12 @@ def add(self, key, value):
return future

def make_call(self):
"""Make the actual call. To be overridden."""
"""Call :method:`GlobalCache.set`."""
cache = context_module.get_context().global_cache
return cache.set(self.todo, expires=self.expires)

def future_info(self, key, value):
"""Generate info string for Future. To be overridden."""
"""Generate info string for Future."""
return "GlobalCache.set({}, {})".format(key, value)


Expand All @@ -355,7 +236,7 @@ def global_delete(key):


class _GlobalCacheDeleteBatch(_GlobalCacheBatch):
"""Batch for global cache set requests. """
"""Batch for global cache delete requests."""

def __init__(self, ignore_options):
self.keys = []
Expand All @@ -376,12 +257,12 @@ def add(self, key):
return future

def make_call(self):
"""Make the actual call. To be overridden."""
"""Call :method:`GlobalCache.delete`."""
cache = context_module.get_context().global_cache
return cache.delete(self.keys)

def future_info(self, key):
"""Generate info string for Future. To be overridden."""
"""Generate info string for Future."""
return "GlobalCache.delete({})".format(key)


Expand Down Expand Up @@ -409,12 +290,12 @@ def __init__(self, ignore_options):
self.futures = []

def make_call(self):
"""Make the actual call. To be overridden."""
"""Call :method:`GlobalCache.watch`."""
cache = context_module.get_context().global_cache
return cache.watch(self.keys)

def future_info(self, key):
"""Generate info string for Future. To be overridden."""
"""Generate info string for Future."""
return "GlobalWatch.delete({})".format(key)


Expand Down Expand Up @@ -444,12 +325,12 @@ class _GlobalCacheCompareAndSwapBatch(_GlobalCacheSetBatch):
"""Batch for global cache compare and swap requests. """

def make_call(self):
"""Make the actual call. To be overridden."""
"""Call :method:`GlobalCache.compare_and_swap`."""
cache = context_module.get_context().global_cache
return cache.compare_and_swap(self.todo, expires=self.expires)

def future_info(self, key, value):
"""Generate info string for Future. To be overridden."""
"""Generate info string for Future."""
return "GlobalCache.compare_and_swap({}, {})".format(key, value)


Expand Down
4 changes: 2 additions & 2 deletions src/google/cloud/ndb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,9 @@ def context(
cache_policy (Optional[Callable[[key.Key], bool]]): The
cache policy to use in this context. See:
:meth:`~google.cloud.ndb.context.Context.set_cache_policy`.
global_cache (Optional[_cache.GlobalCache]):
global_cache (Optional[global_cache.GlobalCache]):
The global cache for this context. See:
:class:`~google.cloud.ndb._cache.GlobalCache`.
:class:`~google.cloud.ndb.global_cache.GlobalCache`.
global_cache_policy (Optional[Callable[[key.Key], bool]]): The
global cache policy to use in this context. See:
:meth:`~google.cloud.ndb.context.Context.set_global_cache_policy`.
Expand Down
Loading

0 comments on commit 3de46e5

Please sign in to comment.