Skip to content

Commit

Permalink
prepare 5.0.2 release (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-darkly authored Mar 27, 2018
1 parent 31ff79f commit 0e52d0f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 30 deletions.
73 changes: 43 additions & 30 deletions ldclient/redis_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,20 @@ def all(self, kind, callback):
return callback(results)

def get(self, kind, key, callback=lambda x: x):
item = self._get_even_if_deleted(kind, key)
item = self._get_even_if_deleted(kind, key, check_cache=True)
if item is not None and item.get('deleted', False) is True:
log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. Returning None.", key, kind.namespace)
return callback(None)
return callback(item)

def _get_even_if_deleted(self, kind, key):
def _get_even_if_deleted(self, kind, key, check_cache = True):
cacheKey = self._cache_key(kind, key)
item = self._cache.get(cacheKey)
if item is not None:
# reset ttl
self._cache[cacheKey] = item
return item
if check_cache:
item = self._cache.get(cacheKey)
if item is not None:
# reset ttl
self._cache[cacheKey] = item
return item

try:
r = redis.Redis(connection_pool=self._pool)
Expand All @@ -110,17 +111,11 @@ def _get_even_if_deleted(self, kind, key):
return item

def delete(self, kind, key, version):
r = redis.Redis(connection_pool=self._pool)
baseKey = self._items_key(kind)
r.watch(baseKey)
item_json = r.hget(baseKey, key)
item = None if item_json is None else json.loads(item_json.decode('utf-8'))
if item is None or item['version'] < version:
deletedItem = { "deleted": True, "version": version }
item_json = json.dumps(deletedItem)
r.hset(baseKey, key, item_json)
self._cache[self._cache_key(kind, key)] = deletedItem
r.unwatch()
deleted_item = { "key": key, "version": version, "deleted": True }
self._update_with_versioning(kind, deleted_item)

def upsert(self, kind, item):
self._update_with_versioning(kind, item)

@property
def initialized(self):
Expand All @@ -130,18 +125,36 @@ def _query_init(self):
r = redis.Redis(connection_pool=self._pool)
return r.exists(self._items_key(FEATURES))

def upsert(self, kind, item):
def _update_with_versioning(self, kind, item):
r = redis.Redis(connection_pool=self._pool)
baseKey = self._items_key(kind)
base_key = self._items_key(kind)
key = item['key']
r.watch(baseKey)
old = self._get_even_if_deleted(kind, key)
if old:
if old['version'] >= item['version']:
r.unwatch()
return

item_json = json.dumps(item)
r.hset(baseKey, key, item_json)
self._cache[self._cache_key(kind, key)] = item
r.unwatch()

while True:
pipeline = r.pipeline()
pipeline.watch(base_key)
old = self._get_even_if_deleted(kind, key, check_cache=False)
self._before_update_transaction(base_key, key)
if old and old['version'] >= item['version']:
log.debug('RedisFeatureStore: Attempted to %s key: %s version %d with a version that is the same or older: %d in "%s"',
'delete' if item.get('deleted') else 'update',
key, old['version'], item['version'], kind.namespace)
pipeline.unwatch()
break
else:
pipeline.multi()
pipeline.hset(base_key, key, item_json)
try:
pipeline.execute()
# Unlike Redis implementations for other platforms, in redis-py a failed WATCH
# produces an exception rather than a null result from execute().
except redis.exceptions.WatchError:
log.debug("RedisFeatureStore: concurrent modification detected, retrying")
continue
self._cache[self._cache_key(kind, key)] = item
break

def _before_update_transaction(self, base_key, key):
# exposed for testing
pass
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mock>=2.0.0
pytest>=2.8
pytest-timeout>=1.0
redis>=2.10.5
Expand Down
42 changes: 42 additions & 0 deletions testing/test_feature_store.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json
from mock import patch
import pytest
import redis

Expand Down Expand Up @@ -120,3 +122,43 @@ def test_upsert_older_version_after_delete(self, store):
old_ver = self.make_feature('foo', 9)
store.upsert(FEATURES, old_ver)
assert store.get(FEATURES, 'foo', lambda x: x) is None


class TestRedisFeatureStoreExtraTests:
@patch.object(RedisFeatureStore, '_before_update_transaction')
def test_upsert_race_condition_against_external_client_with_higher_version(self, mock_method):
other_client = redis.StrictRedis(host='localhost', port=6379, db=0)
store = RedisFeatureStore()
store.init({ FEATURES: {} })

other_version = {u'key': u'flagkey', u'version': 2}
def hook(base_key, key):
if other_version['version'] <= 4:
other_client.hset(base_key, key, json.dumps(other_version))
other_version['version'] = other_version['version'] + 1
mock_method.side_effect = hook

feature = { u'key': 'flagkey', u'version': 1 }

store.upsert(FEATURES, feature)
result = store.get(FEATURES, 'flagkey', lambda x: x)
assert result['version'] == 2

@patch.object(RedisFeatureStore, '_before_update_transaction')
def test_upsert_race_condition_against_external_client_with_lower_version(self, mock_method):
other_client = redis.StrictRedis(host='localhost', port=6379, db=0)
store = RedisFeatureStore()
store.init({ FEATURES: {} })

other_version = {u'key': u'flagkey', u'version': 2}
def hook(base_key, key):
if other_version['version'] <= 4:
other_client.hset(base_key, key, json.dumps(other_version))
other_version['version'] = other_version['version'] + 1
mock_method.side_effect = hook

feature = { u'key': 'flagkey', u'version': 5 }

store.upsert(FEATURES, feature)
result = store.get(FEATURES, 'flagkey', lambda x: x)
assert result['version'] == 5

0 comments on commit 0e52d0f

Please sign in to comment.