Skip to content

Commit

Permalink
Convert functional tests to pytest
Browse files Browse the repository at this point in the history
  • Loading branch information
nateprewitt committed Sep 23, 2021
1 parent e836f35 commit 09041c4
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 117 deletions.
18 changes: 8 additions & 10 deletions tests/functional/docs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
class BaseDocsFunctionalTests(unittest.TestCase):
def assert_contains_lines_in_order(self, lines, contents):
for line in lines:
self.assertIn(line, contents)
assert line in contents
beginning = contents.find(line)
contents = contents[(beginning + len(line)):]

def get_class_document_block(self, class_name, contents):
start_class_document = '.. py:class:: %s' % class_name
start_index = contents.find(start_class_document)
self.assertNotEqual(start_index, -1, 'Class is not found in contents')
assert start_index != -1, 'Class is not found in contents'
contents = contents[start_index:]
end_index = contents.find(
' .. py:class::', len(start_class_document))
Expand All @@ -32,7 +32,7 @@ def get_class_document_block(self, class_name, contents):
def get_method_document_block(self, method_name, contents):
start_method_document = ' .. py:method:: %s(' % method_name
start_index = contents.find(start_method_document)
self.assertNotEqual(start_index, -1, 'Method is not found in contents')
assert start_index != -1, 'Method is not found in contents'
contents = contents[start_index:]
end_index = contents.find(
' .. py:method::', len(start_method_document))
Expand All @@ -41,8 +41,7 @@ def get_method_document_block(self, method_name, contents):
def get_request_syntax_document_block(self, contents):
start_marker = '**Request Syntax**'
start_index = contents.find(start_marker)
self.assertNotEqual(
start_index, -1, 'There is no request syntax section')
assert start_index != -1, 'There is no request syntax section'
contents = contents[start_index:]
end_index = contents.find(
':type', len(start_marker))
Expand All @@ -51,8 +50,7 @@ def get_request_syntax_document_block(self, contents):
def get_response_syntax_document_block(self, contents):
start_marker = '**Response Syntax**'
start_index = contents.find(start_marker)
self.assertNotEqual(
start_index, -1, 'There is no response syntax section')
assert start_index != -1, 'There is no response syntax section'
contents = contents[start_index:]
end_index = contents.find(
'**Response Structure**', len(start_marker))
Expand All @@ -61,19 +59,19 @@ def get_response_syntax_document_block(self, contents):
def get_request_parameter_document_block(self, param_name, contents):
start_param_document = ':type %s:' % param_name
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'Param is not found in contents')
assert start_index != -1, 'Param is not found in contents'
contents = contents[start_index:]
end_index = contents.find(':type', len(start_param_document))
return contents[:end_index]

def get_response_parameter_document_block(self, param_name, contents):
start_param_document = '**Response Structure**'
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'There is no response structure')
assert start_index != -1, 'There is no response structure'

start_param_document = '- **%s**' % param_name
start_index = contents.find(start_param_document)
self.assertNotEqual(start_index, -1, 'Param is not found in contents')
assert start_index != -1, 'Param is not found in contents'
contents = contents[start_index:]
end_index = contents.find('- **', len(start_param_document))
return contents[:end_index]
4 changes: 2 additions & 2 deletions tests/functional/dynamodb/test_stubber_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_table_query_can_be_stubbed_with_expressions(self):
response = table.query(KeyConditionExpression=key_expr,
FilterExpression=filter_expr)

self.assertEqual(list(), response['Items'])
assert response['Items'] == []
stubber.assert_no_pending_responses()

def test_table_scan_can_be_stubbed_with_expressions(self):
Expand All @@ -59,5 +59,5 @@ def test_table_scan_can_be_stubbed_with_expressions(self):
with stubber:
response = table.scan(FilterExpression=filter_expr)

self.assertEqual(list(), response['Items'])
assert response['Items'] == []
stubber.assert_no_pending_responses()
2 changes: 1 addition & 1 deletion tests/functional/dynamodb/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def setUp(self):

def test_resource_has_batch_writer_added(self):
table = self.resource.Table('mytable')
self.assertTrue(hasattr(table, 'batch_writer'))
assert hasattr(table, 'batch_writer')

def test_operation_without_output(self):
table = self.resource.Table('mytable')
Expand Down
5 changes: 2 additions & 3 deletions tests/functional/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ def setUp(self):
self.ec2_resource = self.session.resource('ec2')

def test_can_use_collection_methods(self):
self.assertIsInstance(
self.ec2_resource.instances.all(), ResourceCollection)
assert isinstance(self.ec2_resource.instances.all(), ResourceCollection)

