Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: dont run rest/registry if external services are used #189

Merged
merged 1 commit into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from subprocess import Popen
from tests.utils import (
Expiration, get_random_port, KAFKA_PORT_RANGE, KafkaConfig, KafkaServers, new_random_name, REGISTRY_PORT_RANGE,
ZK_PORT_RANGE
repeat_until_successful_request, ZK_PORT_RANGE
)
from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple

Expand Down Expand Up @@ -173,8 +173,22 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaRestAdminClient]


@pytest.fixture(scope="function", name="rest_async")
async def fixture_rest_async(tmp_path: Path, kafka_servers: KafkaServers,
registry_async_client: Client) -> AsyncIterator[KafkaRest]:
async def fixture_rest_async(
request,
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:

# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
# won't work and will cause test failures.
rest_url = request.config.getoption("rest_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers, "admin_metadata_max_age": 0})
Expand Down Expand Up @@ -204,6 +218,16 @@ async def fixture_rest_async_client(request, rest_async: KafkaRest, aiohttp_clie
client = Client(client=client_factory)

with closing(client):
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"brokers",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client


Expand Down Expand Up @@ -247,7 +271,20 @@ def fixture_registry_async_pair(tmp_path: Path, kafka_servers: KafkaServers):


@pytest.fixture(scope="function", name="registry_async")
async def fixture_registry_async(tmp_path: Path, kafka_servers: KafkaServers) -> AsyncIterator[KarapaceSchemaRegistry]:
async def fixture_registry_async(
request,
tmp_path: Path,
kafka_servers: KafkaServers,
) -> AsyncIterator[Optional[KarapaceSchemaRegistry]]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
# won't work and will cause test failures.
rest_url = request.config.getoption("registry_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults({
Expand Down Expand Up @@ -283,6 +320,16 @@ async def fixture_registry_async_client(request, registry_async: KarapaceSchemaR
client = Client(client=client_factory)

with closing(client):
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"subjects",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client


Expand Down
25 changes: 21 additions & 4 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError
from dataclasses import dataclass
from kafka.errors import TopicAlreadyExistsError
from karapace.utils import Client
Expand Down Expand Up @@ -235,13 +236,29 @@ async def wait_for_topics(rest_async_client: Client, topic_names: List[str], tim

async def repeat_until_successful_request(
callback, path: str, json_data, headers, error_msg: str, timeout: float, sleep: float
) -> None:
):
expiration = Expiration.from_timeout(timeout=timeout)
ok = False
res = None

res = await callback(path, json=json_data, headers=headers)
while not res.ok:
try:
res = await callback(path, json=json_data, headers=headers)
# ClientOSError: Raised when the listening socket is not yet open in the server
# ServerDisconnectedError: Wrong url
except (ClientOSError, ServerDisconnectedError):
pass
else:
ok = res.ok

while not ok:
await asyncio.sleep(sleep)
expiration.raise_if_expired(msg=f"{error_msg} {res} after {timeout} secs")
res = await callback(path, json=json_data, headers=headers)

try:
res = await callback(path, json=json_data, headers=headers)
except (ClientOSError, ServerDisconnectedError):
pass
else:
ok = res.ok

return res