From 51a419c41eca692bb19f1e0070a912f9f64b4cbc Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Thu, 25 Mar 2021 11:52:55 +0100 Subject: [PATCH] tests: dont run rest/registry if external services are used Otherwise the services will join the existing group and the tests may fail if the services started by the fixtures can not communicate with the external services. --- tests/integration/conftest.py | 55 ++++++++++++++++++++++++++++++++--- tests/utils.py | 25 +++++++++++++--- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 54cc1c95d..416261ca5 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -17,7 +17,7 @@ from subprocess import Popen from tests.utils import ( Expiration, get_random_port, KAFKA_PORT_RANGE, KafkaConfig, KafkaServers, new_random_name, REGISTRY_PORT_RANGE, - ZK_PORT_RANGE + repeat_until_successful_request, ZK_PORT_RANGE ) from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple @@ -173,8 +173,22 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaRestAdminClient] @pytest.fixture(scope="function", name="rest_async") -async def fixture_rest_async(tmp_path: Path, kafka_servers: KafkaServers, - registry_async_client: Client) -> AsyncIterator[KafkaRest]: +async def fixture_rest_async( + request, + tmp_path: Path, + kafka_servers: KafkaServers, + registry_async_client: Client, +) -> AsyncIterator[Optional[KafkaRest]]: + + # Do not start a REST api when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + rest_url = request.config.getoption("rest_url") + if rest_url: + yield None + return + config_path = tmp_path / "karapace_config.json" config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers, "admin_metadata_max_age": 0}) @@ -204,6 +218,16 @@ async def fixture_rest_async_client(request, rest_async: KafkaRest, aiohttp_clie client = Client(client=client_factory) with closing(client): + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "brokers", + json_data=None, + headers=None, + error_msg="REST API is unreachable", + timeout=10, + sleep=0.3, + ) yield client @@ -247,7 +271,20 @@ def fixture_registry_async_pair(tmp_path: Path, kafka_servers: KafkaServers): @pytest.fixture(scope="function", name="registry_async") -async def fixture_registry_async(tmp_path: Path, kafka_servers: KafkaServers) -> AsyncIterator[KarapaceSchemaRegistry]: +async def fixture_registry_async( + request, + tmp_path: Path, + kafka_servers: KafkaServers, +) -> AsyncIterator[Optional[KarapaceSchemaRegistry]]: + # Do not start a registry when the user provided an external service. Doing + # so would cause this node to join the existing group and participate in + # the election process. Without proper configuration for the listeners that + # won't work and will cause test failures. + rest_url = request.config.getoption("registry_url") + if rest_url: + yield None + return + config_path = tmp_path / "karapace_config.json" config = set_config_defaults({ @@ -283,6 +320,16 @@ async def fixture_registry_async_client(request, registry_async: KarapaceSchemaR client = Client(client=client_factory) with closing(client): + # wait until the server is listening, otherwise the tests may fail + await repeat_until_successful_request( + client.get, + "subjects", + json_data=None, + headers=None, + error_msg="REST API is unreachable", + timeout=10, + sleep=0.3, + ) yield client diff --git a/tests/utils.py b/tests/utils.py index a27360e33..3a17cc931 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,3 +1,4 @@ +from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError from dataclasses import dataclass from kafka.errors import TopicAlreadyExistsError from karapace.utils import Client @@ -235,13 +236,29 @@ async def wait_for_topics(rest_async_client: Client, topic_names: List[str], tim async def repeat_until_successful_request( callback, path: str, json_data, headers, error_msg: str, timeout: float, sleep: float -) -> None: +): expiration = Expiration.from_timeout(timeout=timeout) + ok = False + res = None - res = await callback(path, json=json_data, headers=headers) - while not res.ok: + try: + res = await callback(path, json=json_data, headers=headers) + # ClientOSError: Raised when the listening socket is not yet open in the server + # ServerDisconnectedError: Wrong url + except (ClientOSError, ServerDisconnectedError): + pass + else: + ok = res.ok + + while not ok: await asyncio.sleep(sleep) expiration.raise_if_expired(msg=f"{error_msg} {res} after {timeout} secs") - res = await callback(path, json=json_data, headers=headers) + + try: + res = await callback(path, json=json_data, headers=headers) + except (ClientOSError, ServerDisconnectedError): + pass + else: + ok = res.ok return res