RedisCache #150

Merged: 1 commit, Aug 2, 2019

1 change: 1 addition & 0 deletions docs/conf.py
@@ -55,6 +55,7 @@
("py:class", "Optional"),
("py:class", "Tuple"),
("py:class", "Union"),
("py:class", "redis.Redis"),
]

# Add any Sphinx extension module names here, as strings. They can be
5 changes: 4 additions & 1 deletion setup.py
@@ -23,7 +23,10 @@ def main():
readme_filename = os.path.join(package_root, "README.md")
with io.open(readme_filename, encoding="utf-8") as readme_file:
readme = readme_file.read()
dependencies = ["google-cloud-datastore >= 1.7.0"]
dependencies = [
"google-cloud-datastore >= 1.7.0",
"redis",
]

setuptools.setup(
name="google-cloud-ndb",
114 changes: 113 additions & 1 deletion src/google/cloud/ndb/global_cache.py
@@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""GlobalCache interface and its implementations."""

import abc
import collections
import os
import time
import uuid

"""GlobalCache interface and its implementations."""
import redis as redis_module


class GlobalCache(abc.ABC):
@@ -160,3 +165,110 @@ def compare_and_swap(self, items, expires=None):
current_value = self.cache.get(key)
if watch_value == current_value:
self.cache[key] = (new_value, expires)


_Pipeline = collections.namedtuple("_Pipeline", ("pipe", "id"))


class RedisCache(GlobalCache):
"""Redis implementation of the :class:`GlobalCache`.

This is a synchronous implementation. The idea is that calls to Redis
should be fast enough not to warrant the added complexity of an
asynchronous implementation.

Args:
redis (redis.Redis): Instance of Redis client to use.
"""

@classmethod
def from_environment(cls):
"""Generate a class:`RedisCache` from an environment variable.

This class method looks for the ``REDIS_CACHE_URL`` environment
variable and, if it is set, passes its value to ``Redis.from_url`` to
construct a ``Redis`` instance which is then used to instantiate a
``RedisCache`` instance.

Returns:
Optional[RedisCache]: A :class:`RedisCache` instance or
:data:`None`, if ``REDIS_CACHE_URL`` is not set in the
environment.
"""
url = os.environ.get("REDIS_CACHE_URL")
if url:
return cls(redis_module.Redis.from_url(url))

def __init__(self, redis):
self.redis = redis
self.pipes = {}

def get(self, keys):
"""Implements :meth:`GlobalCache.get`."""
res = self.redis.mget(keys)
return res

def set(self, items, expires=None):
"""Implements :meth:`GlobalCache.set`."""
self.redis.mset(items)
if expires:
for key in items.keys():
self.redis.expire(key, expires)

def delete(self, keys):
"""Implements :meth:`GlobalCache.delete`."""
self.redis.delete(*keys)

def watch(self, keys):
Contributor commented:

Is watch() part of the original ndb's spec for caching? I don't understand how they made this work with the memcached API.

Contributor (author) replied:

watch and compare_and_swap are equivalent to using get and set in memcached with CAS tokens. I had to generalize the concept a bit to work with Redis. I also did away with the CAS acronym because it's a bit opaque. (I had to do some Googling when I first encountered it.)
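
To make the CAS analogy concrete, a rough caller-side sketch of the watch()/compare_and_swap() flow might look like the following. This is illustrative only, not part of the change, and it assumes a Redis server reachable at redis://localhost:6379.

# Illustrative sketch of the optimistic read-modify-write pattern exposed by
# RedisCache.watch()/compare_and_swap(); the local Redis URL is an assumption.
import redis

from google.cloud.ndb.global_cache import RedisCache

cache = RedisCache(redis.Redis.from_url("redis://localhost:6379"))

key = b"example-cache-key"
cache.watch([key])              # WATCH the key on a dedicated pipeline
current = cache.get([key])[0]   # read the current value under the watch
new_value = b"updated"          # ...compute a replacement from current...

# MULTI/MSET/EXEC on the watched pipeline; if another client modified the key
# after watch(), the WatchError is swallowed and the write is silently skipped.
cache.compare_and_swap({key: new_value}, expires=60)

Because the WatchError is swallowed, the swap is best-effort: a concurrent modification simply results in no write, which mirrors a failed memcached CAS.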

"""Implements :meth:`GlobalCache.watch`."""
pipe = self.redis.pipeline()
pipe.watch(*keys)
holder = _Pipeline(pipe, str(uuid.uuid4()))
for key in keys:
self.pipes[key] = holder

def compare_and_swap(self, items, expires=None):
"""Implements :meth:`GlobalCache.compare_and_swap`."""
pipes = {}
mappings = {}
results = {}
remove_keys = []

# get associated pipes
for key, value in items.items():
remove_keys.append(key)
if key not in self.pipes:
continue

pipe = self.pipes[key]
pipes[pipe.id] = pipe
mapping = mappings.setdefault(pipe.id, {})
mapping[key] = value

# execute the transaction for each pipe
for pipe_id, mapping in mappings.items():
pipe = pipes[pipe_id].pipe
try:
pipe.multi()
pipe.mset(mapping)
if expires:
for key in mapping.keys():
pipe.expire(key, expires)
pipe.execute()

except redis_module.exceptions.WatchError:
pass

finally:
pipe.reset()

# also collect keys watched on the executed pipes but not updated
for key, pipe in self.pipes.items():
if pipe.id in pipes:
remove_keys.append(key)

# remove keys
for key in remove_keys:
self.pipes.pop(key, None)

return results
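
For reference, a hypothetical end-to-end sketch of the new class (not part of the diff above); the local Redis URL and the bytes keys/values are assumptions for illustration:

# Hypothetical usage sketch for RedisCache, assuming a Redis server at
# redis://localhost:6379.
import os

import redis

from google.cloud.ndb.global_cache import RedisCache

# Either construct from the environment...
os.environ["REDIS_CACHE_URL"] = "redis://localhost:6379"
cache = RedisCache.from_environment()   # returns None if REDIS_CACHE_URL is unset

# ...or pass a redis.Redis client explicitly.
cache = RedisCache(redis.Redis.from_url("redis://localhost:6379"))

# Basic operations follow the GlobalCache interface.
cache.set({b"k1": b"v1", b"k2": b"v2"}, expires=300)
print(cache.get([b"k1", b"k2"]))        # [b'v1', b'v2']
cache.delete([b"k1", b"k2"])

The same REDIS_CACHE_URL variable gates the Redis-backed system tests below via the USE_REDIS_CACHE flag.
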
93 changes: 93 additions & 0 deletions tests/system/test_crud.py
@@ -18,6 +18,7 @@
import datetime
import functools
import operator
import os
import threading

from unittest import mock
@@ -32,6 +33,8 @@

from tests.system import KIND, eventually

USE_REDIS_CACHE = bool(os.environ.get("REDIS_CACHE_URL"))


def _equals(n):
return functools.partial(operator.eq, n)
@@ -110,6 +113,40 @@ class SomeKind(ndb.Model):
assert entity.baz == "night"


@pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
def test_retrieve_entity_with_redis_cache(ds_entity, client_context):
entity_id = test_utils.system.unique_resource_id()
ds_entity(KIND, entity_id, foo=42, bar="none", baz=b"night")

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()
baz = ndb.StringProperty()

global_cache = global_cache_module.RedisCache.from_environment()
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default

key = ndb.Key(KIND, entity_id)
entity = key.get()
assert isinstance(entity, SomeKind)
assert entity.foo == 42
assert entity.bar == "none"
assert entity.baz == "night"

cache_key = _cache.global_cache_key(key._key)
assert global_cache.redis.get(cache_key) is not None

patch = mock.patch("google.cloud.ndb._datastore_api._LookupBatch.add")
patch.side_effect = Exception("Shouldn't call this")
with patch:
entity = key.get()
assert isinstance(entity, SomeKind)
assert entity.foo == 42
assert entity.bar == "none"
assert entity.baz == "night"


@pytest.mark.usefixtures("client_context")
def test_retrieve_entity_not_found(ds_entity):
entity_id = test_utils.system.unique_resource_id()
@@ -316,6 +353,37 @@ class SomeKind(ndb.Model):
dispose_of(key._key)


@pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
def test_insert_entity_with_redis_cache(dispose_of, client_context):
class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()
bar = ndb.StringProperty()

global_cache = global_cache_module.RedisCache.from_environment()
with client_context.new(global_cache=global_cache).use() as context:
context.set_global_cache_policy(None) # Use default

entity = SomeKind(foo=42, bar="none")
key = entity.put()
cache_key = _cache.global_cache_key(key._key)
assert global_cache.redis.get(cache_key) is None

retrieved = key.get()
assert retrieved.foo == 42
assert retrieved.bar == "none"

assert global_cache.redis.get(cache_key) is not None

entity.foo = 43
entity.put()

# This is py27 behavior. I can see a case being made for caching the
# entity on write rather than waiting for a subsequent lookup.
assert global_cache.redis.get(cache_key) is None

dispose_of(key._key)


@pytest.mark.usefixtures("client_context")
def test_update_entity(ds_entity):
entity_id = test_utils.system.unique_resource_id()
@@ -453,6 +521,31 @@ class SomeKind(ndb.Model):
assert cache_dict[cache_key][0] == b"0"


@pytest.mark.skipif(not USE_REDIS_CACHE, reason="Redis is not configured")
def test_delete_entity_with_redis_cache(ds_entity, client_context):
entity_id = test_utils.system.unique_resource_id()
ds_entity(KIND, entity_id, foo=42)

class SomeKind(ndb.Model):
foo = ndb.IntegerProperty()

key = ndb.Key(KIND, entity_id)
cache_key = _cache.global_cache_key(key._key)
global_cache = global_cache_module.RedisCache.from_environment()

with client_context.new(global_cache=global_cache).use():
assert key.get().foo == 42
assert global_cache.redis.get(cache_key) is not None

assert key.delete() is None
assert global_cache.redis.get(cache_key) is None

# This is py27 behavior. Not entirely sold on leaving _LOCKED value for
# Datastore misses.
assert key.get() is None
assert global_cache.redis.get(cache_key) == b"0"


@pytest.mark.usefixtures("client_context")
def test_delete_entity_in_transaction(ds_entity):
entity_id = test_utils.system.unique_resource_id()