Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Google Cloud Storage Cache #214

Merged
merged 10 commits into from
Mar 6, 2021
199 changes: 199 additions & 0 deletions flask_caching/contrib/googlecloudstoragecache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-
import datetime
import json
import logging

from flask_caching.backends.base import BaseCache


logger = logging.getLogger(__name__)


try:
from google.auth.credentials import AnonymousCredentials
from google.cloud import storage, exceptions
except ImportError:
raise RuntimeError("no google-cloud-storage module found")


class GoogleCloudStorageCache(BaseCache):
"""Uses an Google Cloud Storage bucket as a cache backend.
Note: User-contributed functionality. This project does not guarantee that
this functionality will be maintained or functional at any given time.
Note: Cache keys must meet GCS criteria for a valid object name (a sequence
of Unicode characters whose UTF-8 encoding is at most 1024 bytes long).
Note: Expired cache objects are not automatically purged. If
delete_expired_objects_on_read=True, they will be deleted following an
attempted read (which reduces performance). Otherwise, you have to delete
stale objects yourself. Consider an GCS bucket lifecycle rule or other
out-of-band process. For example you can use the following rule.
{"rule": [{"action": {"type": "Delete"}, "condition": {"daysSinceCustomTime": 0}}]}
https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
:param bucket: Required. Name of the bucket to use. It must already exist.
:param key_prefix: A prefix that should be added to all keys.
:param default_timeout: the default timeout that is used if no timeout is
specified on :meth:`~BaseCache.set`. A timeout of
0 indicates that the cache never expires.
:param delete_expired_objects_on_read: If True, if a read finds a stale
object, it will be deleted before
a response is returned. Will slow
down responses.
:param anonymous: If true, use anonymous credentials. Useful for testing.
Any additional keyword arguments will be passed to ``google.cloud.storage.Client``.
"""

def __init__(
self,
bucket,
key_prefix=None,
default_timeout=300,
delete_expired_objects_on_read=False,
anonymous=False,
**kwargs
):
super(GoogleCloudStorageCache, self).__init__(default_timeout)
if not isinstance(bucket, str):
raise ValueError("GCSCache bucket parameter must be a string")
if anonymous:
self._client = storage.Client(
credentials=AnonymousCredentials(), project="test", **kwargs
)
else:
self._client = storage.Client(**kwargs)
self.bucket = self._client.get_bucket(bucket)
self.key_prefix = key_prefix or ""
self.default_timeout = default_timeout
self.delete_expired_objects_on_read = delete_expired_objects_on_read

@classmethod
def factory(cls, app, config, args, kwargs):
args.insert(0, config["CACHE_GCS_BUCKET"])
key_prefix = config.get("CACHE_KEY_PREFIX")
if key_prefix:
kwargs["key_prefix"] = key_prefix
return cls(*args, **kwargs)

def get(self, key):
result = None
expired = False
hit_or_miss = "miss"
full_key = self.key_prefix + key
blob = self.bucket.get_blob(full_key)
if blob is not None:
expired = blob.custom_time and self._now() > blob.custom_time
if expired:
# Object is stale
if self.delete_expired_objects_on_read:
self._delete(full_key)
else:
try:
result = blob.download_as_bytes()
hit_or_miss = "hit"
if blob.content_type == "application/json":
result = json.loads(result)
except exceptions.NotFound:
pass
expiredstr = "(expired)" if expired else ""
logger.debug("get key %r -> %s %s", full_key, hit_or_miss, expiredstr)
return result

def set(self, key, value, timeout=None):
result = False
full_key = self.key_prefix + key
content_type = "application/json"
try:
value = json.dumps(value)
except (UnicodeDecodeError, TypeError):
content_type = "application/octet-stream"
blob = self.bucket.blob(full_key)
if timeout is None:
timeout = self.default_timeout
if timeout != 0:
# Use 'Custom-Time' for expiry
# https://cloud.google.com/storage/docs/metadata#custom-time
blob.custom_time = self._now(delta=timeout)
try:
blob.upload_from_string(value, content_type=content_type)
result = True
except exceptions.TooManyRequests:
pass
logger.debug("set key %r -> %s", full_key, result)
return result

def add(self, key, value, timeout=None):
full_key = self.key_prefix + key
if self._has(full_key):
logger.debug("add key %r -> not added", full_key)
return False
else:
return self.set(key, value, timeout)

def delete(self, key):
full_key = self.key_prefix + key
return self._delete(full_key)

def delete_many(self, *keys):
return self._delete_many(self.key_prefix + key for key in keys)

def has(self, key):
full_key = self.key_prefix + key
return self._has(full_key)

def clear(self):
return self._prune(clear_all=True)

def _prune(self, clear_all=False):
# Delete in batches of 100 which is much faster than individual deletes
nremoved = 0
now = self._now()
response_iterator = self._client.list_blobs(
self.bucket,
prefix=self.key_prefix,
fields="items(name,customTime),nextPageToken",
)
to_delete = []
for blob in response_iterator:
if clear_all or blob.custom_time and blob.custom_time < now:
to_delete.append(blob.name)
nremoved += 1
if len(to_delete) == 100:
self._delete_many(to_delete)
to_delete = []
# Delete the remainder
if to_delete:
self._delete_many(to_delete)
logger.debug("evicted %d key(s)", nremoved)
return True

def _delete(self, key):
return self._delete_many([key])

def _delete_many(self, keys):
try:
with self._client.batch():
for key in keys:
self.bucket.delete_blob(key)
except (exceptions.NotFound, exceptions.TooManyRequests):
pass
return True

def _has(self, key):
result = False
expired = False
blob = self.bucket.get_blob(key)
if blob is not None:
expired = blob.custom_time and self._now() > blob.custom_time
if expired:
# Exists but is stale
if self.delete_expired_objects_on_read:
self._delete(key)
else:
result = True
expiredstr = "(expired)" if expired else ""
logger.debug("has key %r -> %s %s", key, result, expiredstr)
return result

def _now(self, delta=0):
return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=delta
)