Skip to content

Commit

Permalink
test: select random free server ports with 0
Browse files Browse the repository at this point in the history
The previous port selection algorithm relied on a fixed port range, from
48700 to 49000. That is 300 ports, some of which may already be reserved
on the system running the tests. The free ports are divided among the
pytest workers; with 4 workers there are 75 ports for each. This causes
flakiness in the tests, as ports may not be available or may still be in a
wait state after previous tests. In the test environment it is not necessary
to select ports from the IANA dynamic port assignment range. Any free port
is fine, and port 0 can be used to get a dynamically allocated free port.
  • Loading branch information
jjaakola-aiven committed Sep 24, 2024
1 parent 42c7174 commit 3e2202e
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 140 deletions.
3 changes: 0 additions & 3 deletions tests/integration/backup/test_session_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from tests.integration.conftest import create_kafka_server
from tests.integration.utils.config import KafkaDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive

import pytest

Expand All @@ -26,7 +25,6 @@
@pytest.fixture(scope="function", name="kafka_server_session_timeout")
def fixture_kafka_server(
kafka_description: KafkaDescription,
port_range: PortRangeInclusive,
tmp_path_factory: pytest.TempPathFactory,
):
# use custom data and log dir to avoid conflict with other kafka servers
Expand All @@ -40,7 +38,6 @@ def fixture_kafka_server(
session_datadir,
session_logdir,
kafka_description,
port_range,
kafka_config_extra,
)

Expand Down
55 changes: 9 additions & 46 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
maybe_download_kafka,
wait_for_kafka,
)
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.network import allocate_port
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.integration.utils.rest_client import RetryRestClient
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from typing import AsyncIterator, Iterator
from typing import AsyncGenerator, AsyncIterator, Iterator
from urllib.parse import urlparse

import asyncio
Expand All @@ -45,7 +45,6 @@
import pytest
import re
import secrets
import string
import time

REPOSITORY_DIR = pathlib.Path(__file__).parent.parent.parent.absolute()
Expand All @@ -67,29 +66,6 @@ def _clear_test_name(name: str) -> str:
return re.sub(r"[\W]", "_", name)[:30]


@pytest.fixture(scope="session", name="port_range")
def fixture_port_range() -> PortRangeInclusive:
    """Return the slice of the shared test port range owned by this xdist worker.

    The overall range is split into equally sized, non-overlapping slices, one
    per pytest-xdist worker, so that parallel workers never hand out the same
    port. Without xdist the environment variables are absent and the single
    worker "0" receives the whole range.
    """
    # A suitable unassigned range can be looked up with:
    #
    # curl --silent 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt' | \
    #     egrep -i -e '^\s*[0-9]+-[0-9]+\s*unassigned' | \
    #     awk '{print $1}'
    #
    first_port = 48700
    last_port = 49000

    # PYTEST_XDIST_WORKER looks like "gw3"; drop the alphabetic prefix to get the index.
    xdist_worker = os.environ.get("PYTEST_XDIST_WORKER", "0")
    worker_index = int(xdist_worker.lstrip(string.ascii_letters))
    worker_total = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))

    # Each worker gets a contiguous slice; the returned bounds are inclusive.
    slice_size = (last_port - first_port) // worker_total
    slice_start = first_port + slice_size * worker_index
    return PortRangeInclusive(slice_start, slice_start + slice_size - 1)


@pytest.fixture(scope="session", name="kafka_description")
def fixture_kafka_description(request: SubRequest) -> KafkaDescription:
kafka_version = request.config.getoption("kafka_version") or KAFKA_VERSION
Expand All @@ -113,7 +89,6 @@ def fixture_kafka_server(
session_datadir: Path,
session_logdir: Path,
kafka_description: KafkaDescription,
port_range: PortRangeInclusive,
) -> Iterator[KafkaServers]:
bootstrap_servers = request.config.getoption("kafka_bootstrap_servers")

Expand All @@ -127,15 +102,13 @@ def fixture_kafka_server(
session_datadir,
session_logdir,
kafka_description,
port_range,
)


