diff --git a/celery/backends/dynamodb.py b/celery/backends/dynamodb.py index 3695446d458..cbce6cb9711 100644 --- a/celery/backends/dynamodb.py +++ b/celery/backends/dynamodb.py @@ -51,9 +51,16 @@ class DynamoDBBackend(KeyValueStoreBackend): #: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`) endpoint_url = None + #: Item time-to-live in seconds (`default`) + time_to_live_seconds = None + + # DynamoDB supports Time to Live as an auto-expiry mechanism. + supports_autoexpire = True + _key_field = DynamoDBAttribute(name='id', data_type='S') _value_field = DynamoDBAttribute(name='result', data_type='B') _timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N') + _ttl_field = DynamoDBAttribute(name='ttl', data_type='N') _available_fields = None def __init__(self, url=None, table_name=None, *args, **kwargs): @@ -118,6 +125,18 @@ def __init__(self, url=None, table_name=None, *args, **kwargs): self.write_capacity_units ) ) + + ttl = query.get('ttl_seconds', self.time_to_live_seconds) + if ttl: + try: + self.time_to_live_seconds = int(ttl) + except ValueError as e: + logger.error( + 'TTL must be a number; got "{ttl}"', + exc_info=e + ) + raise e + self.table_name = table or self.table_name self._available_fields = ( @@ -153,6 +172,11 @@ def _get_client(self, access_key_id=None, secret_access_key=None): **client_parameters ) self._get_or_create_table() + + if self._has_ttl() is not None: + self._validate_ttl_methods() + self._set_table_ttl() + return self._client def _get_table_schema(self): @@ -206,6 +230,193 @@ def _get_or_create_table(self): else: raise e + def _has_ttl(self): + """Return the desired Time to Live config. + + - True: Enable TTL on the table; use expiry. + - False: Disable TTL on the table; don't use expiry. + - None: Ignore TTL on the table; don't use expiry. + """ + + return None if self.time_to_live_seconds is None \ + else self.time_to_live_seconds >= 0 + + def _validate_ttl_methods(self): + """Verify boto support for the DynamoDB Time to Live methods.""" + + # Required TTL methods. + required_methods = ( + 'update_time_to_live', + 'describe_time_to_live', + ) + + # Find missing methods. + missing_methods = [] + for method in list(required_methods): + if not hasattr(self._client, method): + missing_methods.append(method) + + if missing_methods: + logger.error( + ( + 'boto3 method(s) {methods} not found; ensure that ' + 'boto3>=1.9.178 and botocore>=1.12.178 are installed' + ).format( + methods=','.join(missing_methods) + ) + ) + raise AttributeError( + 'boto3 method(s) {methods} not found'.format( + methods=','.join(missing_methods) + ) + ) + + def _get_ttl_specification(self, ttl_attr_name): + """Get the boto3 structure describing the DynamoDB TTL specification.""" + + return { + 'TableName': self.table_name, + 'TimeToLiveSpecification': { + 'Enabled': self._has_ttl(), + 'AttributeName': ttl_attr_name + } + } + + def _get_table_ttl_description(self): + # Get the current TTL description. + try: + description = self._client.describe_time_to_live( + TableName=self.table_name + ) + status = description['TimeToLiveDescription']['TimeToLiveStatus'] + except ClientError as e: + error_code = e.response['Error'].get('Code', 'Unknown') + error_message = e.response['Error'].get('Message', 'Unknown') + logger.error(( + 'Error describing Time to Live on DynamoDB table {table}: ' + '{code}: {message}' + ).format( + table=self.table_name, + code=error_code, + message=error_message, + )) + raise e + + return description + + def _set_table_ttl(self): + """Enable or disable Time to Live on the table.""" + + # Get the table TTL description, and return early when possible. + description = self._get_table_ttl_description() + status = description['TimeToLiveDescription']['TimeToLiveStatus'] + if status in ('ENABLED', 'ENABLING'): + cur_attr_name = \ + description['TimeToLiveDescription']['AttributeName'] + if self._has_ttl(): + if cur_attr_name == self._ttl_field.name: + # We want TTL enabled, and it is currently enabled or being + # enabled, and on the correct attribute. + logger.debug(( + 'DynamoDB Time to Live is {situation} ' + 'on table {table}' + ).format( + situation='already enabled' \ + if status == 'ENABLED' \ + else 'currently being enabled', + table=self.table_name + )) + return description + + elif status in ('DISABLED', 'DISABLING'): + if not self._has_ttl(): + # We want TTL disabled, and it is currently disabled or being + # disabled. + logger.debug(( + 'DynamoDB Time to Live is {situation} ' + 'on table {table}' + ).format( + situation='already disabled' \ + if status == 'DISABLED' \ + else 'currently being disabled', + table=self.table_name + )) + return description + + # The state shouldn't ever have any value beyond the four handled + # above, but to ease troubleshooting of potential future changes, emit + # a log showing the unknown state. + else: # pragma: no cover + logger.warning(( + 'Unknown DynamoDB Time to Live status {status} ' + 'on table {table}. Attempting to continue.' + ).format( + status=status, + table=self.table_name + )) + + # At this point, we have one of the following situations: + # + # We want TTL enabled, + # + # - and it's currently disabled: Try to enable. + # + # - and it's being disabled: Try to enable, but this is almost sure to + # raise ValidationException with message: + # + # Time to live has been modified multiple times within a fixed + # interval + # + # - and it's currently enabling or being enabled, but on the wrong + # attribute: Try to enable, but this will raise ValidationException + # with message: + # + # TimeToLive is active on a different AttributeName: current + # AttributeName is ttlx + # + # We want TTL disabled, + # + # - and it's currently enabled: Try to disable. + # + # - and it's being enabled: Try to disable, but this is almost sure to + # raise ValidationException with message: + # + # Time to live has been modified multiple times within a fixed + # interval + # + attr_name = \ + cur_attr_name if status == 'ENABLED' else self._ttl_field.name + try: + specification = self._client.update_time_to_live( + **self._get_ttl_specification( + ttl_attr_name=attr_name + ) + ) + logger.info( + ( + 'DynamoDB table Time to Live updated: ' + 'table={table} enabled={enabled} attribute={attr}' + ).format( + table=self.table_name, + enabled=self._has_ttl(), + attr=self._ttl_field.name + ) + ) + return specification + except ClientError as e: + error_code = e.response['Error'].get('Code', 'Unknown') + error_message = e.response['Error'].get('Message', 'Unknown') + logger.error(( + 'Error {action} Time to Live on DynamoDB table {table}: ' + '{code}: {message}' + ).format( + action='enabling' if self._has_ttl() else 'disabling', + table=self.table_name, + code=error_code, + message=error_message, + )) + raise e + def _wait_for_table_status(self, expected='ACTIVE'): """Poll for the expected table status.""" achieved_state = False @@ -236,7 +447,8 @@ def _prepare_get_request(self, key): def _prepare_put_request(self, key, value): """Construct the item creation request parameters.""" - return { + timestamp = time() + put_request = { 'TableName': self.table_name, 'Item': { self._key_field.name: { @@ -246,10 +458,18 @@ def _prepare_put_request(self, key, value): self._value_field.data_type: value }, self._timestamp_field.name: { - self._timestamp_field.data_type: str(time()) + self._timestamp_field.data_type: str(timestamp) } } } + if self._has_ttl(): + put_request['Item'].update({ + self._ttl_field.name: { + self._ttl_field.data_type: + str(int(timestamp + self.time_to_live_seconds)) + } + }) + return put_request def _item_to_dict(self, raw_response): """Convert get_item() response to field-value pairs.""" diff --git a/docs/userguide/configuration.rst b/docs/userguide/configuration.rst index 05580cccc08..39739cfb599 100644 --- a/docs/userguide/configuration.rst +++ b/docs/userguide/configuration.rst @@ -1572,6 +1572,18 @@ The fields of the DynamoDB URL in ``result_backend`` are defined as follows: The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write. More details can be found in the `Provisioned Throughput documentation `_. +#. ``ttl_seconds`` + + Time-to-live (in seconds) for results before they expire. The default is to + not expire results, while also leaving the DynamoDB table's Time to Live + settings untouched. If ``ttl_seconds`` is set to a positive value, results + will expire after the specified number of seconds. Setting ``ttl_seconds`` + to a negative value means to not expire results, and also to actively + disable the DynamoDB table's Time to Live setting. Note that trying to + change a table's Time to Live setting multiple times in quick succession + will cause a throttling error. More details can be found in the + `DynamoDB TTL documentation `_ + .. _conf-ironcache-result-backend: IronCache backend settings diff --git a/requirements/extras/dynamodb.txt b/requirements/extras/dynamodb.txt index 6d8caec075f..30e5f8e0f2b 100644 --- a/requirements/extras/dynamodb.txt +++ b/requirements/extras/dynamodb.txt @@ -1 +1 @@ -boto3>=1.9.125 +boto3>=1.9.178 diff --git a/t/unit/backends/test_dynamodb.py b/t/unit/backends/test_dynamodb.py index 98c55a56d78..7a04c82d4e2 100644 --- a/t/unit/backends/test_dynamodb.py +++ b/t/unit/backends/test_dynamodb.py @@ -38,6 +38,13 @@ def test_init_aws_credentials(self): url='dynamodb://a:@' ) + def test_init_invalid_ttl_seconds_raises(self): + with pytest.raises(ValueError): + DynamoDBBackend( + app=self.app, + url='dynamodb://@?ttl_seconds=1d' + ) + def test_get_client_explicit_endpoint(self): table_creation_path = \ 'celery.backends.dynamodb.DynamoDBBackend._get_or_create_table' @@ -95,6 +102,26 @@ def test_get_client_credentials(self): ) assert backend.aws_region == 'test' + @patch('boto3.client') + @patch('celery.backends.dynamodb.DynamoDBBackend._get_or_create_table') + @patch('celery.backends.dynamodb.DynamoDBBackend._validate_ttl_methods') + @patch('celery.backends.dynamodb.DynamoDBBackend._set_table_ttl') + def test_get_client_time_to_live_called( + self, + mock_set_table_ttl, + mock_validate_ttl_methods, + mock_get_or_create_table, + mock_boto_client, + ): + backend = DynamoDBBackend( + app=self.app, + url='dynamodb://key:secret@test?ttl_seconds=30' + ) + client = backend._get_client() + + mock_validate_ttl_methods.assert_called_once() + mock_set_table_ttl.assert_called_once() + def test_get_or_create_table_not_exists(self): self.backend._client = MagicMock() mock_create_table = self.backend._client.create_table = MagicMock() @@ -158,6 +185,214 @@ def test_wait_for_table_status(self): self.backend._wait_for_table_status(expected='SOME_STATE') assert mock_describe_table.call_count == 2 + def test_has_ttl_none_returns_none(self): + self.backend.time_to_live_seconds = None + assert self.backend._has_ttl() is None + + def test_has_ttl_lt_zero_returns_false(self): + self.backend.time_to_live_seconds = -1 + assert self.backend._has_ttl() is False + + def test_has_ttl_gte_zero_returns_true(self): + self.backend.time_to_live_seconds = 30 + assert self.backend._has_ttl() is True + + def test_validate_ttl_methods_present_returns_none(self): + self.backend._client = MagicMock() + assert self.backend._validate_ttl_methods() is None + + def test_validate_ttl_methods_missing_raise(self): + self.backend._client = MagicMock() + delattr(self.backend._client, 'describe_time_to_live') + delattr(self.backend._client, 'update_time_to_live') + + with pytest.raises(AttributeError): + self.backend._validate_ttl_methods() + + with pytest.raises(AttributeError): + self.backend._validate_ttl_methods() + + def test_set_table_ttl_describe_time_to_live_fails_raises(self): + from botocore.exceptions import ClientError + + self.backend.time_to_live_seconds = -1 + self.backend._client = MagicMock() + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + client_error = ClientError( + { + 'Error': { + 'Code': 'Foo', + 'Message': 'Bar', + } + }, + 'DescribeTimeToLive' + ) + mock_describe_time_to_live.side_effect = client_error + + with pytest.raises(ClientError): + self.backend._set_table_ttl() + + def test_set_table_ttl_enable_when_disabled_succeeds(self): + self.backend.time_to_live_seconds = 30 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'DISABLED', + 'AttributeName': self.backend._ttl_field.name + } + } + + res = self.backend._set_table_ttl() + mock_describe_time_to_live.assert_called_once_with( + TableName=self.backend.table_name + ) + mock_update_time_to_live.assert_called_once() + + def test_set_table_ttl_enable_when_enabled_with_correct_attr_succeeds(self): + self.backend.time_to_live_seconds = 30 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'ENABLED', + 'AttributeName': self.backend._ttl_field.name + } + } + + self.backend._set_table_ttl() + mock_describe_time_to_live.assert_called_once_with( + TableName=self.backend.table_name + ) + + def test_set_table_ttl_enable_when_currently_disabling_raises(self): + from botocore.exceptions import ClientError + + self.backend.time_to_live_seconds = 30 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + client_error = ClientError( + { + 'Error': { + 'Code': 'ValidationException', + 'Message': ( + 'Time to live has been modified multiple times ' + 'within a fixed interval' + ) + } + }, + 'UpdateTimeToLive' + ) + mock_update_time_to_live.side_effect = client_error + + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'DISABLING', + 'AttributeName': self.backend._ttl_field.name + } + } + + with pytest.raises(ClientError): + self.backend._set_table_ttl() + + def test_set_table_ttl_enable_when_enabled_with_wrong_attr_raises(self): + from botocore.exceptions import ClientError + + self.backend.time_to_live_seconds = 30 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + wrong_attr_name = self.backend._ttl_field.name + 'x' + client_error = ClientError( + { + 'Error': { + 'Code': 'ValidationException', + 'Message': ( + 'TimeToLive is active on a different AttributeName: ' + 'current AttributeName is {}' + ).format(wrong_attr_name) + } + }, + 'UpdateTimeToLive' + ) + mock_update_time_to_live.side_effect = client_error + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'ENABLED', + 'AttributeName': self.backend._ttl_field.name + 'x' + } + } + + with pytest.raises(ClientError): + self.backend._set_table_ttl() + + def test_set_table_ttl_disable_when_disabled_succeeds(self): + self.backend.time_to_live_seconds = -1 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'DISABLED' + } + } + + self.backend._set_table_ttl() + mock_describe_time_to_live.assert_called_once_with( + TableName=self.backend.table_name + ) + + def test_set_table_ttl_disable_when_currently_enabling_raises(self): + from botocore.exceptions import ClientError + + self.backend.time_to_live_seconds = -1 + self.backend._client = MagicMock() + mock_update_time_to_live = self.backend._client.update_time_to_live = \ + MagicMock() + client_error = ClientError( + { + 'Error': { + 'Code': 'ValidationException', + 'Message': ( + 'Time to live has been modified multiple times ' + 'within a fixed interval' + ) + } + }, + 'UpdateTimeToLive' + ) + mock_update_time_to_live.side_effect = client_error + + mock_describe_time_to_live = \ + self.backend._client.describe_time_to_live = MagicMock() + mock_describe_time_to_live.return_value = { + 'TimeToLiveDescription': { + 'TimeToLiveStatus': 'ENABLING', + 'AttributeName': self.backend._ttl_field.name + } + } + + with pytest.raises(ClientError): + self.backend._set_table_ttl() + def test_prepare_get_request(self): expected = { 'TableName': u'celery', @@ -180,6 +415,25 @@ def test_prepare_put_request(self): result = self.backend._prepare_put_request('abcdef', 'val') assert result == expected + def test_prepare_put_request_with_ttl(self): + ttl = self.backend.time_to_live_seconds = 30 + expected = { + 'TableName': u'celery', + 'Item': { + u'id': {u'S': u'abcdef'}, + u'result': {u'B': u'val'}, + u'timestamp': { + u'N': str(Decimal(self._static_timestamp)) + }, + u'ttl': { + u'N': str(int(self._static_timestamp + ttl)) + } + } + } + with patch('celery.backends.dynamodb.time', self._mock_time): + result = self.backend._prepare_put_request('abcdef', 'val') + assert result == expected + def test_item_to_dict(self): boto_response = { 'Item': { @@ -236,6 +490,30 @@ def test_set(self): assert call_kwargs['Item'] == expected_kwargs['Item'] assert call_kwargs['TableName'] == 'celery' + def test_set_with_ttl(self): + ttl = self.backend.time_to_live_seconds = 30 + + self.backend._client = MagicMock() + self.backend._client.put_item = MagicMock() + + # should return None + with patch('celery.backends.dynamodb.time', self._mock_time): + assert self.backend.set(sentinel.key, sentinel.value) is None + + assert self.backend._client.put_item.call_count == 1 + _, call_kwargs = self.backend._client.put_item.call_args + expected_kwargs = { + 'Item': { + u'timestamp': {u'N': str(self._static_timestamp)}, + u'id': {u'S': string(sentinel.key)}, + u'result': {u'B': sentinel.value}, + u'ttl': {u'N': str(int(self._static_timestamp + ttl))}, + }, + 'TableName': 'celery' + } + assert call_kwargs['Item'] == expected_kwargs['Item'] + assert call_kwargs['TableName'] == 'celery' + def test_delete(self): self.backend._client = Mock(name='_client') mocked_delete = self.backend._client.delete = Mock('client.delete') @@ -255,10 +533,15 @@ def test_backend_by_url(self, url='dynamodb://'): assert url_ == url def test_backend_params_by_url(self): - self.app.conf.result_backend = \ - 'dynamodb://@us-east-1/celery_results?read=10&write=20' + self.app.conf.result_backend = ( + 'dynamodb://@us-east-1/celery_results' + '?read=10' + '&write=20' + '&ttl_seconds=600' + ) assert self.backend.aws_region == 'us-east-1' assert self.backend.table_name == 'celery_results' assert self.backend.read_capacity_units == 10 assert self.backend.write_capacity_units == 20 + assert self.backend.time_to_live_seconds == 600 assert self.backend.endpoint_url is None