diff --git a/celery/backends/dynamodb.py b/celery/backends/dynamodb.py index cbce6cb9711..3695446d458 100644 --- a/celery/backends/dynamodb.py +++ b/celery/backends/dynamodb.py @@ -51,16 +51,9 @@ class DynamoDBBackend(KeyValueStoreBackend): #: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`) endpoint_url = None - #: Item time-to-live in seconds (`default`) - time_to_live_seconds = None - - # DynamoDB supports Time to Live as an auto-expiry mechanism. - supports_autoexpire = True - _key_field = DynamoDBAttribute(name='id', data_type='S') _value_field = DynamoDBAttribute(name='result', data_type='B') _timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N') - _ttl_field = DynamoDBAttribute(name='ttl', data_type='N') _available_fields = None def __init__(self, url=None, table_name=None, *args, **kwargs): @@ -125,18 +118,6 @@ def __init__(self, url=None, table_name=None, *args, **kwargs): self.write_capacity_units ) ) - - ttl = query.get('ttl_seconds', self.time_to_live_seconds) - if ttl: - try: - self.time_to_live_seconds = int(ttl) - except ValueError as e: - logger.error( - 'TTL must be a number; got "{ttl}"', - exc_info=e - ) - raise e - self.table_name = table or self.table_name self._available_fields = ( @@ -172,11 +153,6 @@ def _get_client(self, access_key_id=None, secret_access_key=None): **client_parameters ) self._get_or_create_table() - - if self._has_ttl() is not None: - self._validate_ttl_methods() - self._set_table_ttl() - return self._client def _get_table_schema(self): @@ -230,193 +206,6 @@ def _get_or_create_table(self): else: raise e - def _has_ttl(self): - """Return the desired Time to Live config. - - - True: Enable TTL on the table; use expiry. - - False: Disable TTL on the table; don't use expiry. - - None: Ignore TTL on the table; don't use expiry. - """ - - return None if self.time_to_live_seconds is None \ - else self.time_to_live_seconds >= 0 - - def _validate_ttl_methods(self): - """Verify boto support for the DynamoDB Time to Live methods.""" - - # Required TTL methods. - required_methods = ( - 'update_time_to_live', - 'describe_time_to_live', - ) - - # Find missing methods. - missing_methods = [] - for method in list(required_methods): - if not hasattr(self._client, method): - missing_methods.append(method) - - if missing_methods: - logger.error( - ( - 'boto3 method(s) {methods} not found; ensure that ' - 'boto3>=1.9.178 and botocore>=1.12.178 are installed' - ).format( - methods=','.join(missing_methods) - ) - ) - raise AttributeError( - 'boto3 method(s) {methods} not found'.format( - methods=','.join(missing_methods) - ) - ) - - def _get_ttl_specification(self, ttl_attr_name): - """Get the boto3 structure describing the DynamoDB TTL specification.""" - - return { - 'TableName': self.table_name, - 'TimeToLiveSpecification': { - 'Enabled': self._has_ttl(), - 'AttributeName': ttl_attr_name - } - } - - def _get_table_ttl_description(self): - # Get the current TTL description. - try: - description = self._client.describe_time_to_live( - TableName=self.table_name - ) - status = description['TimeToLiveDescription']['TimeToLiveStatus'] - except ClientError as e: - error_code = e.response['Error'].get('Code', 'Unknown') - error_message = e.response['Error'].get('Message', 'Unknown') - logger.error(( - 'Error describing Time to Live on DynamoDB table {table}: ' - '{code}: {message}' - ).format( - table=self.table_name, - code=error_code, - message=error_message, - )) - raise e - - return description - - def _set_table_ttl(self): - """Enable or disable Time to Live on the table.""" - - # Get the table TTL description, and return early when possible. - description = self._get_table_ttl_description() - status = description['TimeToLiveDescription']['TimeToLiveStatus'] - if status in ('ENABLED', 'ENABLING'): - cur_attr_name = \ - description['TimeToLiveDescription']['AttributeName'] - if self._has_ttl(): - if cur_attr_name == self._ttl_field.name: - # We want TTL enabled, and it is currently enabled or being - # enabled, and on the correct attribute. - logger.debug(( - 'DynamoDB Time to Live is {situation} ' - 'on table {table}' - ).format( - situation='already enabled' \ - if status == 'ENABLED' \ - else 'currently being enabled', - table=self.table_name - )) - return description - - elif status in ('DISABLED', 'DISABLING'): - if not self._has_ttl(): - # We want TTL disabled, and it is currently disabled or being - # disabled. - logger.debug(( - 'DynamoDB Time to Live is {situation} ' - 'on table {table}' - ).format( - situation='already disabled' \ - if status == 'DISABLED' \ - else 'currently being disabled', - table=self.table_name - )) - return description - - # The state shouldn't ever have any value beyond the four handled - # above, but to ease troubleshooting of potential future changes, emit - # a log showing the unknown state. - else: # pragma: no cover - logger.warning(( - 'Unknown DynamoDB Time to Live status {status} ' - 'on table {table}. Attempting to continue.' - ).format( - status=status, - table=self.table_name - )) - - # At this point, we have one of the following situations: - # - # We want TTL enabled, - # - # - and it's currently disabled: Try to enable. - # - # - and it's being disabled: Try to enable, but this is almost sure to - # raise ValidationException with message: - # - # Time to live has been modified multiple times within a fixed - # interval - # - # - and it's currently enabling or being enabled, but on the wrong - # attribute: Try to enable, but this will raise ValidationException - # with message: - # - # TimeToLive is active on a different AttributeName: current - # AttributeName is ttlx - # - # We want TTL disabled, - # - # - and it's currently enabled: Try to disable. - # - # - and it's being enabled: Try to disable, but this is almost sure to - # raise ValidationException with message: - # - # Time to live has been modified multiple times within a fixed - # interval - # - attr_name = \ - cur_attr_name if status == 'ENABLED' else self._ttl_field.name - try: - specification = self._client.update_time_to_live( - **self._get_ttl_specification( - ttl_attr_name=attr_name - ) - ) - logger.info( - ( - 'DynamoDB table Time to Live updated: ' - 'table={table} enabled={enabled} attribute={attr}' - ).format( - table=self.table_name, - enabled=self._has_ttl(), - attr=self._ttl_field.name - ) - ) - return specification - except ClientError as e: - error_code = e.response['Error'].get('Code', 'Unknown') - error_message = e.response['Error'].get('Message', 'Unknown') - logger.error(( - 'Error {action} Time to Live on DynamoDB table {table}: ' - '{code}: {message}' - ).format( - action='enabling' if self._has_ttl() else 'disabling', - table=self.table_name, - code=error_code, - message=error_message, - )) - raise e - def _wait_for_table_status(self, expected='ACTIVE'): """Poll for the expected table status.""" achieved_state = False @@ -447,8 +236,7 @@ def _prepare_get_request(self, key): def _prepare_put_request(self, key, value): """Construct the item creation request parameters.""" - timestamp = time() - put_request = { + return { 'TableName': self.table_name, 'Item': { self._key_field.name: { @@ -458,18 +246,10 @@ def _prepare_put_request(self, key, value): self._value_field.data_type: value }, self._timestamp_field.name: { - self._timestamp_field.data_type: str(timestamp) + self._timestamp_field.data_type: str(time()) } } } - if self._has_ttl(): - put_request['Item'].update({ - self._ttl_field.name: { - self._ttl_field.data_type: - str(int(timestamp + self.time_to_live_seconds)) - } - }) - return put_request def _item_to_dict(self, raw_response): """Convert get_item() response to field-value pairs.""" diff --git a/docs/userguide/configuration.rst b/docs/userguide/configuration.rst index 39739cfb599..05580cccc08 100644 --- a/docs/userguide/configuration.rst +++ b/docs/userguide/configuration.rst @@ -1572,18 +1572,6 @@ The fields of the DynamoDB URL in ``result_backend`` are defined as follows: The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write. More details can be found in the `Provisioned Throughput documentation `_. -#. ``ttl_seconds`` - - Time-to-live (in seconds) for results before they expire. The default is to - not expire results, while also leaving the DynamoDB table's Time to Live - settings untouched. If ``ttl_seconds`` is set to a positive value, results - will expire after the specified number of seconds. Setting ``ttl_seconds`` - to a negative value means to not expire results, and also to actively - disable the DynamoDB table's Time to Live setting. Note that trying to - change a table's Time to Live setting multiple times in quick succession - will cause a throttling error. More details can be found in the - `DynamoDB TTL documentation `_ - .. _conf-ironcache-result-backend: IronCache backend settings diff --git a/requirements/extras/dynamodb.txt b/requirements/extras/dynamodb.txt index 30e5f8e0f2b..6d8caec075f 100644 --- a/requirements/extras/dynamodb.txt +++ b/requirements/extras/dynamodb.txt @@ -1 +1 @@ -boto3>=1.9.178 +boto3>=1.9.125 diff --git a/t/unit/backends/test_dynamodb.py b/t/unit/backends/test_dynamodb.py index 7a04c82d4e2..98c55a56d78 100644 --- a/t/unit/backends/test_dynamodb.py +++ b/t/unit/backends/test_dynamodb.py @@ -38,13 +38,6 @@ def test_init_aws_credentials(self): url='dynamodb://a:@' ) - def test_init_invalid_ttl_seconds_raises(self): - with pytest.raises(ValueError): - DynamoDBBackend( - app=self.app, - url='dynamodb://@?ttl_seconds=1d' - ) - def test_get_client_explicit_endpoint(self): table_creation_path = \ 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table' @@ -102,26 +95,6 @@ def test_get_client_credentials(self): ) assert backend.aws_region == 'test' - @patch('boto3.client') - @patch('celery.backends.dynamodb.DynamoDBBackend._get_or_create_table') - @patch('celery.backends.dynamodb.DynamoDBBackend._validate_ttl_methods') - @patch('celery.backends.dynamodb.DynamoDBBackend._set_table_ttl') - def test_get_client_time_to_live_called( - self, - mock_set_table_ttl, - mock_validate_ttl_methods, - mock_get_or_create_table, - mock_boto_client, - ): - backend = DynamoDBBackend( - app=self.app, - url='dynamodb://key:secret@test?ttl_seconds=30' - ) - client = backend._get_client() - - mock_validate_ttl_methods.assert_called_once() - mock_set_table_ttl.assert_called_once() - def test_get_or_create_table_not_exists(self): self.backend._client = MagicMock() mock_create_table = self.backend._client.create_table = MagicMock() @@ -185,214 +158,6 @@ def test_wait_for_table_status(self): self.backend._wait_for_table_status(expected='SOME_STATE') assert mock_describe_table.call_count == 2 - def test_has_ttl_none_returns_none(self): - self.backend.time_to_live_seconds = None - assert self.backend._has_ttl() is None - - def test_has_ttl_lt_zero_returns_false(self): - self.backend.time_to_live_seconds = -1 - assert self.backend._has_ttl() is False - - def test_has_ttl_gte_zero_returns_true(self): - self.backend.time_to_live_seconds = 30 - assert self.backend._has_ttl() is True - - def test_validate_ttl_methods_present_returns_none(self): - self.backend._client = MagicMock() - assert self.backend._validate_ttl_methods() is None - - def test_validate_ttl_methods_missing_raise(self): - self.backend._client = MagicMock() - delattr(self.backend._client, 'describe_time_to_live') - delattr(self.backend._client, 'update_time_to_live') - - with pytest.raises(AttributeError): - self.backend._validate_ttl_methods() - - with pytest.raises(AttributeError): - self.backend._validate_ttl_methods() - - def test_set_table_ttl_describe_time_to_live_fails_raises(self): - from botocore.exceptions import ClientError - - self.backend.time_to_live_seconds = -1 - self.backend._client = MagicMock() - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - client_error = ClientError( - { - 'Error': { - 'Code': 'Foo', - 'Message': 'Bar', - } - }, - 'DescribeTimeToLive' - ) - mock_describe_time_to_live.side_effect = client_error - - with pytest.raises(ClientError): - self.backend._set_table_ttl() - - def test_set_table_ttl_enable_when_disabled_succeeds(self): - self.backend.time_to_live_seconds = 30 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'DISABLED', - 'AttributeName': self.backend._ttl_field.name - } - } - - res = self.backend._set_table_ttl() - mock_describe_time_to_live.assert_called_once_with( - TableName=self.backend.table_name - ) - mock_update_time_to_live.assert_called_once() - - def test_set_table_ttl_enable_when_enabled_with_correct_attr_succeeds(self): - self.backend.time_to_live_seconds = 30 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'ENABLED', - 'AttributeName': self.backend._ttl_field.name - } - } - - self.backend._set_table_ttl() - mock_describe_time_to_live.assert_called_once_with( - TableName=self.backend.table_name - ) - - def test_set_table_ttl_enable_when_currently_disabling_raises(self): - from botocore.exceptions import ClientError - - self.backend.time_to_live_seconds = 30 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - client_error = ClientError( - { - 'Error': { - 'Code': 'ValidationException', - 'Message': ( - 'Time to live has been modified multiple times ' - 'within a fixed interval' - ) - } - }, - 'UpdateTimeToLive' - ) - mock_update_time_to_live.side_effect = client_error - - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'DISABLING', - 'AttributeName': self.backend._ttl_field.name - } - } - - with pytest.raises(ClientError): - self.backend._set_table_ttl() - - def test_set_table_ttl_enable_when_enabled_with_wrong_attr_raises(self): - from botocore.exceptions import ClientError - - self.backend.time_to_live_seconds = 30 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - wrong_attr_name = self.backend._ttl_field.name + 'x' - client_error = ClientError( - { - 'Error': { - 'Code': 'ValidationException', - 'Message': ( - 'TimeToLive is active on a different AttributeName: ' - 'current AttributeName is {}' - ).format(wrong_attr_name) - } - }, - 'UpdateTimeToLive' - ) - mock_update_time_to_live.side_effect = client_error - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'ENABLED', - 'AttributeName': self.backend._ttl_field.name + 'x' - } - } - - with pytest.raises(ClientError): - self.backend._set_table_ttl() - - def test_set_table_ttl_disable_when_disabled_succeeds(self): - self.backend.time_to_live_seconds = -1 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'DISABLED' - } - } - - self.backend._set_table_ttl() - mock_describe_time_to_live.assert_called_once_with( - TableName=self.backend.table_name - ) - - def test_set_table_ttl_disable_when_currently_enabling_raises(self): - from botocore.exceptions import ClientError - - self.backend.time_to_live_seconds = -1 - self.backend._client = MagicMock() - mock_update_time_to_live = self.backend._client.update_time_to_live = \ - MagicMock() - client_error = ClientError( - { - 'Error': { - 'Code': 'ValidationException', - 'Message': ( - 'Time to live has been modified multiple times ' - 'within a fixed interval' - ) - } - }, - 'UpdateTimeToLive' - ) - mock_update_time_to_live.side_effect = client_error - - mock_describe_time_to_live = \ - self.backend._client.describe_time_to_live = MagicMock() - mock_describe_time_to_live.return_value = { - 'TimeToLiveDescription': { - 'TimeToLiveStatus': 'ENABLING', - 'AttributeName': self.backend._ttl_field.name - } - } - - with pytest.raises(ClientError): - self.backend._set_table_ttl() - def test_prepare_get_request(self): expected = { 'TableName': u'celery', @@ -415,25 +180,6 @@ def test_prepare_put_request(self): result = self.backend._prepare_put_request('abcdef', 'val') assert result == expected - def test_prepare_put_request_with_ttl(self): - ttl = self.backend.time_to_live_seconds = 30 - expected = { - 'TableName': u'celery', - 'Item': { - u'id': {u'S': u'abcdef'}, - u'result': {u'B': u'val'}, - u'timestamp': { - u'N': str(Decimal(self._static_timestamp)) - }, - u'ttl': { - u'N': str(int(self._static_timestamp + ttl)) - } - } - } - with patch('celery.backends.dynamodb.time', self._mock_time): - result = self.backend._prepare_put_request('abcdef', 'val') - assert result == expected - def test_item_to_dict(self): boto_response = { 'Item': { @@ -490,30 +236,6 @@ def test_set(self): assert call_kwargs['Item'] == expected_kwargs['Item'] assert call_kwargs['TableName'] == 'celery' - def test_set_with_ttl(self): - ttl = self.backend.time_to_live_seconds = 30 - - self.backend._client = MagicMock() - self.backend._client.put_item = MagicMock() - - # should return None - with patch('celery.backends.dynamodb.time', self._mock_time): - assert self.backend.set(sentinel.key, sentinel.value) is None - - assert self.backend._client.put_item.call_count == 1 - _, call_kwargs = self.backend._client.put_item.call_args - expected_kwargs = { - 'Item': { - u'timestamp': {u'N': str(self._static_timestamp)}, - u'id': {u'S': string(sentinel.key)}, - u'result': {u'B': sentinel.value}, - u'ttl': {u'N': str(int(self._static_timestamp + ttl))}, - }, - 'TableName': 'celery' - } - assert call_kwargs['Item'] == expected_kwargs['Item'] - assert call_kwargs['TableName'] == 'celery' - def test_delete(self): self.backend._client = Mock(name='_client') mocked_delete = self.backend._client.delete = Mock('client.delete') @@ -533,15 +255,10 @@ def test_backend_by_url(self, url='dynamodb://'): assert url_ == url def test_backend_params_by_url(self): - self.app.conf.result_backend = ( - 'dynamodb://@us-east-1/celery_results' - '?read=10' - '&write=20' - '&ttl_seconds=600' - ) + self.app.conf.result_backend = \ + 'dynamodb://@us-east-1/celery_results?read=10&write=20' assert self.backend.aws_region == 'us-east-1' assert self.backend.table_name == 'celery_results' assert self.backend.read_capacity_units == 10 assert self.backend.write_capacity_units == 20 - assert self.backend.time_to_live_seconds == 600 assert self.backend.endpoint_url is None