Merge pull request #81 from launchdarkly/eb/ch28329/dynamodb
add DynamoDB support
eli-darkly authored Jan 9, 2019
2 parents ac0f2ea + fa56526 commit 3a1c2dc
Showing 7 changed files with 350 additions and 24 deletions.
6 changes: 6 additions & 0 deletions .circleci/config.yml
@@ -40,28 +40,34 @@ jobs:
    docker:
      - image: circleci/python:2.7-jessie
      - image: redis
      - image: amazon/dynamodb-local
  test-3.3:
    <<: *test-template
    docker:
      - image: circleci/python:3.3-jessie
      - image: redis
      - image: amazon/dynamodb-local
  test-3.4:
    <<: *test-template
    docker:
      - image: circleci/python:3.4-jessie
      - image: redis
      - image: amazon/dynamodb-local
  test-3.5:
    <<: *test-template
    docker:
      - image: circleci/python:3.5-jessie
      - image: redis
      - image: amazon/dynamodb-local
  test-3.6:
    <<: *test-template
    docker:
      - image: circleci/python:3.6-jessie
      - image: redis
      - image: amazon/dynamodb-local
  test-3.7:
    <<: *test-template
    docker:
      - image: circleci/python:3.7-stretch
      - image: redis
      - image: amazon/dynamodb-local
1 change: 1 addition & 0 deletions dynamodb-requirements.txt
@@ -0,0 +1 @@
boto3>=1.9.71
191 changes: 191 additions & 0 deletions ldclient/dynamodb_feature_store.py
@@ -0,0 +1,191 @@
import json

have_dynamodb = False
try:
    import boto3
    have_dynamodb = True
except ImportError:
    pass

from ldclient import log
from ldclient.feature_store import CacheConfig
from ldclient.feature_store_helpers import CachingStoreWrapper
from ldclient.interfaces import FeatureStore, FeatureStoreCore

#
# Internal implementation of the DynamoDB feature store.
#
# Implementation notes:
#
# * Feature flags, segments, and any other kind of entity the LaunchDarkly client may wish
# to store, are all put in the same table. The only two required attributes are "key" (which
# is present in all storeable entities) and "namespace" (a parameter from the client that is
# used to disambiguate between flags and segments).
#
# * Because of DynamoDB's restrictions on attribute values (e.g. empty strings are not
# allowed), the standard DynamoDB marshaling mechanism with one attribute per object property
# is not used. Instead, the entire object is serialized to JSON and stored in a single
# attribute, "item". The "version" property is also stored as a separate attribute since it
# is used for updates.
#
# * Since DynamoDB doesn't have transactions, the init() method - which replaces the entire data
# store - is not atomic, so there can be a race condition if another process is adding new data
# via upsert(). To minimize this, we don't delete all the data at the start; instead, we update
# the items we've received, and then delete all other items. That could potentially result in
# deleting new data from another process, but that would be the case anyway if the init()
# happened to execute later than the upsert(); we are relying on the fact that normally the
# process that did the init() will also receive the new data shortly and do its own upsert().
#
# * DynamoDB has a maximum item size of 400KB. Since each feature flag or user segment is
# stored as a single item, this mechanism will not work for extremely large flags or segments.
#
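# As a concrete illustration of the layout described above, a hypothetical
# flag "my-flag" at version 3, stored under prefix "my-prefix", would be
# marshaled into a DynamoDB item shaped roughly like this (attribute names
# are the constants defined on _DynamoDBFeatureStoreCore below):
#
#     {
#         'namespace': { 'S': 'my-prefix:features' },   # partition key
#         'key':       { 'S': 'my-flag' },              # sort key
#         'version':   { 'N': '3' },                    # used for conditional updates
#         'item':      { 'S': '{"key": "my-flag", "version": 3, ...}' }   # full JSON
#     }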

class _DynamoDBFeatureStoreCore(FeatureStoreCore):
    PARTITION_KEY = 'namespace'
    SORT_KEY = 'key'
    VERSION_ATTRIBUTE = 'version'
    ITEM_JSON_ATTRIBUTE = 'item'

    def __init__(self, table_name, prefix, dynamodb_opts):
        if not have_dynamodb:
            raise NotImplementedError("Cannot use DynamoDB feature store because AWS SDK (boto3 package) is not installed")
        self._table_name = table_name
        self._prefix = None if prefix == "" else prefix
        self._client = boto3.client('dynamodb', **dynamodb_opts)

    def init_internal(self, all_data):
        # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
        unused_old_keys = self._read_existing_keys(all_data.keys())
        requests = []
        num_items = 0
        inited_key = self._inited_key()

        # Insert or update every provided item
        for kind, items in all_data.items():
            for key, item in items.items():
                encoded_item = self._marshal_item(kind, item)
                requests.append({ 'PutRequest': { 'Item': encoded_item } })
                combined_key = (self._namespace_for_kind(kind), key)
                unused_old_keys.discard(combined_key)
                num_items = num_items + 1

        # Now delete any previously existing items whose keys were not in the current data
        for combined_key in unused_old_keys:
            if combined_key[0] != inited_key:
                requests.append({ 'DeleteRequest': { 'Key': self._make_keys(combined_key[0], combined_key[1]) } })

        # Now set the special key that we check in initialized_internal()
        requests.append({ 'PutRequest': { 'Item': self._make_keys(inited_key, inited_key) } })

        _DynamoDBHelpers.batch_write_requests(self._client, self._table_name, requests)
        log.info('Initialized table %s with %d items', self._table_name, num_items)

    def get_internal(self, kind, key):
        resp = self._get_item_by_keys(self._namespace_for_kind(kind), key)
        return self._unmarshal_item(resp.get('Item'))

    def get_all_internal(self, kind):
        items_out = {}
        paginator = self._client.get_paginator('query')
        for resp in paginator.paginate(**self._make_query_for_kind(kind)):
            for item in resp['Items']:
                item_out = self._unmarshal_item(item)
                items_out[item_out['key']] = item_out
        return items_out

    def upsert_internal(self, kind, item):
        encoded_item = self._marshal_item(kind, item)
        try:
            req = {
                'TableName': self._table_name,
                'Item': encoded_item,
                'ConditionExpression': 'attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version',
                'ExpressionAttributeNames': {
                    '#namespace': self.PARTITION_KEY,
                    '#key': self.SORT_KEY,
                    '#version': self.VERSION_ATTRIBUTE
                },
                'ExpressionAttributeValues': {
                    ':version': { 'N': str(item['version']) }
                }
            }
            self._client.put_item(**req)
        except self._client.exceptions.ConditionalCheckFailedException:
            # The item was not updated because there's a newer item in the database. We must now
            # read the item that's in the database and return it, so CachingStoreWrapper can cache it.
            return self.get_internal(kind, item['key'])
        return item
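    # To make the conditional write above concrete (an illustrative note, not
    # additional behavior): if the table already holds this key at version 5
    # and upsert_internal() is called with version 4, the condition
    # ':version > #version' is false, put_item raises
    # ConditionalCheckFailedException, and the stored version-5 item is read
    # back and returned so that the wrapper caches the newest data.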

    def initialized_internal(self):
        resp = self._get_item_by_keys(self._inited_key(), self._inited_key())
        return resp.get('Item') is not None and len(resp['Item']) > 0

    def _prefixed_namespace(self, base):
        return base if self._prefix is None else (self._prefix + ':' + base)

    def _namespace_for_kind(self, kind):
        return self._prefixed_namespace(kind.namespace)

    def _inited_key(self):
        return self._prefixed_namespace('$inited')

    def _make_keys(self, namespace, key):
        return {
            self.PARTITION_KEY: { 'S': namespace },
            self.SORT_KEY: { 'S': key }
        }

    def _make_query_for_kind(self, kind):
        return {
            'TableName': self._table_name,
            'ConsistentRead': True,
            'KeyConditions': {
                self.PARTITION_KEY: {
                    'AttributeValueList': [
                        { 'S': self._namespace_for_kind(kind) }
                    ],
                    'ComparisonOperator': 'EQ'
                }
            }
        }
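    # Note: 'KeyConditions' is DynamoDB's older query syntax, which boto3 still
    # accepts. A sketch of the equivalent newer form (an editorial aside, not
    # part of this change) would be:
    #
    #     'KeyConditionExpression': '#namespace = :ns',
    #     'ExpressionAttributeNames': { '#namespace': self.PARTITION_KEY },
    #     'ExpressionAttributeValues': { ':ns': { 'S': self._namespace_for_kind(kind) } }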

    def _get_item_by_keys(self, namespace, key):
        return self._client.get_item(TableName=self._table_name, Key=self._make_keys(namespace, key))

    def _read_existing_keys(self, kinds):
        keys = set()
        for kind in kinds:
            req = self._make_query_for_kind(kind)
            req['ProjectionExpression'] = '#namespace, #key'
            req['ExpressionAttributeNames'] = {
                '#namespace': self.PARTITION_KEY,
                '#key': self.SORT_KEY
            }
            paginator = self._client.get_paginator('query')
            for resp in paginator.paginate(**req):
                for item in resp['Items']:
                    namespace = item[self.PARTITION_KEY]['S']
                    key = item[self.SORT_KEY]['S']
                    keys.add((namespace, key))
        return keys

    def _marshal_item(self, kind, item):
        json_str = json.dumps(item)
        ret = self._make_keys(self._namespace_for_kind(kind), item['key'])
        ret[self.VERSION_ATTRIBUTE] = { 'N': str(item['version']) }
        ret[self.ITEM_JSON_ATTRIBUTE] = { 'S': json_str }
        return ret

    def _unmarshal_item(self, item):
        if item is None:
            return None
        json_attr = item.get(self.ITEM_JSON_ATTRIBUTE)
        return None if json_attr is None else json.loads(json_attr['S'])


class _DynamoDBHelpers(object):
    @staticmethod
    def batch_write_requests(client, table_name, requests):
        # DynamoDB's BatchWriteItem API accepts at most 25 operations per call,
        # so split the requests into chunks of that size.
        batch_size = 25
        for batch in (requests[i:i+batch_size] for i in range(0, len(requests), batch_size)):
            client.batch_write_item(RequestItems={ table_name: batch })
25 changes: 24 additions & 1 deletion ldclient/integrations.py
@@ -1,10 +1,33 @@
from ldclient.feature_store import CacheConfig
from ldclient.feature_store_helpers import CachingStoreWrapper
from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore
from ldclient.redis_feature_store import _RedisFeatureStoreCore


class DynamoDB(object):
    """Provides factory methods for integrations between the LaunchDarkly SDK and DynamoDB.
    """

    @staticmethod
    def new_feature_store(table_name,
                          prefix=None,
                          dynamodb_opts={},
                          caching=CacheConfig.default()):
        """Creates a DynamoDB-backed implementation of :class:`ldclient.feature_store.FeatureStore`.

        :param string table_name: The name of an existing DynamoDB table
        :param string prefix: An optional namespace prefix to be prepended to all DynamoDB keys
        :param dict dynamodb_opts: Optional parameters for configuring the DynamoDB client, as defined in
          the boto3 API; see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
        :param CacheConfig caching: Specifies whether local caching should be enabled and if so,
          sets the cache properties; defaults to `CacheConfig.default()`
        """
        core = _DynamoDBFeatureStoreCore(table_name, prefix, dynamodb_opts)
        return CachingStoreWrapper(core, caching)
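
# Example usage of the factory above (a hedged sketch: 'my-table' and
# 'your-sdk-key' are placeholders, and the Config(feature_store=...) wiring
# assumes the standard ldclient setup):
#
#     import ldclient
#     from ldclient.config import Config
#     from ldclient.integrations import DynamoDB
#
#     store = DynamoDB.new_feature_store('my-table')
#     ldclient.set_config(Config(sdk_key='your-sdk-key', feature_store=store))
#     client = ldclient.get()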


class Redis(object):
    """Provides factory methods for integrations between the LaunchDarkly SDK and Redis.
    """
    DEFAULT_URL = 'redis://localhost:6379/0'
    DEFAULT_PREFIX = 'launchdarkly'
11 changes: 9 additions & 2 deletions ldclient/redis_feature_store.py
@@ -1,6 +1,11 @@
import json

have_redis = False
try:
    import redis
    have_redis = True
except ImportError:
    pass

from ldclient import log
from ldclient.feature_store import CacheConfig
@@ -26,7 +31,8 @@ def __init__(self,
                 max_connections=16,
                 expiration=15,
                 capacity=1000):
        if not have_redis:
            raise NotImplementedError("Cannot use Redis feature store because redis package is not installed")
        self.core = _RedisFeatureStoreCore(url, prefix, max_connections)  # exposed for testing
        self._wrapper = CachingStoreWrapper(self.core, CacheConfig(expiration=expiration, capacity=capacity))

@@ -52,6 +58,7 @@ def initialized(self):

class _RedisFeatureStoreCore(FeatureStoreCore):
    def __init__(self, url, prefix, max_connections):
        self._prefix = prefix
        self._pool = redis.ConnectionPool.from_url(url=url, max_connections=max_connections)
        self.test_update_hook = None  # exposed for testing
1 change: 1 addition & 0 deletions test-requirements.txt
@@ -1,6 +1,7 @@
mock>=2.0.0
pytest>=2.8
redis>=2.10.5
boto3>=1.9.71
coverage>=4.4
pytest-capturelog>=0.7
pytest-cov>=2.4.0
139 changes: 118 additions & 21 deletions testing/test_feature_store.py
@@ -1,45 +1,139 @@
import boto3
import json
import pytest
import redis
import time

from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore, _DynamoDBHelpers
from ldclient.feature_store import CacheConfig, InMemoryFeatureStore
from ldclient.integrations import DynamoDB, Redis
from ldclient.redis_feature_store import RedisFeatureStore
from ldclient.versioned_data_kind import FEATURES


class InMemoryTester(object):
    def init_store(self):
        return InMemoryFeatureStore()


class RedisTester(object):
    redis_host = 'localhost'
    redis_port = 6379

    def __init__(self, cache_config):
        self._cache_config = cache_config

    def init_store(self):
        self._clear_data()
        return Redis.new_feature_store(caching=self._cache_config)

    def _clear_data(self):
        r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
        r.delete("launchdarkly:features")

class RedisWithDeprecatedConstructorTester(RedisTester):
    def init_store(self):
        self._clear_data()
        return RedisFeatureStore(expiration=(30 if self._cache_config.enabled else 0))


class DynamoDBTester(object):
    table_name = 'LD_DYNAMODB_TEST_TABLE'
    table_created = False
    options = {
        'aws_access_key_id': 'key',  # not used by local DynamoDB, but still required
        'aws_secret_access_key': 'secret',
        'endpoint_url': 'http://localhost:8000',
        'region_name': 'us-east-1'
    }

    def __init__(self, cache_config):
        self._cache_config = cache_config

    def init_store(self):
        self._create_table()
        self._clear_data()
        return DynamoDB.new_feature_store(self.table_name, caching=self._cache_config,
            dynamodb_opts=self.options)

    def _create_table(self):
        if self.table_created:
            return
        client = boto3.client('dynamodb', **self.options)
        try:
            client.describe_table(TableName=self.table_name)
            self.table_created = True
            return
        except client.exceptions.ResourceNotFoundException:
            pass
        req = {
            'TableName': self.table_name,
            'KeySchema': [
                {
                    'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY,
                    'KeyType': 'HASH',
                },
                {
                    'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY,
                    'KeyType': 'RANGE'
                }
            ],
            'AttributeDefinitions': [
                {
                    'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY,
                    'AttributeType': 'S'
                },
                {
                    'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY,
                    'AttributeType': 'S'
                }
            ],
            'ProvisionedThroughput': {
                'ReadCapacityUnits': 1,
                'WriteCapacityUnits': 1
            }
        }
        client.create_table(**req)
        while True:
            try:
                client.describe_table(TableName=self.table_name)
                self.table_created = True
                return
            except client.exceptions.ResourceNotFoundException:
                time.sleep(0.5)
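        # Note: boto3 also provides a built-in waiter that could replace the
        # polling loop above (an editorial aside, not part of this change):
        #
        #     client.get_waiter('table_exists').wait(TableName=self.table_name)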

    def _clear_data(self):
        client = boto3.client('dynamodb', **self.options)
        delete_requests = []
        req = {
            'TableName': self.table_name,
            'ConsistentRead': True,
            'ProjectionExpression': '#namespace, #key',
            'ExpressionAttributeNames': {
                '#namespace': _DynamoDBFeatureStoreCore.PARTITION_KEY,
                '#key': _DynamoDBFeatureStoreCore.SORT_KEY
            }
        }
        for resp in client.get_paginator('scan').paginate(**req):
            for item in resp['Items']:
                delete_requests.append({ 'DeleteRequest': { 'Key': item } })
        _DynamoDBHelpers.batch_write_requests(client, self.table_name, delete_requests)

class TestFeatureStore:
    params = [
        InMemoryTester(),
        RedisTester(CacheConfig.default()),
        RedisTester(CacheConfig.disabled()),
        RedisWithDeprecatedConstructorTester(CacheConfig.default()),
        RedisWithDeprecatedConstructorTester(CacheConfig.disabled()),
        DynamoDBTester(CacheConfig.default()),
        DynamoDBTester(CacheConfig.disabled())
    ]

    @pytest.fixture(params=params)
    def store(self, request):
        return request.param.init_store()

    @staticmethod
    def make_feature(key, ver):
@@ -71,6 +165,9 @@ def base_initialized_store(self, store):
        })
        return store

    def test_not_initialized_before_init(self, store):
        assert store.initialized is False

    def test_initialized(self, store):
        store = self.base_initialized_store(store)
        assert store.initialized is True