def test_can_chain_methods(self):
self.assertIsInstance(
assert isinstance(
self.ec2_resource.instances.all().page_size(5), ResourceCollection)
26 changes: 12 additions & 14 deletions tests/functional/test_dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ def test_resource(self):
table.scan(FilterExpression=Attr('mykey').eq('myvalue'))
request = self.make_request_mock.call_args_list[0][0][1]
request_params = json.loads(request['body'].decode('utf-8'))
self.assertEqual(
request_params,
{'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}}
)
assert request_params == {
'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}
}

def test_client(self):
dynamodb = self.session.client('dynamodb')
Expand All @@ -62,10 +61,9 @@ def test_client(self):
)
request = self.make_request_mock.call_args_list[0][0][1]
request_params = json.loads(request['body'].decode('utf-8'))
self.assertEqual(
request_params,
{'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}}
)
assert request_params == {
'TableName': 'MyTable',
'FilterExpression': '#n0 = :v0',
'ExpressionAttributeNames': {'#n0': 'mykey'},
'ExpressionAttributeValues': {':v0': {'S': 'myvalue'}}
}
5 changes: 2 additions & 3 deletions tests/functional/test_ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ def setUp(self):
self.instance_resource = self.service_resource.Instance('i-abc123')

def test_delete_tags_injected(self):
self.assertTrue(hasattr(self.instance_resource, 'delete_tags'),
'delete_tags was not injected onto Instance resource.')
assert hasattr(self.instance_resource, 'delete_tags')

def test_delete_tags(self):
stubber = Stubber(self.instance_resource.meta.client)
stubber.add_response('delete_tags', {})
stubber.activate()
response = self.instance_resource.delete_tags(Tags=[{'Key': 'foo'}])
stubber.assert_no_pending_responses()
self.assertEqual(response, {})
assert response == {}
stubber.deactivate()

def test_mutating_filters(self):
Expand Down
14 changes: 7 additions & 7 deletions tests/functional/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest

import boto3
from boto3.exceptions import ResourceNotExistsError

Expand All @@ -35,21 +37,19 @@ def test_can_inject_method_onto_resource(self):
self.botocore_session.register('creating-resource-class.s3',
self.add_new_method(name='my_method'))
resource = session.resource('s3')
self.assertTrue(hasattr(resource, 'my_method'))
self.assertEqual(resource.my_method('anything'), 'anything')
assert hasattr(resource, 'my_method')
assert resource.my_method('anything') == 'anything'


class TestSessionErrorMessages(unittest.TestCase):
def test_has_good_error_message_when_no_resource(self):
bad_resource_name = 'doesnotexist'
err_regex = (
'%s.*resource does not exist.' % bad_resource_name
)
with self.assertRaisesRegex(ResourceNotExistsError, err_regex):
err_regex = f'{bad_resource_name}.*resource does not exist.'
with pytest.raises(ResourceNotExistsError, match=err_regex):
boto3.resource(bad_resource_name)


class TestGetAvailableSubresources(unittest.TestCase):
def test_s3_available_subresources_exists(self):
s3 = boto3.resource('s3')
self.assertTrue(hasattr(s3, 'get_available_subresources'))
assert hasattr(s3, 'get_available_subresources')
72 changes: 31 additions & 41 deletions tests/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import pytest

from tests import unittest

import botocore
Expand All @@ -26,36 +28,26 @@ class TestS3MethodInjection(unittest.TestCase):
def test_transfer_methods_injected_to_client(self):
session = boto3.session.Session(region_name='us-west-2')
client = session.client('s3')
self.assertTrue(hasattr(client, 'upload_file'),
'upload_file was not injected onto S3 client')
self.assertTrue(hasattr(client, 'download_file'),
'download_file was not injected onto S3 client')
self.assertTrue(hasattr(client, 'copy'),
'copy was not injected onto S3 client')
assert hasattr(client, 'upload_file')
assert hasattr(client, 'download_file')
assert hasattr(client, 'copy')

def test_bucket_resource_has_load_method(self):
session = boto3.session.Session(region_name='us-west-2')
bucket = session.resource('s3').Bucket('fakebucket')
self.assertTrue(hasattr(bucket, 'load'),
'load() was not injected onto S3 Bucket resource.')
assert hasattr(bucket, 'load')

def test_transfer_methods_injected_to_bucket(self):
bucket = boto3.resource('s3').Bucket('my_bucket')
self.assertTrue(hasattr(bucket, 'upload_file'),
'upload_file was not injected onto S3 bucket')
self.assertTrue(hasattr(bucket, 'download_file'),
'download_file was not injected onto S3 bucket')
self.assertTrue(hasattr(bucket, 'copy'),
'copy was not injected onto S3 bucket')
assert hasattr(bucket, 'upload_file')
assert hasattr(bucket, 'download_file')
assert hasattr(bucket, 'copy')

def test_transfer_methods_injected_to_object(self):
obj = boto3.resource('s3').Object('my_bucket', 'my_key')
self.assertTrue(hasattr(obj, 'upload_file'),
'upload_file was not injected onto S3 object')
self.assertTrue(hasattr(obj, 'download_file'),
'download_file was not injected onto S3 object')
self.assertTrue(hasattr(obj, 'copy'),
'copy was not injected onto S3 object')
assert hasattr(obj, 'upload_file')
assert hasattr(obj, 'download_file')
assert hasattr(obj, 'copy')


class BaseTransferTest(unittest.TestCase):
Expand Down Expand Up @@ -208,22 +200,22 @@ def test_client_copy(self):
response = self.s3.meta.client.copy(
self.copy_source, self.bucket, self.key)
# The response will be none on a successful transfer.
self.assertIsNone(response)
assert response is None

def test_bucket_copy(self):
self.stub_single_part_copy()
bucket = self.s3.Bucket(self.bucket)
with self.stubber:
response = bucket.copy(self.copy_source, self.key)
# The response will be none on a successful transfer.
self.assertIsNone(response)
assert response is None

def test_object_copy(self):
self.stub_single_part_copy()
obj = self.s3.Object(self.bucket, self.key)
with self.stubber:
response = obj.copy(self.copy_source)
self.assertIsNone(response)
assert response is None

def test_copy_progress(self):
chunksize = 8 * (1024 ** 2)
Expand All @@ -243,8 +235,8 @@ def progress_callback(amount):

# Assert that the progress callback was called the correct number of
# times with the correct amounts.
self.assertEqual(self.progress_times_called, 3)
self.assertEqual(self.progress, chunksize * 3)
assert self.progress_times_called == 3
assert self.progress == chunksize * 3


class TestUploadFileobj(BaseTransferTest):
Expand Down Expand Up @@ -310,7 +302,7 @@ def test_client_upload(self):

def test_raises_value_error_on_invalid_fileobj(self):
with self.stubber:
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
self.s3.meta.client.upload_fileobj(
Fileobj='foo', Bucket=self.bucket, Key=self.key)

Expand Down Expand Up @@ -427,12 +419,12 @@ def test_client_download(self):
self.s3.meta.client.download_fileobj(
Bucket=self.bucket, Key=self.key, Fileobj=self.fileobj)

self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()

def test_raises_value_error_on_invalid_fileobj(self):
with self.stubber:
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
self.s3.meta.client.download_fileobj(
Bucket=self.bucket, Key=self.key, Fileobj='foo')

Expand All @@ -442,7 +434,7 @@ def test_bucket_download(self):
with self.stubber:
bucket.download_fileobj(Key=self.key, Fileobj=self.fileobj)

self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()

def test_object_download(self):
Expand All @@ -451,7 +443,7 @@ def test_object_download(self):
with self.stubber:
obj.download_fileobj(Fileobj=self.fileobj)

self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()

def test_multipart_download(self):
Expand All @@ -467,7 +459,7 @@ def test_multipart_download(self):
Bucket=self.bucket, Key=self.key, Fileobj=self.fileobj,
Config=transfer_config)

self.assertEqual(self.fileobj.getvalue(), self.contents)
assert self.fileobj.getvalue() == self.contents
self.stubber.assert_no_pending_responses()

def test_download_progress(self):
Expand All @@ -489,8 +481,8 @@ def progress_callback(amount):

# Assert that the progress callback was called the correct number of
# times with the correct amounts.
self.assertEqual(self.progress_times_called, 11)
self.assertEqual(self.progress, 55)
assert self.progress_times_called == 11
assert self.progress == 55
self.stubber.assert_no_pending_responses()


Expand Down Expand Up @@ -520,19 +512,19 @@ def tearDown(self):
self.stubber.deactivate()

def test_has_load(self):
self.assertTrue(hasattr(self.obj_summary, 'load'),
'load() was not injected onto ObjectSummary resource.')
# Validate load was injected onto ObjectSummary.
assert hasattr(self.obj_summary, 'load')

def test_autoloads_correctly(self):
# In HeadObject the parameter returned is ContentLength, this
# should get mapped to Size of ListObject since the resource uses
# the shape returned to by ListObjects.
self.assertEqual(self.obj_summary.size, self.obj_summary_size)
assert self.obj_summary.size == self.obj_summary_size

def test_cannot_access_other_non_related_parameters(self):
# Even though an HeadObject was used to load this, it should
# only expose the attributes from its shape defined in ListObjects.
self.assertFalse(hasattr(self.obj_summary, 'content_length'))
assert not hasattr(self.obj_summary, 'content_length')


class TestServiceResource(unittest.TestCase):
Expand All @@ -542,7 +534,5 @@ def setUp(self):
def test_unsigned_signature_version_is_not_corrupted(self):
config = Config(signature_version=botocore.UNSIGNED)
resource = self.session.resource('s3', config=config)
self.assertIs(
resource.meta.client.meta.config.signature_version,
botocore.UNSIGNED
)
sig_version = resource.meta.client.meta.config.signature_version
assert sig_version is botocore.UNSIGNED
Loading

0 comments on commit 09041c4

Please sign in to comment.