diff --git a/tests/integration/backup/test_session_timeout.py b/tests/integration/backup/test_session_timeout.py index b585e759a..f527c8c09 100644 --- a/tests/integration/backup/test_session_timeout.py +++ b/tests/integration/backup/test_session_timeout.py @@ -12,7 +12,6 @@ from tests.integration.conftest import create_kafka_server from tests.integration.utils.config import KafkaDescription from tests.integration.utils.kafka_server import KafkaServers -from tests.integration.utils.network import PortRangeInclusive import pytest @@ -26,7 +25,6 @@ @pytest.fixture(scope="function", name="kafka_server_session_timeout") def fixture_kafka_server( kafka_description: KafkaDescription, - port_range: PortRangeInclusive, tmp_path_factory: pytest.TempPathFactory, ): # use custom data and log dir to avoid conflict with other kafka servers @@ -40,7 +38,6 @@ def fixture_kafka_server( session_datadir, session_logdir, kafka_description, - port_range, kafka_config_extra, ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a4d97ddbb..04fbd7aa1 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -29,13 +29,13 @@ maybe_download_kafka, wait_for_kafka, ) -from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.network import allocate_port from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.integration.utils.rest_client import RetryRestClient from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request -from typing import AsyncIterator, Iterator +from typing import AsyncGenerator, AsyncIterator, Iterator from urllib.parse import urlparse import asyncio @@ -45,7 +45,6 @@ import pytest import re import secrets -import string import time REPOSITORY_DIR = pathlib.Path(__file__).parent.parent.parent.absolute() @@ -67,29 +66,6 @@ def _clear_test_name(name: str) -> str: return re.sub(r"[\W]", "_", name)[:30] -@pytest.fixture(scope="session", name="port_range") -def fixture_port_range() -> PortRangeInclusive: - """Container used by other fixtures to register used ports""" - # To find a good port range use the following: - # - # curl --silent 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt' | \ - # egrep -i -e '^\s*[0-9]+-[0-9]+\s*unassigned' | \ - # awk '{print $1}' - # - start = 48700 - end = 49000 - - # Split the ports among the workers to prevent port reuse - worker_name = os.environ.get("PYTEST_XDIST_WORKER", "0") - worker_id = int(worker_name.lstrip(string.ascii_letters)) - worker_count = int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1")) - total_ports = end - start - ports_per_worker = total_ports // worker_count - start_worker = (ports_per_worker * worker_id) + start - end_worker = start_worker + ports_per_worker - 1 - return PortRangeInclusive(start_worker, end_worker) - - @pytest.fixture(scope="session", name="kafka_description") def fixture_kafka_description(request: SubRequest) -> KafkaDescription: kafka_version = request.config.getoption("kafka_version") or KAFKA_VERSION @@ -113,7 +89,6 @@ def fixture_kafka_server( session_datadir: Path, session_logdir: Path, kafka_description: KafkaDescription, - port_range: PortRangeInclusive, ) -> Iterator[KafkaServers]: bootstrap_servers = request.config.getoption("kafka_bootstrap_servers") @@ -127,7 +102,6 @@ def fixture_kafka_server( session_datadir, session_logdir, kafka_description, - port_range, ) @@ -135,7 +109,6 @@ def create_kafka_server( session_datadir: Path, session_logdir: Path, kafka_description: KafkaDescription, - port_range: PortRangeInclusive, kafka_properties: dict[str, int | str] | None = None, ) -> Iterator[KafkaServers]: if kafka_properties is None: @@ -151,9 +124,9 @@ def create_kafka_server( worker_id = os.environ.get("PYTEST_XDIST_WORKER") with ExitStack() as stack: - zk_client_port = stack.enter_context(port_range.allocate_port()) - zk_admin_port = stack.enter_context(port_range.allocate_port()) - kafka_plaintext_port = stack.enter_context(port_range.allocate_port()) + zk_client_port = stack.enter_context(allocate_port()) + zk_admin_port = stack.enter_context(allocate_port()) + kafka_plaintext_port = stack.enter_context(allocate_port()) with FileLock(str(lock_file)): if transfer_file.exists(): @@ -229,7 +202,7 @@ def create_kafka_server( @pytest.fixture(scope="function", name="producer") -def fixture_producer(kafka_servers: KafkaServers) -> KafkaProducer: +def fixture_producer(kafka_servers: KafkaServers) -> Iterator[KafkaProducer]: yield KafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers) @@ -259,7 +232,7 @@ def fixture_consumer( async def fixture_asyncproducer( kafka_servers: KafkaServers, loop: asyncio.AbstractEventLoop, -) -> Iterator[AsyncKafkaProducer]: +) -> AsyncGenerator[AsyncKafkaProducer, None]: asyncproducer = AsyncKafkaProducer(bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop) await asyncproducer.start() yield asyncproducer @@ -270,7 +243,7 @@ async def fixture_asyncproducer( async def fixture_asyncconsumer( kafka_servers: KafkaServers, loop: asyncio.AbstractEventLoop, -) -> Iterator[AsyncKafkaConsumer]: +) -> AsyncGenerator[AsyncKafkaConsumer, None]: asyncconsumer = AsyncKafkaConsumer( bootstrap_servers=kafka_servers.bootstrap_servers, loop=loop, @@ -507,7 +480,6 @@ async def fixture_registry_async_pair( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, - port_range: PortRangeInclusive, ) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers and returns their URL endpoints.""" @@ -517,7 +489,6 @@ async def fixture_registry_async_pair( async with start_schema_registry_cluster( config_templates=[config1, config2], data_dir=session_logdir / _clear_test_name(request.node.name), - port_range=port_range, ) as endpoints: yield [server.endpoint.to_url() for server in endpoints] @@ -528,7 +499,6 @@ async def fixture_registry_cluster( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, - port_range: PortRangeInclusive, ) -> AsyncIterator[RegistryDescription]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in @@ -545,7 +515,6 @@ async def fixture_registry_cluster( async with start_schema_registry_cluster( config_templates=[config], data_dir=session_logdir / _clear_test_name(request.node.name), - port_range=port_range, ) as servers: yield servers[0] @@ -555,7 +524,7 @@ async def fixture_registry_async_client( request: SubRequest, registry_cluster: RegistryDescription, loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument -) -> Client: +) -> AsyncGenerator[Client, None]: client = Client( server_uri=registry_cluster.endpoint.to_url(), server_ca=request.config.getoption("server_ca"), @@ -612,7 +581,6 @@ async def fixture_registry_https_endpoint( kafka_servers: KafkaServers, server_cert: str, server_key: str, - port_range: PortRangeInclusive, ) -> AsyncIterator[str]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in @@ -631,7 +599,6 @@ async def fixture_registry_https_endpoint( async with start_schema_registry_cluster( config_templates=[config], data_dir=session_logdir / _clear_test_name(request.node.name), - port_range=port_range, ) as servers: yield servers[0].endpoint.to_url() @@ -671,7 +638,6 @@ async def fixture_registry_http_auth_endpoint( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, - port_range: PortRangeInclusive, ) -> AsyncIterator[str]: # Do not start a registry when the user provided an external service. Doing # so would cause this node to join the existing group and participate in @@ -689,7 +655,6 @@ async def fixture_registry_http_auth_endpoint( async with start_schema_registry_cluster( config_templates=[config], data_dir=session_logdir / _clear_test_name(request.node.name), - port_range=port_range, ) as servers: yield servers[0].endpoint.to_url() @@ -732,7 +697,6 @@ async def fixture_registry_async_auth_pair( loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument session_logdir: Path, kafka_servers: KafkaServers, - port_range: PortRangeInclusive, ) -> AsyncIterator[list[str]]: """Starts a cluster of two Schema Registry servers with authentication enabled and returns their URL endpoints.""" @@ -748,7 +712,6 @@ async def fixture_registry_async_auth_pair( async with start_schema_registry_cluster( config_templates=[config1, config2], data_dir=session_logdir / _clear_test_name(request.node.name), - port_range=port_range, ) as endpoints: yield [server.endpoint.to_url() for server in endpoints] diff --git a/tests/integration/test_karapace.py b/tests/integration/test_karapace.py index 65d99f128..c6352ecfd 100644 --- a/tests/integration/test_karapace.py +++ b/tests/integration/test_karapace.py @@ -2,11 +2,10 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from contextlib import ExitStack +from contextlib import closing, contextmanager, ExitStack from karapace.config import set_config_defaults from pathlib import Path from tests.integration.utils.kafka_server import KafkaServers -from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process from tests.utils import popen_karapace_all from typing import Iterator @@ -15,8 +14,15 @@ import socket +@contextmanager +def allocate_port_no_reuse() -> Iterator[int]: + """Allocate random free port and do not allow reuse.""" + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(("127.0.0.1", 0)) + yield sock.getsockname()[1] + + def test_regression_server_must_exit_on_exception( - port_range: PortRangeInclusive, tmp_path: Path, kafka_servers: Iterator[KafkaServers], ) -> None: @@ -25,10 +31,10 @@ def test_regression_server_must_exit_on_exception( Karapace was not closing all its background threads, so when an exception was raised an reached the top-level, the webserver created by asyncio would be stopped but the threads would keep the server running. + Karapace exit on exception is done by setting a reserved port as server port. """ with ExitStack() as stack: - port = stack.enter_context(port_range.allocate_port()) - sock = stack.enter_context(socket.socket()) + port = stack.enter_context(allocate_port_no_reuse()) config = set_config_defaults( { @@ -42,7 +48,6 @@ def test_regression_server_must_exit_on_exception( logfile = stack.enter_context((tmp_path / "karapace.log").open("w")) errfile = stack.enter_context((tmp_path / "karapace.err").open("w")) config_path.write_text(json.dumps(config)) - sock.bind(("127.0.0.1", port)) process = popen_karapace_all(config_path, logfile, errfile) stack.callback(stop_process, process) # make sure to stop the process if the test fails assert process.wait(timeout=10) != 0, "Process should have exited with an error, port is already is use" diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 9acdffdb4..3de98acca 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -7,7 +7,7 @@ from karapace.config import set_config_defaults from karapace.coordinator.master_coordinator import MasterCoordinator from tests.integration.utils.kafka_server import KafkaServers -from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.network import allocate_port from tests.integration.utils.rest_client import RetryRestClient from tests.utils import new_random_name @@ -38,9 +38,9 @@ def has_master(mc: MasterCoordinator) -> bool: @pytest.mark.timeout(60) # Github workflows need a bit of extra time @pytest.mark.parametrize("strategy", ["lowest", "highest"]) -async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: KafkaServers, strategy: str) -> None: +async def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None: # Use random port to allow for parallel runs. - with port_range.allocate_port() as port1, port_range.allocate_port() as port2: + with allocate_port() as port1, allocate_port() as port2: port_aa, port_bb = sorted((port1, port2)) client_id_aa = new_random_name("master_selection_aa_") client_id_bb = new_random_name("master_selection_bb_") @@ -99,7 +99,7 @@ async def test_master_selection(port_range: PortRangeInclusive, kafka_servers: K await mc_bb.close() -async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: +async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers) -> None: """Test that primary selection works when mixed set of roles is configured for Karapace instances. The Kafka group coordinator leader can be any node, it has no relation to Karapace primary role eligibility. @@ -109,7 +109,7 @@ async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, p client_id = new_random_name("master_selection_") group_id = new_random_name("group_id") - with port_range.allocate_port() as port1, port_range.allocate_port() as port2, port_range.allocate_port() as port3: + with allocate_port() as port1, allocate_port() as port2, allocate_port() as port3: config_primary = set_config_defaults( { "advertised_hostname": "127.0.0.1", @@ -166,11 +166,11 @@ async def test_mixed_eligibility_for_primary_role(kafka_servers: KafkaServers, p await primary.close() -async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortRangeInclusive) -> None: +async def test_no_eligible_master(kafka_servers: KafkaServers) -> None: client_id = new_random_name("master_selection_") group_id = new_random_name("group_id") - with port_range.allocate_port() as port: + with allocate_port() as port: config_aa = set_config_defaults( { "advertised_hostname": "127.0.0.1", diff --git a/tests/integration/utils/cluster.py b/tests/integration/utils/cluster.py index 31c06e4bd..04560b453 100644 --- a/tests/integration/utils/cluster.py +++ b/tests/integration/utils/cluster.py @@ -2,14 +2,16 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from __future__ import annotations + from contextlib import asynccontextmanager, ExitStack from dataclasses import dataclass from karapace.config import Config, set_config_defaults, write_config from pathlib import Path -from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.network import allocate_port from tests.integration.utils.process import stop_process, wait_for_port_subprocess from tests.utils import new_random_name, popen_karapace_all -from typing import AsyncIterator, List +from typing import AsyncIterator @dataclass(frozen=True) @@ -30,10 +32,9 @@ class RegistryDescription: @asynccontextmanager async def start_schema_registry_cluster( - config_templates: List[Config], + config_templates: list[Config], data_dir: Path, - port_range: PortRangeInclusive, -) -> AsyncIterator[List[RegistryDescription]]: +) -> AsyncIterator[list[RegistryDescription]]: """Start a cluster of schema registries, one process per `config_templates`.""" for template in config_templates: assert "bootstrap_uri" in template, "base_config must have the value `bootstrap_uri` set" @@ -67,7 +68,7 @@ async def start_schema_registry_cluster( ) actual_group_id = config.setdefault("group_id", group_id) - port = config.setdefault("port", stack.enter_context(port_range.allocate_port())) + port = config.setdefault("port", stack.enter_context(allocate_port())) assert isinstance(port, int), "Port must be an integer" group_dir = data_dir / str(actual_group_id) diff --git a/tests/integration/utils/network.py b/tests/integration/utils/network.py index e4f60e641..ef9439e1d 100644 --- a/tests/integration/utils/network.py +++ b/tests/integration/utils/network.py @@ -2,82 +2,19 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from contextlib import contextmanager +from contextlib import closing, contextmanager +from typing import Iterator -import platform -import psutil -import random import socket -def is_time_wait(port: int) -> bool: - """True if the port is still on TIME_WAIT state.""" - return any(conn.laddr.port == port for conn in psutil.net_connections(kind="inet")) - - -class PortRangeInclusive: - PRIVILEGE_END = 2**10 - MAX_PORTS = 2**16 - 1 - - def __init__( - self, - start: int, - end: int, - ) -> None: - # Make sure the range is valid and that we don't need to be root - assert end > start, "there must be at least one port available" - assert end <= self.MAX_PORTS, f"end must be lower than {self.MAX_PORTS}" - assert start > self.PRIVILEGE_END, "start must not be a privileged port" - - self.start = start - self.end = end - self._maybe_available = range(start, end + 1) - self._allocated = set() - - def next_range(self, number_of_ports: int) -> "PortRangeInclusive": - next_start = self.end - next_end = next_start + number_of_ports - return PortRangeInclusive(next_start, next_end) - - @contextmanager - def allocate_port(self) -> int: - """Find a random port in the range `PortRangeInclusive`. - - Note: - This function is *not* aware of the ports currently open in the system, - the blacklist only prevents two services of the same type to randomly - get the same ports for *a single test run*. - - Because of that, the port range should be chosen such that there is no - system service in the range. Also note that running two sessions of the - tests with the same range is not supported and will lead to flakiness. - """ - if len(self._maybe_available) == 0: - raise RuntimeError(f"No free ports available. start: {self.start} end: {self.end}") - - unallocated = [port for port in self._maybe_available if port not in self._allocated] - - if platform.platform().lower().startswith("linux"): - filtered_ports = (port for port in unallocated if not is_time_wait(port)) - try: - port = next(filtered_ports) - except StopIteration as e: - raise RuntimeError( - f"No free ports available. start: {self.start} end: {self.end} time_wait: {unallocated}" - ) from e - else: - # psutil.net_connections requires running as privileged user on Macos, so we - # put our trust in entropy instead. - port = random.choice(unallocated) - - self._allocated.add(port) - - try: - yield port - finally: - # Remove the port at the end, this is a hack to give extra time for a TIME_WAIT socket to - # close, but it is not sufficient. - self._allocated.remove(port) +@contextmanager +def allocate_port() -> Iterator[int]: + """Allocate random free port.""" + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(("127.0.0.1", 0)) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + yield sock.getsockname()[1] def port_is_listening(hostname: str, port: int, ipv6: bool) -> bool: