Skip to content

Commit

Permalink
[EventHubs] Update live test for ep (#7676)
Browse files Browse the repository at this point in the history
  • Loading branch information
yunhaoling authored and fengzhou-msft committed Nov 5, 2019
1 parent e429909 commit 41ccef7
Showing 1 changed file with 74 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
import uuid
import warnings
import asyncio

from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager

Expand Down Expand Up @@ -41,90 +42,99 @@ def remove_live_storage_blob_client(container_str):
warnings.warn(UserWarning("storage container teardown failed"))


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_claim_and_list_ownership():
container_str, live_storage_blob_client = get_live_storage_blob_client()
if not live_storage_blob_client:
pytest.skip("Storage blob client can't be created")

async def _claim_and_list_ownership(live_storage_blob_client):
eventhub_name = 'eventhub'
consumer_group_name = '$default'
ownership_cnt = 8
async with live_storage_blob_client:
partition_manager = BlobPartitionManager(container_client=live_storage_blob_client)

try:
async with live_storage_blob_client:

partition_manager = BlobPartitionManager(container_client=live_storage_blob_client)
ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name,
consumer_group_name=consumer_group_name)
assert len(ownership_list) == 0

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name)
assert len(ownership_list) == 0
ownership_list = []

ownership_list = []
for i in range(ownership_cnt):
ownership = {}
ownership['eventhub_name'] = eventhub_name
ownership['consumer_group_name'] = consumer_group_name
ownership['owner_id'] = 'ownerid'
ownership['partition_id'] = str(i)
ownership['last_modified_time'] = time.time()
ownership["offset"] = "1"
ownership["sequence_number"] = "1"
ownership_list.append(ownership)

for i in range(ownership_cnt):
ownership = {}
ownership['eventhub_name'] = eventhub_name
ownership['consumer_group_name'] = consumer_group_name
ownership['owner_id'] = 'ownerid'
ownership['partition_id'] = str(i)
ownership['last_modified_time'] = time.time()
ownership["offset"] = "1"
ownership["sequence_number"] = "1"
ownership_list.append(ownership)
await partition_manager.claim_ownership(ownership_list)

await partition_manager.claim_ownership(ownership_list)

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name)
assert len(ownership_list) == ownership_cnt
finally:
remove_live_storage_blob_client(container_str)
ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name,
consumer_group_name=consumer_group_name)
assert len(ownership_list) == ownership_cnt


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_update_checkpoint():
def test_claim_and_list_ownership():
container_str, live_storage_blob_client = get_live_storage_blob_client()
if not live_storage_blob_client:
pytest.skip("Storage blob client can't be created")
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(_claim_and_list_ownership(live_storage_blob_client))
finally:
remove_live_storage_blob_client(container_str)


async def _update_checkpoint(live_storage_blob_client):
eventhub_name = 'eventhub'
consumer_group_name = '$default'
owner_id = 'owner'
partition_cnt = 8

async with live_storage_blob_client:
partition_manager = BlobPartitionManager(container_client=live_storage_blob_client)

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name,
consumer_group_name=consumer_group_name)
assert len(ownership_list) == 0

ownership_list = []

for i in range(partition_cnt):
ownership = {}
ownership['eventhub_name'] = eventhub_name
ownership['consumer_group_name'] = consumer_group_name
ownership['owner_id'] = owner_id
ownership['partition_id'] = str(i)
ownership['last_modified_time'] = time.time()
ownership['offset'] = '1'
ownership['sequence_number'] = '10'
ownership_list.append(ownership)

await partition_manager.claim_ownership(ownership_list)

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name,
consumer_group_name=consumer_group_name)
assert len(ownership_list) == partition_cnt

for i in range(partition_cnt):
await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, str(i),
owner_id, '2', '20')

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name,
consumer_group_name=consumer_group_name)
for ownership in ownership_list:
assert ownership['offset'] == '2'
assert ownership['sequence_number'] == '20'


@pytest.mark.liveTest
def test_update_checkpoint():
container_str, live_storage_blob_client = get_live_storage_blob_client()
if not live_storage_blob_client:
pytest.skip("Storage blob client can't be created")
try:
async with live_storage_blob_client:
partition_manager = BlobPartitionManager(container_client=live_storage_blob_client)

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name)
assert len(ownership_list) == 0

ownership_list = []

for i in range(partition_cnt):
ownership = {}
ownership['eventhub_name'] = eventhub_name
ownership['consumer_group_name'] = consumer_group_name
ownership['owner_id'] = owner_id
ownership['partition_id'] = str(i)
ownership['last_modified_time'] = time.time()
ownership['offset'] = '1'
ownership['sequence_number'] = '10'
ownership_list.append(ownership)

await partition_manager.claim_ownership(ownership_list)

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name)
assert len(ownership_list) == partition_cnt

for i in range(partition_cnt):
await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, str(i),
owner_id, '2', '20')

ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name)
for ownership in ownership_list:
assert ownership['offset'] == '2'
assert ownership['sequence_number'] == '20'
loop = asyncio.get_event_loop()
loop.run_until_complete(_update_checkpoint(live_storage_blob_client))
finally:
remove_live_storage_blob_client(container_str)

0 comments on commit 41ccef7

Please sign in to comment.