From 0b826ac0e4eb0a93d57dc2565b61bcc80fbb3916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?McCoy=20Pati=C3=B1o?= Date: Mon, 23 May 2022 17:29:21 -0700 Subject: [PATCH] wip; Update test_change_feed --- .../tests/conftest.py | 13 +- .../tests/test_change_feed.py | 119 ++++++++++-------- 2 files changed, 72 insertions(+), 60 deletions(-) diff --git a/sdk/storage/azure-storage-blob-changefeed/tests/conftest.py b/sdk/storage/azure-storage-blob-changefeed/tests/conftest.py index 321cda36dfe8d..418b5c0a024ac 100644 --- a/sdk/storage/azure-storage-blob-changefeed/tests/conftest.py +++ b/sdk/storage/azure-storage-blob-changefeed/tests/conftest.py @@ -23,12 +23,13 @@ # IN THE SOFTWARE. # # -------------------------------------------------------------------------- -import platform -import sys +import pytest + +from devtools_testutils import test_proxy # fixture needs to be visible from conftest -# Ignore async tests for Python < 3.5 -collect_ignore_glob = [] -if sys.version_info < (3, 5) or platform.python_implementation() == "PyPy": - collect_ignore_glob.append("*_async.py") + +@pytest.fixture(scope="session", autouse=True) +def start_proxy(test_proxy): + return diff --git a/sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py b/sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py index 04b860916f9b4..5f994bd59bbc8 100644 --- a/sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py +++ b/sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py @@ -19,14 +19,16 @@ except ImportError: from azure.storage.blob.changefeed import ChangeFeedClient -from devtools_testutils.storage import StorageTestCase +from devtools_testutils import recorded_by_proxy +from devtools_testutils.storage import StorageRecordedTestCase from settings.testcase import ChangeFeedPreparer @pytest.mark.playback_test_only -class StorageChangeFeedTest(StorageTestCase): +class StorageChangeFeedTest(StorageRecordedTestCase): # --Test cases for change feed ----------------------------------------- @ChangeFeedPreparer() + @recorded_by_proxy def test_get_change_feed_events_by_page(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) results_per_page = 10 @@ -41,9 +43,9 @@ def test_get_change_feed_events_by_page(self, storage_account_name, storage_acco events_per_page2 = list(change_feed_page2) # Assert - self.assertEqual(len(events_per_page1), results_per_page) - self.assertEqual(len(events_per_page2), results_per_page) - self.assertNotEqual(events_per_page1[results_per_page-1]['id'], events_per_page2[0]['id']) + assert len(events_per_page1) == results_per_page + assert len(events_per_page2) == results_per_page + assert events_per_page1[results_per_page-1]['id'] != events_per_page2[0]['id'] # Merge the two small pages into one events_per_page1.extend(events_per_page2) @@ -56,16 +58,17 @@ def test_get_change_feed_events_by_page(self, storage_account_name, storage_acco # Assert # getting two pages separately gives the same result as getting the big page at once for i in range(0, len(one_page)): - self.assertTrue(merged_two_pages[i].get('id') == one_page[i].get('id')) + assert merged_two_pages[i].get('id') == one_page[i].get('id') @ChangeFeedPreparer() + @recorded_by_proxy def test_get_all_change_feed_events(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) change_feed = cf_client.list_changes() all_events = list(change_feed) total_events = len(all_events) - self.assertTrue(len(all_events) > 0) + assert len(all_events) > 0 results_per_page = 500 change_feed_by_page = cf_client.list_changes(results_per_page=results_per_page).by_page() @@ -75,10 +78,11 @@ def test_get_all_change_feed_events(self, storage_account_name, storage_account_ page_size = len(list(page)) event_number_in_all_pages += page_size - self.assertEqual(ceil(len(all_events)*1.0/results_per_page), len(pages)) - self.assertEqual(total_events, event_number_in_all_pages) + assert ceil(len(all_events)*1.0/results_per_page) == len(pages) + assert total_events == event_number_in_all_pages @ChangeFeedPreparer() + @recorded_by_proxy def test_get_change_feed_events_with_continuation_token(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) # To get the total events number @@ -101,9 +105,10 @@ def test_get_change_feed_events_with_continuation_token(self, storage_account_na rest_events.extend(list(page)) # Assert the - self.assertEqual(total_events, len(events_per_page1) + len(rest_events)) + assert total_events == len(events_per_page1) + len(rest_events) @ChangeFeedPreparer() + @recorded_by_proxy def test_get_change_feed_events_in_a_time_range(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) start_time = datetime(2020, 8, 12) @@ -114,18 +119,20 @@ def test_get_change_feed_events_in_a_time_range(self, storage_account_name, stor page1 = next(change_feed) events = list(page1) - self.assertIsNot(len(events), 0) + assert len(events) != 0 @ChangeFeedPreparer() + @recorded_by_proxy def test_change_feed_does_not_fail_on_empty_event_stream(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) start_time = datetime(2300, 8, 19) change_feed = cf_client.list_changes(start_time=start_time) events = list(change_feed) - self.assertEqual(len(events), 0) + assert len(events) == 0 @ChangeFeedPreparer() + @recorded_by_proxy def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) @@ -144,12 +151,12 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na token = change_feed.continuation_token dict_token = json.loads(token) - self.assertTrue(len(events) > 0) - self.assertEqual(dict_token['CursorVersion'], 1) - self.assertIsNotNone(dict_token['UrlHost']) - self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath']) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath']) + assert len(events) > 0 + assert dict_token['CursorVersion'] == 1 + assert dict_token['UrlHost'] is not None + assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3 + assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None + assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None if self.is_live: sleep(120) @@ -162,7 +169,7 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na for event in change_feed_page2: events2.append(event) - self.assertNotEqual(events2, 0) + assert events2 != 0 if self.is_live: sleep(120) @@ -171,19 +178,20 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na # restart using the continuation token which has Non-zero EventIndex for 3 shards token2 = change_feed2.continuation_token dict_token2 = json.loads(token2) - self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'], 0) - self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'], 0) - self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'], 0) + assert len(dict_token2['CurrentSegmentCursor']['ShardCursors']) == 3 + assert dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'] != 0 + assert dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'] != 0 + assert dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'] != 0 change_feed3 = cf_client.list_changes(results_per_page=57).by_page(continuation_token=token2) change_feed_page3 = next(change_feed3) events3 = list() for event in change_feed_page3: events3.append(event) - self.assertNotEqual(events2, 0) + assert events2 != 0 @ChangeFeedPreparer() + @recorded_by_proxy def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) @@ -203,12 +211,12 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account token = change_feed.continuation_token dict_token = json.loads(token) - self.assertEqual(len(events_on_first_page), 3) - self.assertEqual(dict_token['CursorVersion'], 1) - self.assertIsNotNone(dict_token['UrlHost']) - self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath']) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath']) + assert len(events_on_first_page) == 3 + assert dict_token['CursorVersion'] == 1 + assert dict_token['UrlHost'] is not None + assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3 + assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None + assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None # if self.is_live: # sleep(120) @@ -221,9 +229,10 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account for event in page: events2.append(event) - self.assertIsNot(len(events2), 0) + assert len(events2) != 0 @ChangeFeedPreparer() + @recorded_by_proxy def test_read_change_feed_with_3_shards_in_a_time_range(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) @@ -234,25 +243,26 @@ def test_read_change_feed_with_3_shards_in_a_time_range(self, storage_account_na page = next(change_feed) events = list(page) - self.assertEqual(len(events), 16) + assert len(events) == 16 token = change_feed.continuation_token dict_token = json.loads(token) - self.assertEqual(dict_token['CursorVersion'], 1) - self.assertIsNotNone(dict_token['EndTime']) - self.assertIsNotNone(dict_token['UrlHost']) - self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath']) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath']) + assert dict_token['CursorVersion'] == 1 + assert dict_token['EndTime'] is not None + assert dict_token['UrlHost'] is not None + assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3 + assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None + assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None change_feed2 = cf_client.list_changes().by_page(continuation_token=token) events = list(next(change_feed2)) end_time_str = (end_time + timedelta(hours=1)).isoformat() - self.assertTrue(events[len(events) - 1]['eventTime'] < end_time_str) + assert events[len(events) - 1]['eventTime'] < end_time_str @ChangeFeedPreparer() + @recorded_by_proxy def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_same_result_as_reading_all( self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) @@ -272,18 +282,18 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s token = change_feed.continuation_token dict_token = json.loads(token) - self.assertTrue(len(events) > 0) - self.assertEqual(dict_token['CursorVersion'], 1) - self.assertIsNotNone(dict_token['UrlHost']) - self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath']) - self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath']) + assert len(events) > 0 + assert dict_token['CursorVersion'] == 1 + assert dict_token['UrlHost'] is not None + assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3 + assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None + assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None # make sure end_time and continuation_token are mutual exclusive - with self.assertRaises(ValueError): + with pytest.raises(ValueError): cf_client.list_changes(results_per_page=50, end_time=datetime.now()).by_page(continuation_token=token) # make sure start_time and continuation_token are mutual exclusive - with self.assertRaises(ValueError): + with pytest.raises(ValueError): cf_client.list_changes(results_per_page=50, start_time=datetime.now()).by_page(continuation_token=token) # restart using the continuation token after waiting for 2 minutes @@ -294,12 +304,12 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s for event in page: events2.append(event) - self.assertNotEqual(events2, 0) + assert events2 != 0 # restart using the continuation token which has Non-zero EventIndex for 3 shards token2 = change_feed2.continuation_token dict_token2 = json.loads(token2) - self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3) + assert len(dict_token2['CurrentSegmentCursor']['ShardCursors']) == 3 change_feed3 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token2) events3 = list() @@ -310,11 +320,12 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s token3 = change_feed3.continuation_token dict_token3 = json.loads(token3) - self.assertNotEqual(events3, 0) - self.assertEqual(len(dict_token3['CurrentSegmentCursor']['ShardCursors']), 3) - self.assertEqual(len(events)+len(events2)+len(events3), len(all_events)) + assert events3 != 0 + assert len(dict_token3['CurrentSegmentCursor']['ShardCursors']) == 3 + assert len(events)+len(events2)+len(events3) == len(all_events) @ChangeFeedPreparer() + @recorded_by_proxy def test_list_3_shards_events_works_with_1_shard_cursor(self, storage_account_name, storage_account_key): cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key) start_time = datetime(2020, 8, 5, 17) @@ -331,5 +342,5 @@ def test_list_3_shards_events_works_with_1_shard_cursor(self, storage_account_na events.append(event) dict_token = json.loads(change_feed.continuation_token) dict_token_with_1_shard = json.loads(token_with_1_shard) - self.assertEqual(len(dict_token_with_1_shard['CurrentSegmentCursor']['ShardCursors']), 1) - self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3) + assert len(dict_token_with_1_shard['CurrentSegmentCursor']['ShardCursors']) == 1 + assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3