def create_kafka_server(
session_datadir: Path,
session_logdir: Path,
kafka_description: KafkaDescription,
port_range: PortRangeInclusive,
kafka_properties: dict[str, int | str] | None = None,
) -> Iterator[KafkaServers]:
if kafka_properties is None:
Expand All @@ -151,9 +124,9 @@ def create_kafka_server(
worker_id = os.environ.get("PYTEST_XDIST_WORKER")

with ExitStack() as stack:
zk_client_port = stack.enter_context(port_range.allocate_port())
zk_admin_port = stack.enter_context(port_range.allocate_port())
kafka_plaintext_port = stack.enter_context(port_range.allocate_port())
zk_client_port = stack.enter_context(allocate_port())
zk_admin_port = stack.enter_context(allocate_port())
kafka_plaintext_port = stack.enter_context(allocate_port())

with FileLock(str(lock_file)):
if transfer_file.exists():
Expand Down Expand Up @@ -229,7 +202,7 @@ def create_kafka_server(


@pytest.fixture(scope="function", name="producer")
def fixture_producer(kafka_servers: KafkaServers) -> KafkaProducer:
def fixture_producer(kafka_servers: KafkaServers) -> Iterator[KafkaProducer]:
yield KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers)


Expand Down Expand Up @@ -259,7 +232,7 @@ def fixture_consumer(
async def fixture_asyncproducer(
kafka_servers: KafkaServers,
loop: asyncio.AbstractEventLoop,
) -> Iterator[AsyncKafkaProducer]:
) -> AsyncGenerator[AsyncKafkaProducer, None]:
asyncproducer = AsyncKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop)
await asyncproducer.start()
yield asyncproducer
Expand All @@ -270,7 +243,7 @@ async def fixture_asyncproducer(
async def fixture_asyncconsumer(
kafka_servers: KafkaServers,
loop: asyncio.AbstractEventLoop,
) -> Iterator[AsyncKafkaConsumer]:
) -> AsyncGenerator[AsyncKafkaConsumer, None]:
asyncconsumer = AsyncKafkaConsumer(
bootstrap_servers=kafka_servers.bootstrap_servers,
loop=loop,
Expand Down Expand Up @@ -507,7 +480,6 @@ async def fixture_registry_async_pair(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers and returns their URL endpoints."""

Expand All @@ -517,7 +489,6 @@ async def fixture_registry_async_pair(
async with start_schema_registry_cluster(
config_templates=[config1, config2],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as endpoints:
yield [server.endpoint.to_url() for server in endpoints]

Expand All @@ -528,7 +499,6 @@ async def fixture_registry_cluster(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[RegistryDescription]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
Expand All @@ -545,7 +515,6 @@ async def fixture_registry_cluster(
async with start_schema_registry_cluster(
config_templates=[config],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as servers:
yield servers[0]

Expand All @@ -555,7 +524,7 @@ async def fixture_registry_async_client(
request: SubRequest,
registry_cluster: RegistryDescription,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
) -> Client:
) -> AsyncGenerator[Client, None]:
client = Client(
server_uri=registry_cluster.endpoint.to_url(),
server_ca=request.config.getoption("server_ca"),
Expand Down Expand Up @@ -612,7 +581,6 @@ async def fixture_registry_https_endpoint(
kafka_servers: KafkaServers,
server_cert: str,
server_key: str,
port_range: PortRangeInclusive,
) -> AsyncIterator[str]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
Expand All @@ -631,7 +599,6 @@ async def fixture_registry_https_endpoint(
async with start_schema_registry_cluster(
config_templates=[config],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as servers:
yield servers[0].endpoint.to_url()

Expand Down Expand Up @@ -671,7 +638,6 @@ async def fixture_registry_http_auth_endpoint(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[str]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
Expand All @@ -689,7 +655,6 @@ async def fixture_registry_http_auth_endpoint(
async with start_schema_registry_cluster(
config_templates=[config],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as servers:
yield servers[0].endpoint.to_url()

Expand Down Expand Up @@ -732,7 +697,6 @@ async def fixture_registry_async_auth_pair(
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> AsyncIterator[list[str]]:
"""Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints."""

Expand All @@ -748,7 +712,6 @@ async def fixture_registry_async_auth_pair(
async with start_schema_registry_cluster(
config_templates=[config1, config2],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
) as endpoints:
yield [server.endpoint.to_url() for server in endpoints]

Expand Down
17 changes: 11 additions & 6 deletions tests/integration/test_karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from contextlib import ExitStack
from contextlib import closing, contextmanager, ExitStack
from karapace.config import set_config_defaults
from pathlib import Path
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.process import stop_process
from tests.utils import popen_karapace_all
from typing import Iterator
Expand All @@ -15,8 +14,15 @@
import socket


@contextmanager
def allocate_port_no_reuse() -> Iterator[int]:
    """Reserve a random free TCP port on 127.0.0.1 and yield its number.

    The socket is bound to port 0, so the operating system picks the port.
    The socket stays bound for as long as the context is active and is closed
    when the context exits, whether normally or through an exception.
    """
    holder = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        holder.bind(("127.0.0.1", 0))
        # getsockname() returns (host, port) for AF_INET; only the port is needed.
        _host, reserved_port = holder.getsockname()
        yield reserved_port
    finally:
        holder.close()


def test_regression_server_must_exit_on_exception(
port_range: PortRangeInclusive,
tmp_path: Path,
kafka_servers: Iterator[KafkaServers],
) -> None:
Expand All @@ -25,10 +31,10 @@ def test_regression_server_must_exit_on_exception(
Karapace was not closing all its background threads, so when an exception
was raised and reached the top-level, the webserver created by asyncio would
be stopped but the threads would keep the server running.
The exit on exception is triggered by configuring an already reserved port as the Karapace server port.
"""
with ExitStack() as stack:
port = stack.enter_context(port_range.allocate_port())
sock = stack.enter_context(socket.socket())
port = stack.enter_context(allocate_port_no_reuse())

config = set_config_defaults(
{
Expand All @@ -42,7 +48,6 @@ def test_regression_server_must_exit_on_exception(
logfile = stack.enter_context((tmp_path / "karapace.log").open("w"))
errfile = stack.enter_context((tmp_path / "karapace.err").open("w"))
config_path.write_text(json.dumps(config))
sock.bind(("127.0.0.1", port))
process = popen_karapace_all(config_path, logfile, errfile)
stack.callback(stop_process, process) # make sure to stop the process if the test fails
assert process.wait(timeout=10) != 0, "Process should have exited with an error, port is already is use"
14 changes: 7 additions & 7 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from karapace.config import set_config_defaults
from karapace.coordinator.master_coordinator import MasterCoordinator
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.network import allocate_port
from tests.integration.utils.rest_client import RetryRestClient
from tests.utils import new_random_name

Expand Down Expand Up @@ -38,9 +38,9 @@ def has_master(mc: MasterCoordinator) -> bool:

@pytest.mark.timeout(60) # Github workflows need a bit of extra time
@pytest.mark.parametrize("strategy", ["lowest", "highest"])
async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None:
async def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None:
# Use random port to allow for parallel runs.
with port_range.allocate_port() as port1, port_range.allocate_port() as port2:
with allocate_port() as port1, allocate_port() as port2:
port_aa, port_bb = sorted((port1, port2))
client_id_aa = new_random_name("master_selection_aa_")
client_id_bb = new_random_name("master_selection_bb_")
Expand Down Expand Up @@ -99,7 +99,7 @@ async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: K
await mc_bb.close()


async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None:
async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers) -> None:
"""Test that primary selection works when mixed set of roles is configured for Karapace instances.
The Kafka group coordinator leader can be any node, it has no relation to Karapace primary role eligibility.
Expand All @@ -109,7 +109,7 @@ async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, p
client_id = new_random_name("master_selection_")
group_id = new_random_name("group_id")

with port_range.allocate_port() as port1, port_range.allocate_port() as port2, port_range.allocate_port() as port3:
with allocate_port() as port1, allocate_port() as port2, allocate_port() as port3:
config_primary = set_config_defaults(
{
"advertised_hostname": "127.0.0.1",
Expand Down Expand Up @@ -166,11 +166,11 @@ async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, p
await primary.close()


async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None:
async def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
client_id = new_random_name("master_selection_")
group_id = new_random_name("group_id")

with port_range.allocate_port() as port:
with allocate_port() as port:
config_aa = set_config_defaults(
{
"advertised_hostname": "127.0.0.1",
Expand Down
13 changes: 7 additions & 6 deletions tests/integration/utils/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from contextlib import asynccontextmanager, ExitStack
from dataclasses import dataclass
from karapace.config import Config, set_config_defaults, write_config
from pathlib import Path
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.network import allocate_port
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.utils import new_random_name, popen_karapace_all
from typing import AsyncIterator, List
from typing import AsyncIterator


@dataclass(frozen=True)
Expand All @@ -30,10 +32,9 @@ class RegistryDescription:

@asynccontextmanager
async def start_schema_registry_cluster(
config_templates: List[Config],
config_templates: list[Config],
data_dir: Path,
port_range: PortRangeInclusive,
) -> AsyncIterator[List[RegistryDescription]]:
) -> AsyncIterator[list[RegistryDescription]]:
"""Start a cluster of schema registries, one process per `config_templates`."""
for template in config_templates:
assert "bootstrap_uri" in template, "base_config must have the value `bootstrap_uri` set"
Expand Down Expand Up @@ -67,7 +68,7 @@ async def start_schema_registry_cluster(
)
actual_group_id = config.setdefault("group_id", group_id)

port = config.setdefault("port", stack.enter_context(port_range.allocate_port()))
port = config.setdefault("port", stack.enter_context(allocate_port()))
assert isinstance(port, int), "Port must be an integer"

group_dir = data_dir / str(actual_group_id)
Expand Down
Loading

0 comments on commit 3e2202e

Please sign in to comment.