Fix test master selection and tests with timings issues for topics #183

Merged · 3 commits · Mar 24, 2021

tests/integration/conftest.py: 11 additions & 81 deletions

@@ -15,13 +15,15 @@
 from karapace.utils import Client
 from pathlib import Path
 from subprocess import Popen
-from tests.utils import KafkaConfig, KafkaServers, new_random_name
+from tests.utils import (
+    Expiration, get_random_port, KAFKA_PORT_RANGE, KafkaConfig, KafkaServers, new_random_name, REGISTRY_PORT_RANGE,
+    ZK_PORT_RANGE
+)
 from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple

 import json
 import os
 import pytest
-import random
 import signal
 import socket
 import time
@@ -32,56 +34,6 @@
 KAFKA_WAIT_TIMEOUT = 60


-@dataclass(frozen=True)
-class PortRangeInclusive:
-    start: int
-    end: int
-
-    PRIVILEGE_END = 2 ** 10
-    MAX_PORTS = 2 ** 16 - 1
-
-    def __post_init__(self):
-        # Make sure the range is valid and that we don't need to be root
-        assert self.end > self.start, "there must be at least one port available"
-        assert self.end <= self.MAX_PORTS, f"end must be lower than {self.MAX_PORTS}"
-        assert self.start > self.PRIVILEGE_END, "start must not be a privileged port"
-
-    def next_range(self, number_of_ports: int) -> "PortRangeInclusive":
-        next_start = self.end + 1
-        next_end = next_start + number_of_ports - 1  # -1 because the range is inclusive
-
-        return PortRangeInclusive(next_start, next_end)
-
-
-# To find a good port range use the following:
-#
-#   curl --silent 'https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt' | \
-#       egrep -i -e '^\s*[0-9]+-[0-9]+\s*unassigned' | \
-#       awk '{print $1}'
-#
-KAFKA_PORTS = PortRangeInclusive(48700, 48800)
-ZK_PORT_RANGE = KAFKA_PORTS.next_range(100)
-REGISTRY_PORT_RANGE = ZK_PORT_RANGE.next_range(100)
-
-
-class Timeout(Exception):
-    pass
-
-
-@dataclass(frozen=True)
-class Expiration:
-    msg: str
-    deadline: float
-
-    @classmethod
-    def from_timeout(cls, msg: str, timeout: float):
-        return cls(msg, time.monotonic() + timeout)
-
-    def raise_if_expired(self):
-        if time.monotonic() > self.deadline:
-            raise Timeout(self.msg)
-
-
 @dataclass
 class ZKConfig:
     client_port: int
@@ -119,14 +71,12 @@ def port_is_listening(hostname: str, port: int, ipv6: bool) -> bool:

 def wait_for_kafka(kafka_servers: KafkaServers, wait_time) -> None:
     for server in kafka_servers.bootstrap_servers:
-        expiration = Expiration.from_timeout(
-            msg=f"Could not contact kafka cluster on host `{server}`",
-            timeout=wait_time,
-        )
+        expiration = Expiration.from_timeout(timeout=wait_time)

         list_topics_successful = False
+        msg = f"Could not contact kafka cluster on host `{server}`"
         while not list_topics_successful:
-            expiration.raise_if_expired()
+            expiration.raise_if_expired(msg)
             try:
                 KafkaRestAdminClient(bootstrap_servers=server).cluster_metadata()
             # ValueError:
@@ -145,37 +95,17 @@ def wait_for_kafka(kafka_servers: KafkaServers, wait_time) -> None:

 def wait_for_port(port: int, *, hostname: str = "127.0.0.1", wait_time: float = 20.0, ipv6: bool = False) -> None:
     start_time = time.monotonic()
-    expiration = Expiration(
-        msg=f"Timeout waiting for `{hostname}:{port}`",
-        deadline=start_time + wait_time,
-    )
+    expiration = Expiration(deadline=start_time + wait_time)
+    msg = f"Timeout waiting for `{hostname}:{port}`"

     while not port_is_listening(hostname, port, ipv6):
-        expiration.raise_if_expired()
+        expiration.raise_if_expired(msg)
         time.sleep(2.0)

     elapsed = time.monotonic() - start_time
     print(f"Server `{hostname}:{port}` listening after {elapsed} seconds")


-def get_random_port(*, port_range: PortRangeInclusive, blacklist: List[int]) -> int:
-    """ Find a random port in the range `PortRangeInclusive`.
-
-    Note:
-        This function is *not* aware of the ports currently open in the system,
-        the blacklist only prevents two services of the same type to randomly
-        get the same ports for *a single test run*.
-
-        Because of that, the port range should be chosen such that there is no
-        system service in the range. Also note that running two sessions of the
-        tests with the same range is not supported and will lead to flakiness.
-    """
-    value = random.randint(port_range.start, port_range.end)
-    while value in blacklist:
-        value = random.randint(port_range.start, port_range.end)
-    return value
-
-
 def lock_path_for(path: Path) -> Path:
     """ Append .lock to path """
     suffixes = path.suffixes
@@ -423,7 +353,7 @@ def configure_and_start_kafka(kafka_dir: Path, zk: ZKConfig) -> Tuple[KafkaConfig, Popen]:
     data_dir.mkdir(parents=True)
     config_dir.mkdir(parents=True)

-    plaintext_port = get_random_port(port_range=KAFKA_PORTS, blacklist=[])
+    plaintext_port = get_random_port(port_range=KAFKA_PORT_RANGE, blacklist=[])

     config = KafkaConfig(
         datadir=str(data_dir),

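Note that the PortRangeInclusive, Timeout, Expiration, and get_random_port helpers deleted above are not dropped; per the new import block they move into tests/utils. The one visible API change is that Expiration no longer stores the error message: the new call sites pass it to raise_if_expired instead. A minimal sketch of how the moved Expiration presumably looks, inferred only from those call sites (the actual tests/utils contents are not part of this diff):

# Hypothetical reconstruction of Expiration after the move to tests/utils;
# inferred from the new call sites Expiration.from_timeout(timeout=...) and
# expiration.raise_if_expired(msg), not from the real module.
import time
from dataclasses import dataclass


class Timeout(Exception):
    pass


@dataclass(frozen=True)
class Expiration:
    deadline: float

    @classmethod
    def from_timeout(cls, timeout: float) -> "Expiration":
        # The deadline is absolute, measured on the monotonic clock.
        return cls(deadline=time.monotonic() + timeout)

    def raise_if_expired(self, msg: str) -> None:
        # The message now travels with each call instead of the instance,
        # so one Expiration can be reused with different error texts.
        if time.monotonic() > self.deadline:
            raise Timeout(msg)
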
tests/integration/test_master_coordinator.py: 50 additions & 41 deletions

@@ -4,10 +4,10 @@
 Copyright (c) 2019 Aiven Ltd
 See LICENSE for details
 """
+from contextlib import closing
 from karapace.config import set_config_defaults
 from karapace.master_coordinator import MasterCoordinator
-from tests.utils import KafkaServers
-from typing import Optional
+from tests.utils import get_random_port, KafkaServers, new_random_name, TESTS_PORT_RANGE

 import asyncio
 import json
@@ -41,46 +41,55 @@ def has_master(mc: MasterCoordinator) -> bool:
     return bool(mc.sc and not mc.sc.master and mc.sc.master_url)


-@pytest.mark.timeout(60)
+@pytest.mark.timeout(60)  # Github workflows need a bit of extra time
 @pytest.mark.parametrize("strategy", ["lowest", "highest"])
-def test_master_selection(kafka_servers: Optional[KafkaServers], strategy: str) -> None:
-    config_aa = set_config_defaults({})
-    config_aa["advertised_hostname"] = "127.0.0.1"
-    config_aa["bootstrap_uri"] = kafka_servers.bootstrap_servers
-    config_aa["client_id"] = "aa"
-    config_aa["port"] = 1234
-    config_aa["master_election_strategy"] = strategy
-    mc_aa = init_admin(config_aa)
-    config_bb = set_config_defaults({})
-    config_bb["advertised_hostname"] = "127.0.0.1"
-    config_bb["bootstrap_uri"] = kafka_servers.bootstrap_servers
-    config_bb["client_id"] = "bb"
-    config_bb["port"] = 5678
-    config_bb["master_election_strategy"] = strategy
-    mc_bb = init_admin(config_bb)
-
-    if strategy == "lowest":
-        master = mc_aa
-        slave = mc_bb
-    else:
-        master = mc_bb
-        slave = mc_aa
-
-    # Wait for the election to happen
-    while not is_master(master):
-        time.sleep(0.3)
-
-    while not has_master(slave):
-        time.sleep(0.3)
-
-    # Make sure the end configuration is as expected
-    master_url = f'http://{master.config["host"]}:{master.config["port"]}'
-    assert master.sc.election_strategy == strategy
-    assert slave.sc.election_strategy == strategy
-    assert master.sc.master_url == master_url
-    assert slave.sc.master_url == master_url
-    mc_aa.close()
-    mc_bb.close()
+def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None:
+    # Use random port to allow for parallel runs.
+    port1 = get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[])
+    port2 = get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[port1])
+    port_aa, port_bb = sorted((port1, port2))
+    client_id_aa = new_random_name("master_selection_aa_")
+    client_id_bb = new_random_name("master_selection_bb_")
+    group_id = new_random_name("group_id")
+
+    config_aa = set_config_defaults({
+        "advertised_hostname": "127.0.0.1",
+        "bootstrap_uri": kafka_servers.bootstrap_servers,
+        "client_id": client_id_aa,
+        "group_id": group_id,
+        "port": port_aa,
+        "master_election_strategy": strategy,
+    })
+    config_bb = set_config_defaults({
+        "advertised_hostname": "127.0.0.1",
+        "bootstrap_uri": kafka_servers.bootstrap_servers,
+        "client_id": client_id_bb,
+        "group_id": group_id,
+        "port": port_bb,
+        "master_election_strategy": strategy,
+    })
+
+    with closing(init_admin(config_aa)) as mc_aa, closing(init_admin(config_bb)) as mc_bb:
+        if strategy == "lowest":
+            master = mc_aa
+            slave = mc_bb
+        else:
+            master = mc_bb
+            slave = mc_aa
+
+        # Wait for the election to happen
+        while not is_master(master):
+            time.sleep(0.3)
+
+        while not has_master(slave):
+            time.sleep(0.3)
+
+        # Make sure the end configuration is as expected
+        master_url = f'http://{master.config["host"]}:{master.config["port"]}'
+        assert master.sc.election_strategy == strategy
+        assert slave.sc.election_strategy == strategy
+        assert master.sc.master_url == master_url
+        assert slave.sc.master_url == master_url


 async def test_schema_request_forwarding(registry_async_pair):

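Sorting the two random ports before assigning them (port_aa, port_bb = sorted(...)) guarantees that mc_aa always holds the numerically lower port, which is what lets the test map "lowest" to mc_aa and "highest" to mc_bb deterministically. The imported TESTS_PORT_RANGE is never defined in this diff; given the next_range chaining used by the old conftest.py constants above, it is presumably one more inclusive slice carved out after REGISTRY_PORT_RANGE. A sketch under that assumption, reusing PortRangeInclusive as deleted from conftest.py:

# Hypothetical definition of TESTS_PORT_RANGE in tests/utils; the real
# value is not shown in this diff. PortRangeInclusive is condensed from
# the version deleted from conftest.py above.
from dataclasses import dataclass


@dataclass(frozen=True)
class PortRangeInclusive:
    start: int
    end: int

    def next_range(self, number_of_ports: int) -> "PortRangeInclusive":
        # end + number_of_ports, not +1 more, because the range is inclusive
        return PortRangeInclusive(self.end + 1, self.end + number_of_ports)


KAFKA_PORT_RANGE = PortRangeInclusive(48700, 48800)
ZK_PORT_RANGE = KAFKA_PORT_RANGE.next_range(100)
REGISTRY_PORT_RANGE = ZK_PORT_RANGE.next_range(100)
TESTS_PORT_RANGE = REGISTRY_PORT_RANGE.next_range(100)  # assumption: one more 100-port slice
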
tests/integration/test_rest.py: 12 additions & 7 deletions

@@ -1,6 +1,10 @@
 from kafka.errors import UnknownTopicOrPartitionError
 from pytest import raises
-from tests.utils import new_topic, REST_HEADERS, schema_avro_json, second_obj, second_schema_json, test_objects_avro
+from tests.utils import (
+    new_topic, REST_HEADERS, schema_avro_json, second_obj, second_schema_json, test_objects_avro, wait_for_topics
+)
+
+NEW_TOPIC_TIMEOUT = 10


 def check_successful_publish_response(success_response, objects, partition_id=None):
@@ -18,6 +22,7 @@ def check_successful_publish_response(success_response, objects, partition_id=None):

 async def test_content_types(rest_async_client, admin_client):
     tn = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     valid_headers = [
         "application/vnd.kafka.v1+json",
         "application/vnd.kafka.binary.v1+json",
@@ -91,6 +96,7 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_client):
     # pylint: disable=W0612
     tn = new_topic(admin_client)
     other_tn = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[tn, other_tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     header = REST_HEADERS["avro"]
     # check succeeds with 1 record and brand new schema
     res = await registry_async_client.post(f"subjects/{other_tn}/versions", json={"schema": second_schema_json})
@@ -118,7 +124,7 @@ async def test_avro_publish(rest_async_client, registry_async_client, admin_client):
     # assert res.status == 422, f"Expecting schema {second_schema_json} to not match records {test_objects}"


-def test_admin_client(admin_client, producer):
+async def test_admin_client(admin_client, producer):
     topic_names = [new_topic(admin_client) for i in range(10, 13)]
     topic_info = admin_client.cluster_metadata()
     retrieved_names = list(topic_info["topics"].keys())
@@ -178,11 +184,7 @@ async def test_internal(rest_async, admin_client):
 async def test_topics(rest_async_client, admin_client):
     topic_foo = "foo"
     tn = new_topic(admin_client)
-    res = await rest_async_client.get("/topics")
-    assert res.ok, "Status code is not 200: %r" % res.status_code
-    current_topics = set(res.json())
-    assert topic_foo not in current_topics, f"Topic {topic_foo} should not exist"
-    assert {tn}.difference(current_topics) == set(), f"Retrieved topic names do not match: {current_topics}"
+    await wait_for_topics(rest_async_client, topic_names=[tn], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     res = await rest_async_client.get(f"/topics/{tn}")
     assert res.ok, "Status code is not 200: %r" % res.status_code
     data = res.json()
@@ -202,6 +204,7 @@ async def test_topics(rest_async_client, admin_client):

 async def test_publish(rest_async_client, admin_client):
     topic = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     topic_url = f"/topics/{topic}"
     partition_url = f"/topics/{topic}/partitions/0"
     # Proper Json / Binary
@@ -219,6 +222,7 @@

 async def test_publish_malformed_requests(rest_async_client, admin_client):
     topic_name = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     for url in [f"/topics/{topic_name}", f"/topics/{topic_name}/partitions/0"]:
         # Malformed schema ++ empty records
         for js in [{"records": []}, {"foo": "bar"}, {"records": [{"valur": {"foo": "bar"}}]}]:
@@ -247,6 +251,7 @@ async def test_brokers(rest_async_client):
 async def test_partitions(rest_async_client, admin_client, producer):
     # TODO -> This seems to be the only combination accepted by the offsets endpoint
     topic_name = new_topic(admin_client)
+    await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
     header = {"Accept": "*/*", "Content-Type": "application/vnd.kafka.v2+json"}
     all_partitions_res = await rest_async_client.get(f"/topics/{topic_name}/partitions")
     assert all_partitions_res.ok, "Topic should exist"

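Every test that creates a topic now waits for it to become visible through the REST proxy before issuing requests, which is the timing fix named in the PR title. wait_for_topics itself lives in tests/utils and its body is not shown here; judging from the call sites it presumably polls the /topics listing until every requested name appears or the timeout elapses. A sketch under that assumption (the name and signature match the calls above; the body is guessed):

# Hypothetical sketch of tests.utils.wait_for_topics, inferred from the
# call sites above; the real implementation is not part of this diff.
import asyncio
import time
from typing import List


async def wait_for_topics(rest_async_client, *, topic_names: List[str], timeout: float, sleep: float) -> None:
    deadline = time.monotonic() + timeout
    missing = set(topic_names)
    while missing:
        assert time.monotonic() < deadline, f"Timed out waiting for topics: {missing}"
        res = await rest_async_client.get("/topics")
        if res.ok:
            # Drop every topic the proxy already reports.
            missing -= set(res.json())
        if missing:
            await asyncio.sleep(sleep)
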
tests/integration/test_rest_consumer.py: 28 additions & 9 deletions

@@ -1,4 +1,7 @@
-from tests.utils import consumer_valid_payload, new_consumer, new_random_name, new_topic, REST_HEADERS, schema_data
+from tests.utils import (
+    consumer_valid_payload, new_consumer, new_random_name, new_topic, repeat_until_successful_request, REST_HEADERS,
+    schema_data
+)

 import base64
 import copy
@@ -186,14 +189,19 @@ async def test_offsets(rest_async_client, admin_client, trail):
     )
     assert res.ok, f"Unexpected response status for assignment {res}"

-    res = await rest_async_client.post(
-        offsets_path, json={"offsets": [{
+    await repeat_until_successful_request(
+        rest_async_client.post,
+        offsets_path,
+        json_data={"offsets": [{
             "topic": topic_name,
             "partition": 0,
-            "offset": 0
-        }]}, headers=header
+            "offset": 0,
+        }]},
+        headers=header,
+        error_msg="Unexpected response status for offset commit",
+        timeout=20,
+        sleep=1,
     )
-    assert res.ok, f"Unexpected response status for offset commit {res}"

     res = await rest_async_client.get(
         offsets_path, headers=header, json={"partitions": [{
@@ -282,9 +290,20 @@ async def test_publish_consume_avro(rest_async_client, admin_client, trail, schema_type):
     res = await rest_async_client.post(assign_path, json=assign_payload, headers=header)
     assert res.ok
     publish_payload = schema_data[schema_type][1]
-    pl = {"value_schema": schema_data[schema_type][0], "records": [{"value": o} for o in publish_payload]}
-    res = await rest_async_client.post(f"topics/{tn}{trail}", json=pl, headers=header)
-    assert res.ok
+    await repeat_until_successful_request(
+        rest_async_client.post,
+        f"topics/{tn}{trail}",
+        json_data={
+            "value_schema": schema_data[schema_type][0],
+            "records": [{
+                "value": o
+            } for o in publish_payload]
+        },
+        headers=header,
+        error_msg="Unexpected response status for offset commit",
+        timeout=10,
+        sleep=1,
+    )
    resp = await rest_async_client.get(consume_path, headers=header)
     assert resp.ok, f"Expected a successful response: {resp}"
     data = resp.json()

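Here repeat_until_successful_request replaces single posts that could race with asynchronous topic creation. Its definition is also in tests/utils and absent from this diff; the keyword arguments suggest it retries the given client method until the response is successful or the timeout elapses, returning the response (note the error_msg at the Avro publish call site is reused verbatim from the offset-commit call). A sketch under those assumptions:

# Hypothetical sketch of tests.utils.repeat_until_successful_request,
# inferred from its keyword arguments; not the actual implementation.
import asyncio
import time


async def repeat_until_successful_request(callback, path, *, json_data, headers, error_msg, timeout, sleep):
    deadline = time.monotonic() + timeout
    res = await callback(path, json=json_data, headers=headers)
    while not res.ok:
        # Fail loudly with the caller's message once the deadline passes.
        assert time.monotonic() < deadline, f"{error_msg}: {res}"
        await asyncio.sleep(sleep)
        res = await callback(path, json=json_data, headers=headers)
    return res
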