From 1e38ac10afceb7a4b34ada8351e4c9552070f563 Mon Sep 17 00:00:00 2001
From: Eli Bishop
Date: Sat, 29 Dec 2018 15:22:39 -0800
Subject: [PATCH 1/3] add DynamoDB support

---
 .circleci/config.yml               |   6 +
 dynamodb-requirements.txt          |   1 +
 ldclient/dynamodb_feature_store.py | 191 +++++++++++++++++++++++++++++
 ldclient/integrations.py           |  25 +++-
 ldclient/redis_feature_store.py    |  11 +-
 test-requirements.txt              |   1 +
 testing/test_feature_store.py      | 134 ++++++++++++++++----
 7 files changed, 345 insertions(+), 24 deletions(-)
 create mode 100644 dynamodb-requirements.txt
 create mode 100644 ldclient/dynamodb_feature_store.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
index 05cb973c..92699a3c 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -40,28 +40,34 @@ jobs:
     docker:
       - image: circleci/python:2.7-jessie
       - image: redis
+      - image: amazon/dynamodb-local
   test-3.3:
     <<: *test-template
     docker:
       - image: circleci/python:3.3-jessie
       - image: redis
+      - image: amazon/dynamodb-local
   test-3.4:
     <<: *test-template
     docker:
       - image: circleci/python:3.4-jessie
      - image: redis
+      - image: amazon/dynamodb-local
   test-3.5:
     <<: *test-template
     docker:
       - image: circleci/python:3.5-jessie
       - image: redis
+      - image: amazon/dynamodb-local
   test-3.6:
     <<: *test-template
     docker:
       - image: circleci/python:3.6-jessie
       - image: redis
+      - image: amazon/dynamodb-local
   test-3.7:
     <<: *test-template
     docker:
       - image: circleci/python:3.7-stretch
       - image: redis
+      - image: amazon/dynamodb-local
diff --git a/dynamodb-requirements.txt b/dynamodb-requirements.txt
new file mode 100644
index 00000000..b72b66b6
--- /dev/null
+++ b/dynamodb-requirements.txt
@@ -0,0 +1 @@
+boto3>=1.9.71
diff --git a/ldclient/dynamodb_feature_store.py b/ldclient/dynamodb_feature_store.py
new file mode 100644
index 00000000..f3879d71
--- /dev/null
+++ b/ldclient/dynamodb_feature_store.py
@@ -0,0 +1,191 @@
+import json
+
+have_dynamodb = False
+try:
+    import boto3
+    have_dynamodb = True
+except ImportError:
+    pass
+
+from ldclient import log
+from ldclient.feature_store import CacheConfig
+from ldclient.feature_store_helpers import CachingStoreWrapper
+from ldclient.interfaces import FeatureStore, FeatureStoreCore
+
+#
+# Internal implementation of the DynamoDB feature store.
+#
+# Implementation notes:
+#
+# * Feature flags, segments, and any other kind of entity the LaunchDarkly client may wish
+# to store, are all put in the same table. The only two required attributes are "key" (which
+# is present in all storeable entities) and "namespace" (a parameter from the client that is
+# used to disambiguate between flags and segments).
+#
+# * Because of DynamoDB's restrictions on attribute values (e.g. empty strings are not
+# allowed), the standard DynamoDB marshaling mechanism with one attribute per object property
+# is not used. Instead, the entire object is serialized to JSON and stored in a single
+# attribute, "item". The "version" property is also stored as a separate attribute since it
+# is used for updates.
+#
+# * Since DynamoDB doesn't have transactions, the init() method - which replaces the entire data
+# store - is not atomic, so there can be a race condition if another process is adding new data
+# via upsert(). To minimize this, we don't delete all the data at the start; instead, we update
+# the items we've received, and then delete all other items. That could potentially result in
+# deleting new data from another process, but that would be the case anyway if the init()
+# happened to execute later than the upsert(); we are relying on the fact that normally the
+# process that did the init() will also receive the new data shortly and do its own upsert().
+#
+# * DynamoDB has a maximum item size of 400KB. Since each feature flag or user segment is
+# stored as a single item, this mechanism will not work for extremely large flags or segments.
+#
+
+class _DynamoDBFeatureStoreCore(FeatureStoreCore):
+    PARTITION_KEY = 'namespace'
+    SORT_KEY = 'key'
+    VERSION_ATTRIBUTE = 'version'
+    ITEM_JSON_ATTRIBUTE = 'item'
+
+    def __init__(self, table_name, prefix, dynamodb_opts):
+        if not have_dynamodb:
+            raise NotImplementedError("Cannot use DynamoDB feature store because AWS SDK (boto3 package) is not installed")
+        self._table_name = table_name
+        self._prefix = None if prefix == "" else prefix
+        self._client = boto3.client('dynamodb', **dynamodb_opts)
+
+    def init_internal(self, all_data):
+        # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
+        unused_old_keys = self._read_existing_keys(all_data.keys())
+        requests = []
+        num_items = 0
+        inited_key = self._inited_key()
+
+        # Insert or update every provided item
+        for kind, items in all_data.items():
+            for key, item in items.items():
+                encoded_item = self._marshal_item(kind, item)
+                requests.append({ 'PutRequest': { 'Item': encoded_item } })
+                combined_key = (self._namespace_for_kind(kind), key)
+                unused_old_keys.discard(combined_key)
+                num_items = num_items + 1
+
+        # Now delete any previously existing items whose keys were not in the current data
+        for combined_key in unused_old_keys:
+            if combined_key[0] != inited_key:
+                requests.append({ 'DeleteRequest': { 'Key': self._make_keys(combined_key[0], combined_key[1]) } })
+
+        # Now set the special key that we check in initialized_internal()
+        requests.append({ 'PutRequest': { 'Item': self._make_keys(inited_key, inited_key) } })
+
+        _DynamoDBHelpers.batch_write_requests(self._client, self._table_name, requests)
+        log.info('Initialized table %s with %d items', self._table_name, num_items)
+
+    def get_internal(self, kind, key):
+        resp = self._get_item_by_keys(self._namespace_for_kind(kind), key)
+        return self._unmarshal_item(resp.get('Item'))
+
+    def get_all_internal(self, kind):
+        items_out = {}
+        paginator = self._client.get_paginator('query')
+        for resp in paginator.paginate(**self._make_query_for_kind(kind)):
+            for item in resp['Items']:
+                item_out = self._unmarshal_item(item)
+                items_out[item_out['key']] = item_out
+        return items_out
+
+    def upsert_internal(self, kind, item):
+        encoded_item = self._marshal_item(kind, item)
+        try:
+            req = {
+                'TableName': self._table_name,
+                'Item': encoded_item,
+                'ConditionExpression': 'attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version',
+                'ExpressionAttributeNames': {
+                    '#namespace': self.PARTITION_KEY,
+                    '#key': self.SORT_KEY,
+                    '#version': self.VERSION_ATTRIBUTE
+                },
+                'ExpressionAttributeValues': {
+                    ':version': { 'N': str(item['version']) }
+                }
+            }
+            self._client.put_item(**req)
+        except self._client.exceptions.ConditionalCheckFailedException:
+            # The item was not updated because there's a newer item in the database. We must now
+            # read the item that's in the database and return it, so CachingStoreWrapper can cache it.
+            return self.get_internal(kind, item['key'])
+        return item
+
+    def initialized_internal(self):
+        resp = self._get_item_by_keys(self._inited_key(), self._inited_key())
+        return resp.get('Item') is not None and len(resp['Item']) > 0
+
+    def _prefixed_namespace(self, base):
+        return base if self._prefix is None else (self._prefix + ':' + base)
+
+    def _namespace_for_kind(self, kind):
+        return self._prefixed_namespace(kind.namespace)
+
+    def _inited_key(self):
+        return self._prefixed_namespace('$inited')
+
+    def _make_keys(self, namespace, key):
+        return {
+            self.PARTITION_KEY: { 'S': namespace },
+            self.SORT_KEY: { 'S': key }
+        }
+
+    def _make_query_for_kind(self, kind):
+        return {
+            'TableName': self._table_name,
+            'ConsistentRead': True,
+            'KeyConditions': {
+                self.PARTITION_KEY: {
+                    'AttributeValueList': [
+                        { 'S': self._namespace_for_kind(kind) }
+                    ],
+                    'ComparisonOperator': 'EQ'
+                }
+            }
+        }
+
+    def _get_item_by_keys(self, namespace, key):
+        return self._client.get_item(TableName=self._table_name, Key=self._make_keys(namespace, key))
+
+    def _read_existing_keys(self, kinds):
+        keys = set()
+        for kind in kinds:
+            req = self._make_query_for_kind(kind)
+            req['ProjectionExpression'] = '#namespace, #key'
+            req['ExpressionAttributeNames'] = {
+                '#namespace': self.PARTITION_KEY,
+                '#key': self.SORT_KEY
+            }
+            paginator = self._client.get_paginator('query')
+            for resp in paginator.paginate(**req):
+                for item in resp['Items']:
+                    namespace = item[self.PARTITION_KEY]['S']
+                    key = item[self.SORT_KEY]['S']
+                    keys.add((namespace, key))
+        return keys
+
+    def _marshal_item(self, kind, item):
+        json_str = json.dumps(item)
+        ret = self._make_keys(self._namespace_for_kind(kind), item['key'])
+        ret[self.VERSION_ATTRIBUTE] = { 'N': str(item['version']) }
+        ret[self.ITEM_JSON_ATTRIBUTE] = { 'S': json_str }
+        return ret
+
+    def _unmarshal_item(self, item):
+        if item is None:
+            return None
+        json_attr = item.get(self.ITEM_JSON_ATTRIBUTE)
+        return None if json_attr is None else json.loads(json_attr['S'])
+
+
+class _DynamoDBHelpers(object):
+    @staticmethod
+    def batch_write_requests(client, table_name, requests):
+        batch_size = 25
+        # use range rather than xrange so this also runs on the Python 3 versions tested in CI
+        for batch in (requests[i:i+batch_size] for i in range(0, len(requests), batch_size)):
+            client.batch_write_item(RequestItems={ table_name: batch })
diff --git a/ldclient/integrations.py b/ldclient/integrations.py
index 86b5248d..80063389 100644
--- a/ldclient/integrations.py
+++ b/ldclient/integrations.py
@@ -1,10 +1,33 @@
 from ldclient.feature_store import CacheConfig
 from ldclient.feature_store_helpers import CachingStoreWrapper
+from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore
 from ldclient.redis_feature_store import _RedisFeatureStoreCore

+
+class DynamoDB(object):
+    """Provides factory methods for integrations between the LaunchDarkly SDK and DynamoDB.
+    """
+
+    @staticmethod
+    def new_feature_store(table_name,
+                          prefix=None,
+                          dynamodb_opts={},
+                          caching=CacheConfig.default()):
+        """Creates a DynamoDB-backed implementation of :class:`ldclient.feature_store.FeatureStore`.
+
+        :param string table_name: The name of an existing DynamoDB table
+        :param string prefix: An optional namespace prefix to be prepended to all DynamoDB keys
+        :param dict dynamodb_opts: Optional parameters for configuring the DynamoDB client, as defined in
+            the boto3 API
+        :param CacheConfig caching: Specifies whether local caching should be enabled and if so,
+            sets the cache properties; defaults to `CacheConfig.default()`
+        """
+        core = _DynamoDBFeatureStoreCore(table_name, prefix, dynamodb_opts)
+        return CachingStoreWrapper(core, caching)
+

 class Redis(object):
-    """Provides factory methods for integrations between the LaunchDarkly SDK and Redis,
+    """Provides factory methods for integrations between the LaunchDarkly SDK and Redis.
     """
     DEFAULT_URL = 'redis://localhost:6379/0'
     DEFAULT_PREFIX = 'launchdarkly'
diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py
index b9bdf731..02df0e57 100644
--- a/ldclient/redis_feature_store.py
+++ b/ldclient/redis_feature_store.py
@@ -1,6 +1,11 @@
 import json

-import redis
+have_redis = False
+try:
+    import redis
+    have_redis = True
+except ImportError:
+    pass

 from ldclient import log
 from ldclient.feature_store import CacheConfig
@@ -21,7 +26,8 @@ def __init__(self,
                  max_connections=16,
                  expiration=15,
                  capacity=1000):
-
+        if not have_redis:
+            raise NotImplementedError("Cannot use Redis feature store because redis package is not installed")
         self.core = _RedisFeatureStoreCore(url, prefix, max_connections)  # exposed for testing
         self._wrapper = CachingStoreWrapper(self.core, CacheConfig(expiration=expiration, capacity=capacity))

@@ -47,6 +53,7 @@ def initialized(self):

 class _RedisFeatureStoreCore(FeatureStoreCore):
     def __init__(self, url, prefix, max_connections):
+        self._prefix = prefix
         self._pool = redis.ConnectionPool.from_url(url=url, max_connections=max_connections)
         self.test_update_hook = None  # exposed for testing

diff --git a/test-requirements.txt b/test-requirements.txt
index 413ef355..88cbbc2e 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -1,6 +1,7 @@
 mock>=2.0.0
 pytest>=2.8
 redis>=2.10.5
+boto3>=1.9.71
 coverage>=4.4
 pytest-capturelog>=0.7
 pytest-cov>=2.4.0
diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py
index 5716fa0e..003434b1 100644
--- a/testing/test_feature_store.py
+++ b/testing/test_feature_store.py
@@ -1,9 +1,12 @@
+import boto3
 import json
 import pytest
 import redis
+import time

+from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore, _DynamoDBHelpers
 from ldclient.feature_store import CacheConfig, InMemoryFeatureStore
-from ldclient.integrations import Redis
+from ldclient.integrations import DynamoDB, Redis
 from ldclient.redis_feature_store import RedisFeatureStore
 from ldclient.versioned_data_kind import FEATURES

@@ -16,38 +19,124 @@ def get_log_lines(caplog):
     return loglines


-class TestFeatureStore:
+class InMemoryTester(object):
+    def init_store(self):
+        return InMemoryFeatureStore()
+
+
+class RedisTester(object):
     redis_host = 'localhost'
     redis_port = 6379

-    def clear_redis_data(self):
+    def __init__(self, cache_config):
+        self._cache_config = cache_config
+
+    def init_store(self):
+        self._clear_data()
+        return Redis.new_feature_store(caching=self._cache_config)
+
+    def _clear_data(self):
         r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0)
         r.delete("launchdarkly:features")

-    def in_memory(self):
-        return InMemoryFeatureStore()

-    def redis_with_local_cache(self):
-        self.clear_redis_data()
-        return Redis.new_feature_store()
-
-    def redis_no_local_cache(self):
-        self.clear_redis_data()
-        return Redis.new_feature_store(caching=CacheConfig.disabled())
-
-    def deprecated_redis_with_local_cache(self):
-        self.clear_redis_data()
-        return RedisFeatureStore()
+class RedisWithDeprecatedConstructorTester(RedisTester):
+    def init_store(self):
+        self._clear_data()
+        return RedisFeatureStore(expiration=(30 if self._cache_config.enabled else 0))
+
+
+class DynamoDBTester(object):
+    table_name = 'LD_DYNAMODB_TEST_TABLE'
+    table_created = False
+    options = { 'endpoint_url': 'http://localhost:8000', 'region_name': 'us-east-1' }
+
+    def __init__(self, cache_config):
+        self._cache_config = cache_config
+
+    def init_store(self):
+        self._create_table()
+        self._clear_data()
+        return DynamoDB.new_feature_store(self.table_name, dynamodb_opts=self.options)
+
+    def _create_table(self):
+        if self.table_created:
+            return
+        client = boto3.client('dynamodb', **self.options)
+        try:
+            client.describe_table(TableName=self.table_name)
+            self.table_created = True
+            return
+        except client.exceptions.ResourceNotFoundException:
+            pass
+        req = {
+            'TableName': self.table_name,
+            'KeySchema': [
+                {
+                    'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY,
+                    'KeyType': 'HASH',
+                },
+                {
+                    'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY,
+                    'KeyType': 'RANGE'
+                }
+            ],
+            'AttributeDefinitions': [
+                {
+                    'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY,
+                    'AttributeType': 'S'
+                },
+                {
+                    'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY,
+                    'AttributeType': 'S'
+                }
+            ],
+            'ProvisionedThroughput': {
+                'ReadCapacityUnits': 1,
+                'WriteCapacityUnits': 1
+            }
+        }
+        client.create_table(**req)
+        while True:
+            try:
+                client.describe_table(TableName=self.table_name)
+                self.table_created = True
+                return
+            except client.exceptions.ResourceNotFoundException:
+                time.sleep(0.5)
+
+    def _clear_data(self):
+        client = boto3.client('dynamodb', **self.options)
+        delete_requests = []
+        req = {
+            'TableName': self.table_name,
+            'ConsistentRead': True,
+            'ProjectionExpression': '#namespace, #key',
+            'ExpressionAttributeNames': {
+                '#namespace': _DynamoDBFeatureStoreCore.PARTITION_KEY,
+                '#key': _DynamoDBFeatureStoreCore.SORT_KEY
+            }
+        }
+        for resp in client.get_paginator('scan').paginate(**req):
+            for item in resp['Items']:
+                delete_requests.append({ 'DeleteRequest': { 'Key': item } })
+        _DynamoDBHelpers.batch_write_requests(client, self.table_name, delete_requests)

-    def deprecated_redis_no_local_cache(self):
-        self.clear_redis_data()
-        return RedisFeatureStore(expiration=0)

-    params = [in_memory, redis_with_local_cache, redis_no_local_cache]
+class TestFeatureStore:
+    params = [
+        InMemoryTester(),
+        RedisTester(CacheConfig.default()),
+        RedisTester(CacheConfig.disabled()),
+        RedisWithDeprecatedConstructorTester(CacheConfig.default()),
+        RedisWithDeprecatedConstructorTester(CacheConfig.disabled()),
+        DynamoDBTester(CacheConfig.default()),
+        DynamoDBTester(CacheConfig.disabled())
+    ]

     @pytest.fixture(params=params)
     def store(self, request):
-        return request.param(self)
+        return request.param.init_store()

     @staticmethod
     def make_feature(key, ver):
@@ -79,6 +168,9 @@ def base_initialized_store(self, store):
         })
         return store

+    def test_not_initialized_before_init(self, store):
+        assert store.initialized is False
+
     def test_initialized(self, store):
         store = self.base_initialized_store(store)
         assert store.initialized is True

From 431dddf55ea9bdc16d1e15d680e519287ed14723 Mon Sep 17 00:00:00 2001
From: Eli Bishop
Date: Sat, 29 Dec 2018 15:25:52 -0800
Subject: [PATCH 2/3] add test credentials

---
 testing/test_feature_store.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py
index 003434b1..229a0f40 100644
--- a/testing/test_feature_store.py
+++ b/testing/test_feature_store.py
@@ -49,7 +49,12 @@ def init_store(self):
 class DynamoDBTester(object):
     table_name = 'LD_DYNAMODB_TEST_TABLE'
     table_created = False
-    options = { 'endpoint_url': 'http://localhost:8000', 'region_name': 'us-east-1' }
+    options = {
+        'aws_access_key_id': 'key',  # not used by local DynamoDB, but still required
+        'aws_secret_access_key': 'secret',
+        'endpoint_url': 'http://localhost:8000',
+        'region_name': 'us-east-1'
+    }

     def __init__(self, cache_config):
         self._cache_config = cache_config

From 3aa5644edf5c5f65f201733c20bb21e924fd10ef Mon Sep 17 00:00:00 2001
From: Eli Bishop
Date: Mon, 31 Dec 2018 11:34:53 -0800
Subject: [PATCH 3/3] link in comment

---
 ldclient/integrations.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ldclient/integrations.py b/ldclient/integrations.py
index 80063389..6102d354 100644
--- a/ldclient/integrations.py
+++ b/ldclient/integrations.py
@@ -18,7 +18,7 @@ def new_feature_store(table_name,
         :param string table_name: The name of an existing DynamoDB table
         :param string prefix: An optional namespace prefix to be prepended to all DynamoDB keys
         :param dict dynamodb_opts: Optional parameters for configuring the DynamoDB client, as defined in
-            the boto3 API
+            the boto3 API; see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
         :param CacheConfig caching: Specifies whether local caching should be enabled and if so,
             sets the cache properties; defaults to `CacheConfig.default()`
         """
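
Usage sketch for the new factory method (not shown in the patches themselves): the store returned by DynamoDB.new_feature_store is handed to the SDK configuration in the same way as the existing Redis store. The table name, prefix, and region below are placeholders, and the Config(feature_store=...) wiring is assumed from the existing SDK API rather than taken from this diff.

    from ldclient.config import Config
    from ldclient.feature_store import CacheConfig
    from ldclient.integrations import DynamoDB

    # The table must already exist (see the table-creation sketch below).
    store = DynamoDB.new_feature_store(
        'my-ld-table',                                   # placeholder table name
        prefix='ld',                                     # optional key namespace
        dynamodb_opts={'region_name': 'us-east-1'},      # passed straight through to boto3.client()
        caching=CacheConfig(expiration=30, capacity=1000))

    # Assumed wiring into the SDK configuration; not part of this diff.
    config = Config(sdk_key='YOUR_SDK_KEY', feature_store=store)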
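The docstring requires an existing table, and the test helper in patch 1 shows the schema the store expects: a string partition key named 'namespace' and a string sort key named 'key' (the CI jobs get a local table from the amazon/dynamodb-local image, which the tests reach at http://localhost:8000). A one-time setup sketch along the same lines, with a placeholder table name and minimal provisioned throughput:

    import boto3

    client = boto3.client('dynamodb', region_name='us-east-1')
    client.create_table(
        TableName='my-ld-table',  # placeholder name
        KeySchema=[
            {'AttributeName': 'namespace', 'KeyType': 'HASH'},   # partition key
            {'AttributeName': 'key', 'KeyType': 'RANGE'}         # sort key
        ],
        AttributeDefinitions=[
            {'AttributeName': 'namespace', 'AttributeType': 'S'},
            {'AttributeName': 'key', 'AttributeType': 'S'}
        ],
        ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1})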
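For reference, the single-attribute JSON layout described in the implementation notes means a flag ends up stored roughly like this. This is a sketch based on _marshal_item; the flag body and the 'features' namespace string are illustrative rather than copied from the patch.

    # A flag {'key': 'my-flag', 'version': 5, 'on': True}, stored with prefix 'ld',
    # becomes one DynamoDB item whose full JSON lives in the 'item' attribute:
    stored_item = {
        'namespace': {'S': 'ld:features'},
        'key':       {'S': 'my-flag'},
        'version':   {'N': '5'},
        'item':      {'S': '{"key": "my-flag", "version": 5, "on": true}'}
    }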