Skip to content

Commit

Permalink
Add initial implementation of S3Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Yolken committed Feb 3, 2017
1 parent 0f7189b commit 1e94498
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 7 deletions.
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_git_sha():
zip_safe=False,
scripts=['superset/bin/superset'],
install_requires=[
'boto3==1.4.4',
'celery==3.1.23',
'cryptography==1.5.3',
'flask-appbuilder==1.8.1',
Expand Down
2 changes: 1 addition & 1 deletion superset/assets/version_info.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"GIT_SHA": "2d08e240285288b71df98747ddd4b6cca3220c5a", "version": "0.15.2"}
{"GIT_SHA": "0f7189b859f4a782fd43af694012029645f81b44", "version": "0.15.4"}
114 changes: 108 additions & 6 deletions superset/results_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,106 @@
from __future__ import print_function
from __future__ import unicode_literals

import cPickle
import logging
import StringIO

import boto3
from werkzeug.contrib.cache import BaseCache

from superset import app

config = app.config


class S3Cache(BaseCache):

    """S3 cache implementation.

    Adapted from examples in
    https://github.com/pallets/werkzeug/blob/master/werkzeug/contrib/cache.py.

    Timeout parameters are ignored as S3 doesn't support key-level expiration.
    To expire keys, set up an expiration policy as described in
    https://aws.amazon.com/blogs/aws/amazon-s3-object-expiration/.
    """

    def __init__(self, default_timeout=300):
        """Initialize the cache from the app config.

        :param default_timeout: kept for ``BaseCache`` interface
            compatibility; unused since S3 keys never expire here.
        """
        self.default_timeout = default_timeout

        self.s3_client = boto3.client('s3')

        # Store the bucket *name* (a string). Every client API used below
        # (download_fileobj, delete_objects, head_object) takes the bucket
        # name, not a boto3 Bucket resource object.
        self.bucket = config.get('S3_CACHE_BUCKET')
        self.key_prefix = config.get('S3_CACHE_KEY_PREFIX')

    def get(self, key):
        """Look up key in the cache and return the value for it.

        :param key: the key to be looked up.
        :returns: The value if it exists and is readable, else ``None``.
        """
        if not self._key_exists(key):
            return None

        value_file = StringIO.StringIO()
        try:
            self.s3_client.download_fileobj(
                self.bucket,
                self._full_s3_key(key),
                value_file)
        except Exception as e:
            # Best-effort cache: treat any S3 error as a miss.
            logging.warning('Exception while trying to get %s: %s', key, e)
            return None
        else:
            value_file.seek(0)
            return cPickle.load(value_file)

    def delete(self, key):
        """Delete `key` from the cache.

        :param key: the key to delete.
        :returns: Whether the key existed and has been deleted.
        :rtype: boolean
        """
        if not self._key_exists(key):
            return False

        try:
            self.s3_client.delete_objects(
                Bucket=self.bucket,
                Delete={
                    'Objects': [
                        {
                            'Key': self._full_s3_key(key)
                        }
                    ]
                }
            )
        except Exception as e:
            logging.warning('Exception while trying to delete %s: %s', key, e)
            return False
        else:
            return True

    def set(self, key, value, timeout=None):
        """Add a new key/value to the cache (overwrites value, if key already
        exists in the cache).

        :param key: the key to set
        :param value: the value for the key
        :param timeout: the cache timeout for the key in seconds (if not
                        specified, it uses the default timeout). A timeout of
                        0 indicates that the cache never expires.
        :returns: ``True`` if key has been updated, ``False`` for backend
                  errors. Pickling errors, however, will raise a subclass of
                  ``pickle.PickleError``.
        :rtype: boolean
        """
        # Pickle outside the try: a PickleError should propagate to the
        # caller per the contract above, not be swallowed as a backend error.
        value_file = StringIO.StringIO()
        cPickle.dump(value, value_file)

        try:
            value_file.seek(0)
            self.s3_client.upload_fileobj(
                value_file,
                self.bucket,
                self._full_s3_key(key))
        except Exception as e:
            logging.warning('Exception while trying to set %s: %s', key, e)
            return False
        else:
            return True

    def add(self, key, value, timeout=None):
        """Works like :meth:`set` but does not overwrite the values of already
        existing keys.

        :param key: the key to set
        :param value: the value for the key
        :param timeout: ignored (S3 has no key-level expiration)
        :returns: Same as :meth:`set`, but also ``False`` for already
                  existing keys.
        :rtype: boolean
        """
        if self._key_exists(key):
            return False
        return self.set(key, value, timeout=timeout)

    def clear(self):
        """Clears the cache. Keep in mind that not all caches support
        completely clearing the cache.

        :returns: Whether the cache has been cleared. Always ``False`` here,
                  as bulk-clearing an S3 prefix is not supported.
        :rtype: boolean
        """
        return False

    def _full_s3_key(self, key):
        """Convert a cache key to a full S3 key, including the key prefix."""
        return '%s%s' % (self.key_prefix, key)

    def _key_exists(self, key):
        """Determine whether the given key exists in the bucket."""
        try:
            self.s3_client.head_object(
                Bucket=self.bucket,
                Key=self._full_s3_key(key)
            )
        except Exception:
            # head_object raises (e.g. 404 ClientError) when the object
            # doesn't exist; treat any failure as "not present".
            return False
        else:
            return True

0 comments on commit 1e94498

Please sign in to comment.