Skip to content

Commit

Permalink
wip; Update test_change_feed
Browse files Browse the repository at this point in the history
  • Loading branch information
mccoyp committed May 24, 2022
1 parent 3000bd9 commit 821597c
Showing 1 changed file with 65 additions and 54 deletions.
119 changes: 65 additions & 54 deletions sdk/storage/azure-storage-blob-changefeed/tests/test_change_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
except ImportError:
from azure.storage.blob.changefeed import ChangeFeedClient

from devtools_testutils.storage import StorageTestCase
from devtools_testutils import recorded_by_proxy
from devtools_testutils.storage import StorageRecordedTestCase
from settings.testcase import ChangeFeedPreparer

@pytest.mark.playback_test_only
class StorageChangeFeedTest(StorageTestCase):
class StorageChangeFeedTest(StorageRecordedTestCase):

# --Test cases for change feed -----------------------------------------
@ChangeFeedPreparer()
@recorded_by_proxy
def test_get_change_feed_events_by_page(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
results_per_page = 10
Expand All @@ -41,9 +43,9 @@ def test_get_change_feed_events_by_page(self, storage_account_name, storage_acco
events_per_page2 = list(change_feed_page2)

# Assert
self.assertEqual(len(events_per_page1), results_per_page)
self.assertEqual(len(events_per_page2), results_per_page)
self.assertNotEqual(events_per_page1[results_per_page-1]['id'], events_per_page2[0]['id'])
assert len(events_per_page1) == results_per_page
assert len(events_per_page2) == results_per_page
assert events_per_page1[results_per_page-1]['id'] != events_per_page2[0]['id']

# Merge the two small pages into one
events_per_page1.extend(events_per_page2)
Expand All @@ -56,16 +58,17 @@ def test_get_change_feed_events_by_page(self, storage_account_name, storage_acco
# Assert
# getting two pages separately gives the same result as getting the big page at once
for i in range(0, len(one_page)):
self.assertTrue(merged_two_pages[i].get('id') == one_page[i].get('id'))
assert merged_two_pages[i].get('id') == one_page[i].get('id')

@ChangeFeedPreparer()
@recorded_by_proxy
def test_get_all_change_feed_events(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
change_feed = cf_client.list_changes()
all_events = list(change_feed)
total_events = len(all_events)

self.assertTrue(len(all_events) > 0)
assert len(all_events) > 0

results_per_page = 500
change_feed_by_page = cf_client.list_changes(results_per_page=results_per_page).by_page()
Expand All @@ -75,10 +78,11 @@ def test_get_all_change_feed_events(self, storage_account_name, storage_account_
page_size = len(list(page))
event_number_in_all_pages += page_size

self.assertEqual(ceil(len(all_events)*1.0/results_per_page), len(pages))
self.assertEqual(total_events, event_number_in_all_pages)
assert ceil(len(all_events)*1.0/results_per_page) == len(pages)
assert total_events == event_number_in_all_pages

@ChangeFeedPreparer()
@recorded_by_proxy
def test_get_change_feed_events_with_continuation_token(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
# To get the total events number
Expand All @@ -101,9 +105,10 @@ def test_get_change_feed_events_with_continuation_token(self, storage_account_na
rest_events.extend(list(page))

# Assert the
self.assertEqual(total_events, len(events_per_page1) + len(rest_events))
assert total_events == len(events_per_page1) + len(rest_events)

@ChangeFeedPreparer()
@recorded_by_proxy
def test_get_change_feed_events_in_a_time_range(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
start_time = datetime(2020, 8, 12)
Expand All @@ -114,18 +119,20 @@ def test_get_change_feed_events_in_a_time_range(self, storage_account_name, stor
page1 = next(change_feed)
events = list(page1)

self.assertIsNot(len(events), 0)
assert len(events) != 0

@ChangeFeedPreparer()
@recorded_by_proxy
def test_change_feed_does_not_fail_on_empty_event_stream(self, storage_account_name, storage_account_key):
    """Listing changes from a far-future start_time must yield an empty, error-free stream.

    :param storage_account_name: Name of the storage account under test (injected by the preparer).
    :param storage_account_key: Shared key credential for the account (injected by the preparer).
    """
    cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
    # A start_time far in the future guarantees no change-feed segments can exist,
    # so iterating the feed exercises the empty-stream code path.
    start_time = datetime(2300, 8, 19)
    change_feed = cf_client.list_changes(start_time=start_time)

    # Fully consuming the feed should succeed and produce zero events.
    events = list(change_feed)
    assert len(events) == 0

@ChangeFeedPreparer()
@recorded_by_proxy
def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)

Expand All @@ -144,12 +151,12 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na
token = change_feed.continuation_token

dict_token = json.loads(token)
self.assertTrue(len(events) > 0)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])
assert len(events) > 0
assert dict_token['CursorVersion'] == 1
assert dict_token['UrlHost'] is not None
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3
assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None
assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None

if self.is_live:
sleep(120)
Expand All @@ -162,7 +169,7 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na
for event in change_feed_page2:
events2.append(event)

self.assertNotEqual(events2, 0)
assert events2 != 0

if self.is_live:
sleep(120)
Expand All @@ -171,19 +178,20 @@ def test_read_change_feed_tail_where_3_shards_have_data(self, storage_account_na
# restart using the continuation token which has Non-zero EventIndex for 3 shards
token2 = change_feed2.continuation_token
dict_token2 = json.loads(token2)
self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'], 0)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'], 0)
self.assertNotEqual(dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'], 0)
assert len(dict_token2['CurrentSegmentCursor']['ShardCursors']) == 3
assert dict_token2['CurrentSegmentCursor']['ShardCursors'][0]['EventIndex'] != 0
assert dict_token2['CurrentSegmentCursor']['ShardCursors'][1]['EventIndex'] != 0
assert dict_token2['CurrentSegmentCursor']['ShardCursors'][2]['EventIndex'] != 0

change_feed3 = cf_client.list_changes(results_per_page=57).by_page(continuation_token=token2)
change_feed_page3 = next(change_feed3)
events3 = list()
for event in change_feed_page3:
events3.append(event)
self.assertNotEqual(events2, 0)
assert events2 != 0

@ChangeFeedPreparer()
@recorded_by_proxy
def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)

Expand All @@ -203,12 +211,12 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account
token = change_feed.continuation_token
dict_token = json.loads(token)

self.assertEqual(len(events_on_first_page), 3)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])
assert len(events_on_first_page) == 3
assert dict_token['CursorVersion'] == 1
assert dict_token['UrlHost'] is not None
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3
assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None
assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None

# if self.is_live:
# sleep(120)
Expand All @@ -221,9 +229,10 @@ def test_read_change_feed_tail_where_only_1_shard_has_data(self, storage_account
for event in page:
events2.append(event)

self.assertIsNot(len(events2), 0)
assert len(events2) != 0

@ChangeFeedPreparer()
@recorded_by_proxy
def test_read_change_feed_with_3_shards_in_a_time_range(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)

Expand All @@ -234,25 +243,26 @@ def test_read_change_feed_with_3_shards_in_a_time_range(self, storage_account_na

page = next(change_feed)
events = list(page)
self.assertEqual(len(events), 16)
assert len(events) == 16

token = change_feed.continuation_token

dict_token = json.loads(token)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['EndTime'])
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])
assert dict_token['CursorVersion'] == 1
assert dict_token['EndTime'] is not None
assert dict_token['UrlHost'] is not None
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3
assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None
assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None

change_feed2 = cf_client.list_changes().by_page(continuation_token=token)
events = list(next(change_feed2))

end_time_str = (end_time + timedelta(hours=1)).isoformat()
self.assertTrue(events[len(events) - 1]['eventTime'] < end_time_str)
assert events[len(events) - 1]['eventTime'] < end_time_str

@ChangeFeedPreparer()
@recorded_by_proxy
def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_same_result_as_reading_all(
self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
Expand All @@ -272,18 +282,18 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s
token = change_feed.continuation_token

dict_token = json.loads(token)
self.assertTrue(len(events) > 0)
self.assertEqual(dict_token['CursorVersion'], 1)
self.assertIsNotNone(dict_token['UrlHost'])
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['SegmentPath'])
self.assertIsNotNone(dict_token['CurrentSegmentCursor']['CurrentShardPath'])
assert len(events) > 0
assert dict_token['CursorVersion'] == 1
assert dict_token['UrlHost'] is not None
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3
assert dict_token['CurrentSegmentCursor']['SegmentPath'] is not None
assert dict_token['CurrentSegmentCursor']['CurrentShardPath'] is not None

# make sure end_time and continuation_token are mutual exclusive
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
cf_client.list_changes(results_per_page=50, end_time=datetime.now()).by_page(continuation_token=token)
# make sure start_time and continuation_token are mutual exclusive
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
cf_client.list_changes(results_per_page=50, start_time=datetime.now()).by_page(continuation_token=token)

# restart using the continuation token after waiting for 2 minutes
Expand All @@ -294,12 +304,12 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s
for event in page:
events2.append(event)

self.assertNotEqual(events2, 0)
assert events2 != 0

# restart using the continuation token which has Non-zero EventIndex for 3 shards
token2 = change_feed2.continuation_token
dict_token2 = json.loads(token2)
self.assertEqual(len(dict_token2['CurrentSegmentCursor']['ShardCursors']), 3)
assert len(dict_token2['CurrentSegmentCursor']['ShardCursors']) == 3

change_feed3 = cf_client.list_changes(results_per_page=50).by_page(continuation_token=token2)
events3 = list()
Expand All @@ -310,11 +320,12 @@ def test_read_3_shards_change_feed_during_a_time_range_in_multiple_times_gives_s
token3 = change_feed3.continuation_token
dict_token3 = json.loads(token3)

self.assertNotEqual(events3, 0)
self.assertEqual(len(dict_token3['CurrentSegmentCursor']['ShardCursors']), 3)
self.assertEqual(len(events)+len(events2)+len(events3), len(all_events))
assert events3 != 0
assert len(dict_token3['CurrentSegmentCursor']['ShardCursors']) == 3
assert len(events)+len(events2)+len(events3) == len(all_events)

@ChangeFeedPreparer()
@recorded_by_proxy
def test_list_3_shards_events_works_with_1_shard_cursor(self, storage_account_name, storage_account_key):
cf_client = ChangeFeedClient(self.account_url(storage_account_name, "blob"), storage_account_key)
start_time = datetime(2020, 8, 5, 17)
Expand All @@ -331,5 +342,5 @@ def test_list_3_shards_events_works_with_1_shard_cursor(self, storage_account_na
events.append(event)
dict_token = json.loads(change_feed.continuation_token)
dict_token_with_1_shard = json.loads(token_with_1_shard)
self.assertEqual(len(dict_token_with_1_shard['CurrentSegmentCursor']['ShardCursors']), 1)
self.assertEqual(len(dict_token['CurrentSegmentCursor']['ShardCursors']), 3)
assert len(dict_token_with_1_shard['CurrentSegmentCursor']['ShardCursors']) == 1
assert len(dict_token['CurrentSegmentCursor']['ShardCursors']) == 3

0 comments on commit 821597c

Please sign in to comment.