diff --git a/tests/unit/plugins/modules/test_kinesis_stream.py b/tests/unit/plugins/modules/test_kinesis_stream.py deleted file mode 100644 index 88bff7687da..00000000000 --- a/tests/unit/plugins/modules/test_kinesis_stream.py +++ /dev/null @@ -1,330 +0,0 @@ -# Make coding more python3-ish -from __future__ import (absolute_import, division, print_function) -__metaclass__ = type - -import pytest -import unittest - -boto3 = pytest.importorskip("boto3") -botocore = pytest.importorskip("botocore") - -from ansible_collections.community.aws.plugins.modules import kinesis_stream - -aws_region = 'us-west-2' - - -class AnsibleKinesisStreamFunctions(unittest.TestCase): - - def test_convert_to_lower(self): - example = { - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE' - } - converted_example = kinesis_stream.convert_to_lower(example) - keys = list(converted_example.keys()) - keys.sort() - for i in range(len(keys)): - if i == 0: - self.assertEqual(keys[i], 'has_more_shards') - if i == 1: - self.assertEqual(keys[i], 'retention_period_hours') - if i == 2: - self.assertEqual(keys[i], 'stream_arn') - if i == 3: - self.assertEqual(keys[i], 'stream_name') - if i == 4: - self.assertEqual(keys[i], 'stream_status') - - def test_make_tags_in_aws_format(self): - example = { - 'env': 'development' - } - should_return = [ - { - 'Key': 'env', - 'Value': 'development' - } - ] - aws_tags = kinesis_stream.make_tags_in_aws_format(example) - self.assertEqual(aws_tags, should_return) - - def test_make_tags_in_proper_format(self): - example = [ - { - 'Key': 'env', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - should_return = { - 'env': 'development', - 'service': 'web' - } - proper_tags = kinesis_stream.make_tags_in_proper_format(example) - self.assertEqual(proper_tags, should_return) - - def test_recreate_tags_from_list(self): - example = [('environment', 'development'), ('service', 'web')] - should_return = [ - { - 'Key': 'environment', - 'Value': 'development' - }, - { - 'Key': 'service', - 'Value': 'web' - } - ] - aws_tags = kinesis_stream.recreate_tags_from_list(example) - self.assertEqual(aws_tags, should_return) - - def test_get_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, tags = kinesis_stream.get_tags(client, 'test', check_mode=True) - self.assertTrue(success) - should_return = [ - { - 'Key': 'DryRunMode', - 'Value': 'true' - } - ] - self.assertEqual(tags, should_return) - - def test_find_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.find_stream(client, 'test', check_mode=True) - ) - should_return = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 5, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_wait_for_status(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg, stream = ( - kinesis_stream.wait_for_status( - client, 'test', 'ACTIVE', check_mode=True - ) - ) - should_return = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 5, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - self.assertTrue(success) - self.assertEqual(stream, should_return) - - def test_tags_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_tags_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, err_msg = ( - kinesis_stream.tags_action( - client, 'test', tags, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_tags(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update_tags( - client, 'test', tags, check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_create(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'create', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_delete(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'delete', check_mode=True - ) - ) - self.assertTrue(success) - - def test_stream_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.stream_action( - client, 'test', 10, 'append', check_mode=True - ) - ) - self.assertFalse(success) - - def test_retention_action_increase(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 48, 'increase', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_decrease(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'decrease', check_mode=True - ) - ) - self.assertTrue(success) - - def test_retention_action_invalid(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.retention_action( - client, 'test', 24, 'create', check_mode=True - ) - ) - self.assertFalse(success) - - def test_update_shard_count(self): - client = boto3.client('kinesis', region_name=aws_region) - success, err_msg = ( - kinesis_stream.update_shard_count( - client, 'test', 5, check_mode=True - ) - ) - self.assertTrue(success) - - def test_update(self): - client = boto3.client('kinesis', region_name=aws_region) - current_stream = { - 'OpenShardsCount': 5, - 'ClosedShardsCount': 0, - 'ShardsCount': 1, - 'HasMoreShards': True, - 'RetentionPeriodHours': 24, - 'StreamName': 'test', - 'StreamARN': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'StreamStatus': 'ACTIVE', - 'EncryptionType': 'NONE' - } - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg = ( - kinesis_stream.update( - client, current_stream, 'test', number_of_shards=2, retention_period=48, - tags=tags, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_create_stream(self): - client = boto3.client('kinesis', region_name=aws_region) - tags = { - 'env': 'development', - 'service': 'web' - } - success, changed, err_msg, results = ( - kinesis_stream.create_stream( - client, 'test', number_of_shards=10, retention_period=48, - tags=tags, check_mode=True - ) - ) - should_return = { - 'open_shards_count': 5, - 'closed_shards_count': 0, - 'shards_count': 5, - 'has_more_shards': True, - 'retention_period_hours': 24, - 'stream_name': 'test', - 'stream_arn': 'arn:aws:kinesis:east-side:123456789:stream/test', - 'stream_status': 'ACTIVE', - 'encryption_type': 'NONE', - 'tags': tags, - } - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(results, should_return) - self.assertEqual(err_msg, 'Kinesis Stream test updated successfully.') - - def test_enable_stream_encription(self): - client = boto3.client('kinesis', region_name=aws_region) - success, changed, err_msg, results = ( - kinesis_stream.start_stream_encryption( - client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test encryption started successfully.') - - def test_dsbale_stream_encryption(self): - client = boto3.client('kinesis', region_name=aws_region) - success, changed, err_msg, results = ( - kinesis_stream.stop_stream_encryption( - client, 'test', encryption_type='KMS', key_id='', wait=True, wait_timeout=60, check_mode=True - ) - ) - self.assertTrue(success) - self.assertTrue(changed) - self.assertEqual(err_msg, 'Kinesis Stream test encryption stopped successfully.')