Skip to content

Commit

Permalink
test: add retries to schema coordinator workflow test
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Sep 24, 2024
1 parent a74421a commit 749ed36
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions tests/integration/test_schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
from karapace.coordinator.schema_coordinator import Assignment, SchemaCoordinator, SchemaCoordinatorGroupRebalance
from karapace.utils import json_encode
from karapace.version import __version__
from tenacity import retry, stop_after_delay, TryAgain, wait_fixed
from tests.integration.utils.kafka_server import KafkaServers
from typing import AsyncGenerator, Iterator
from tests.utils import new_random_name
from typing import AsyncGenerator, Final, Iterator
from unittest import mock

import aiokafka.errors as Errors
Expand All @@ -35,6 +37,9 @@

LOG = logging.getLogger(__name__)

RETRY_TIME: Final = 20
RETRY_WAIT_SECONDS: Final = 0.5


@pytest.fixture(scope="function", name="mocked_client")
def fixture_mocked_aiokafka_client() -> Iterator[AIOKafkaClient]:
Expand Down Expand Up @@ -82,12 +87,26 @@ async def get_client(
await client.close()


@retry(stop=stop_after_delay(RETRY_TIME), wait=wait_fixed(RETRY_WAIT_SECONDS))
async def wait_for_ready(coordinator: SchemaCoordinator) -> None:
if not coordinator.ready and coordinator.coordinator_id is None:
raise TryAgain()


@retry(stop=stop_after_delay(RETRY_TIME), wait=wait_fixed(RETRY_WAIT_SECONDS))
async def wait_for_primary_state(coordinator: SchemaCoordinator) -> None:
if not coordinator.are_we_master:
raise TryAgain()
await asyncio.sleep(0.1)


@pytest.mark.parametrize("primary_selection_strategy", ["highest", "lowest"])
async def test_coordinator_workflow(
primary_selection_strategy: str,
client: AIOKafkaClient,
kafka_servers: KafkaServers,
) -> None:
group_name = new_random_name("tg-")
# Check if 2 coordinators will coordinate rebalances correctly
# Check if the initial group join is performed correctly with minimal
# setup
Expand All @@ -98,21 +117,16 @@ async def test_coordinator_workflow(
"https",
True,
primary_selection_strategy,
"test-group",
group_name,
session_timeout_ms=10000,
heartbeat_interval_ms=500,
retry_backoff_ms=100,
)
coordinator.start()
assert coordinator.coordinator_id is None
while not coordinator.ready():
await asyncio.sleep(0.5)
assert coordinator.coordinator_id is not None

await wait_for_ready(coordinator)
await coordinator.ensure_coordinator_known()
assert coordinator.coordinator_id is not None

assert coordinator.are_we_master
await wait_for_primary_state(coordinator)

# Check if adding an additional coordinator will rebalance correctly
client2 = await _get_client(kafka_servers=kafka_servers)
Expand All @@ -123,37 +137,31 @@ async def test_coordinator_workflow(
"https",
True,
primary_selection_strategy,
"test-group",
group_name,
session_timeout_ms=10000,
heartbeat_interval_ms=500,
retry_backoff_ms=100,
)
coordinator2.start()
assert coordinator2.coordinator_id is None

while not coordinator2.ready():
await asyncio.sleep(0.5)
assert coordinator2.coordinator_id is not None

await wait_for_ready(coordinator2)
await coordinator2.ensure_coordinator_known()
assert coordinator2.coordinator_id is not None

# Helper variables to distinguish the expected primary and secondary
primary = coordinator2 if primary_selection_strategy == "highest" else coordinator
primary_client = client2 if primary_selection_strategy == "highest" else client
secondary = coordinator if primary_selection_strategy == "highest" else coordinator2
secondary_client = client if primary_selection_strategy == "highest" else client2

assert primary.are_we_master
await wait_for_primary_state(primary)
assert not secondary.are_we_master

# Check is closing the primary coordinator will rebalance the secondary to change to primary
await primary.close()
await primary_client.close()

while not secondary.are_we_master:
await asyncio.sleep(0.5)
assert secondary.are_we_master
await wait_for_primary_state(secondary)
await secondary.close()
await secondary_client.close()

Expand Down

0 comments on commit 749ed36

Please sign in to comment.