Skip to content

Commit

Permalink
tests: dont run rest/registry if external services are used
Browse files Browse the repository at this point in the history
Otherwise the services will join the existing group and the tests may
fail if the services started by the fixtures can not communicate with
the external services.
  • Loading branch information
Augusto F. Hack committed Mar 25, 2021
1 parent d535be7 commit 8045eff
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 8 deletions.
55 changes: 51 additions & 4 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from subprocess import Popen
from tests.utils import (
Expiration, get_random_port, KAFKA_PORT_RANGE, KafkaConfig, KafkaServers, new_random_name, REGISTRY_PORT_RANGE,
ZK_PORT_RANGE
repeat_until_successful_request, ZK_PORT_RANGE
)
from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple

Expand Down Expand Up @@ -173,8 +173,22 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaRestAdminClient]


@pytest.fixture(scope="function", name="rest_async")
async def fixture_rest_async(tmp_path: Path, kafka_servers: KafkaServers,
registry_async_client: Client) -> AsyncIterator[KafkaRest]:
async def fixture_rest_async(
request,
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:

# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the the listeners
# that won't work and will cause test failures.
rest_url = request.config.getoption("rest_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers, "admin_metadata_max_age": 0})
Expand Down Expand Up @@ -204,6 +218,16 @@ async def fixture_rest_async_client(request, rest_async: KafkaRest, aiohttp_clie
client = Client(client=client_factory)

with closing(client):
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"brokers",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client


Expand Down Expand Up @@ -247,7 +271,20 @@ def fixture_registry_async_pair(tmp_path: Path, kafka_servers: KafkaServers):


@pytest.fixture(scope="function", name="registry_async")
async def fixture_registry_async(tmp_path: Path, kafka_servers: KafkaServers) -> AsyncIterator[KarapaceSchemaRegistry]:
async def fixture_registry_async(
request,
tmp_path: Path,
kafka_servers: KafkaServers,
) -> AsyncIterator[Optional[KarapaceSchemaRegistry]]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the the listeners
# that won't work and will cause test failures.
rest_url = request.config.getoption("registry_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults({
Expand Down Expand Up @@ -283,6 +320,16 @@ async def fixture_registry_async_client(request, registry_async: KarapaceSchemaR
client = Client(client=client_factory)

with closing(client):
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"subjects",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client


Expand Down
25 changes: 21 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError
from dataclasses import dataclass
from kafka.errors import TopicAlreadyExistsError
from karapace.utils import Client
Expand Down Expand Up @@ -235,13 +236,29 @@ async def wait_for_topics(rest_async_client: Client, topic_names: List[str], tim

async def repeat_until_successful_request(
callback, path: str, json_data, headers, error_msg: str, timeout: float, sleep: float
) -> None:
):
expiration = Expiration.from_timeout(timeout=timeout)
ok = False
res = None

res = await callback(path, json=json_data, headers=headers)
while not res.ok:
try:
res = await callback(path, json=json_data, headers=headers)
# ClientOSError: Raised when the listening socket is not yet open in the server
# ServerDisconnectedError: Wrong url
except (ClientOSError, ServerDisconnectedError):
pass
else:
ok = res.ok

while not ok:
await asyncio.sleep(sleep)
expiration.raise_if_expired(msg=f"{error_msg} {res} after {timeout} secs")
res = await callback(path, json=json_data, headers=headers)

try:
res = await callback(path, json=json_data, headers=headers)
except (ClientOSError, ServerDisconnectedError):
pass
else:
ok = res.ok

return res

0 comments on commit 8045eff

Please sign in to comment.