diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index cc86a3c36..a4d97ddbb 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -31,6 +31,7 @@ ) from tests.integration.utils.network import PortRangeInclusive from tests.integration.utils.process import stop_process, wait_for_port_subprocess +from tests.integration.utils.rest_client import RetryRestClient from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_successful_request @@ -576,6 +577,11 @@ async def fixture_registry_async_client( await client.close() +@pytest.fixture(scope="function", name="registry_async_retry_client") +async def fixture_registry_async_retry_client(registry_async_client: Client) -> RetryRestClient: + return RetryRestClient(registry_async_client) + + @pytest.fixture(scope="function", name="credentials_folder") def fixture_credentials_folder() -> str: integration_test_folder = os.path.dirname(__file__) @@ -715,6 +721,11 @@ async def fixture_registry_async_client_auth( await client.close() +@pytest.fixture(scope="function", name="registry_async_retry_client_auth") +async def fixture_registry_async_retry_client_auth(registry_async_client_auth: Client) -> RetryRestClient: + return RetryRestClient(registry_async_client_auth) + + @pytest.fixture(scope="function", name="registry_async_auth_pair") async def fixture_registry_async_auth_pair( request: SubRequest, diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 225539f8d..9acdffdb4 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -8,12 +8,12 @@ from karapace.coordinator.master_coordinator import MasterCoordinator from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import PortRangeInclusive +from tests.integration.utils.rest_client import RetryRestClient from tests.utils import new_random_name import asyncio import json import pytest -import requests async def init_admin(config): @@ -195,7 +195,10 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR await mc.close() -async def test_schema_request_forwarding(registry_async_pair): +async def test_schema_request_forwarding( + registry_async_pair, + registry_async_retry_client: RetryRestClient, +) -> None: master_url, slave_url = registry_async_pair max_tries, counter = 5, 0 wait_time = 0.5 @@ -209,11 +212,11 @@ async def test_schema_request_forwarding(registry_async_pair): else: path = "config" for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = requests.put(f"{slave_url}/{path}", json={"compatibility": compat}) + resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat}) assert resp.ok while True: assert counter < max_tries, "Compat update not propagated" - resp = requests.get(f"{master_url}/{path}") + resp = await registry_async_retry_client.get(f"{master_url}/{path}") if not resp.ok: print(f"Invalid http status code: {resp.status_code}") continue @@ -232,14 +235,16 @@ async def test_schema_request_forwarding(registry_async_pair): # New schema updates, last compatibility is None for s in [schema, other_schema]: - resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}) + resp = await registry_async_retry_client.post( + f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)} + ) assert resp.ok data = resp.json() assert "id" in data, data counter = 0 while True: assert counter < max_tries, "Subject schema data not propagated yet" - resp = requests.get(f"{master_url}/subjects/{subject}/versions") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions") if not resp.ok: print(f"Invalid http status code: {resp.status_code}") counter += 1 @@ -255,12 +260,14 @@ async def test_schema_request_forwarding(registry_async_pair): break # Schema deletions - resp = requests.delete(f"{slave_url}/subjects/{subject}/versions/1") + resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1") assert resp.ok counter = 0 while True: assert counter < max_tries, "Subject version deletion not propagated yet" - resp = requests.get(f"{master_url}/subjects/{subject}/versions/1") + resp = await registry_async_retry_client.get( + f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404 + ) if resp.ok: print(f"Subject {subject} still has version 1 on master") counter += 1 @@ -270,16 +277,16 @@ async def test_schema_request_forwarding(registry_async_pair): break # Subject deletion - resp = requests.get(f"{master_url}/subjects/") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/") assert resp.ok data = resp.json() assert subject in data - resp = requests.delete(f"{slave_url}/subjects/{subject}") + resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}") assert resp.ok counter = 0 while True: assert counter < max_tries, "Subject deletion not propagated yet" - resp = requests.get(f"{master_url}/subjects/") + resp = await registry_async_retry_client.get(f"{master_url}/subjects/") if not resp.ok: print("Could not retrieve subject list on master") counter += 1 diff --git a/tests/integration/test_schema_registry_auth.py b/tests/integration/test_schema_registry_auth.py index 1305c5cbf..ca75d6fdb 100644 --- a/tests/integration/test_schema_registry_auth.py +++ b/tests/integration/test_schema_registry_auth.py @@ -4,9 +4,9 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.schema_models import SchemaType, ValidatedTypedSchema +from tests.integration.utils.rest_client import RetryRestClient from tests.utils import ( new_random_name, new_topic, @@ -29,22 +29,26 @@ reader = aiohttp.BasicAuth("reader", "secret") -async def test_sr_auth(registry_async_client_auth: Client) -> None: +async def test_sr_auth(registry_async_retry_client_auth: RetryRestClient) -> None: subject = new_random_name("cave-") - res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + res = await registry_async_retry_client_auth.post( + f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 sc_id = res.json()["id"] assert sc_id >= 0 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/latest", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin) assert res.status_code == 200 assert sc_id == res.json()["id"] assert ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) == ValidatedTypedSchema.parse( @@ -52,116 +56,130 @@ async def test_sr_auth(registry_async_client_auth: Client) -> None: ) -async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None: +async def test_sr_auth_endpoints(registry_async_retry_client_auth: RetryRestClient) -> None: """Test endpoints for authorization""" subject = new_random_name("any-") - res = await registry_async_client_auth.post( - f"compatibility/subjects/{quote(subject)}/versions/1", json={"schema": schema_avro_json} + res = await registry_async_retry_client_auth.post( + f"compatibility/subjects/{quote(subject)}/versions/1", + json={"schema": schema_avro_json}, + expected_response_code=401, ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"config/{quote(subject)}") + res = await registry_async_retry_client_auth.get(f"config/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.put(f"config/{quote(subject)}", json={"compatibility": "NONE"}) + res = await registry_async_retry_client_auth.put( + f"config/{quote(subject)}", + json={"compatibility": "NONE"}, + expected_response_code=401, + ) assert res.status_code == 401 - res = await registry_async_client_auth.get("config") + res = await registry_async_retry_client_auth.get("config", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.put("config", json={"compatibility": "NONE"}) + res = await registry_async_retry_client_auth.put("config", json={"compatibility": "NONE"}, expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get("schemas/ids/1/versions") + res = await registry_async_retry_client_auth.get("schemas/ids/1/versions", expected_response_code=401) assert res.status_code == 401 # This is an exception that does not require authorization - res = await registry_async_client_auth.get("schemas/types") + res = await registry_async_retry_client_auth.get("schemas/types") assert res.status_code == 200 # but let's verify it answers normally if sending authorization header - res = await registry_async_client_auth.get("schemas/types", auth=admin) + res = await registry_async_retry_client_auth.get("schemas/types", auth=admin) assert res.status_code == 200 - res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}) + res = await registry_async_retry_client_auth.post( + f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}/versions/1") + res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}/versions/1", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/schema") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/1/schema", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/referencedby") + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(subject)}/versions/1/referencedby", expected_response_code=401 + ) assert res.status_code == 401 - res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}") + res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get("mode") + res = await registry_async_retry_client_auth.get("mode", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"mode/{quote(subject)}") + res = await registry_async_retry_client_auth.get(f"mode/{quote(subject)}", expected_response_code=401) assert res.status_code == 401 -async def test_sr_list_subjects(registry_async_client_auth: Client) -> None: +async def test_sr_list_subjects(registry_async_retry_client_auth: RetryRestClient) -> None: cavesubject = new_random_name("cave-") carpetsubject = new_random_name("carpet-") - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 sc_id = res.json()["id"] assert sc_id >= 0 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(carpetsubject)}/versions", json={"schema": schema_avro_json}, auth=admin ) assert res.status_code == 200 - res = await registry_async_client_auth.get("subjects", auth=admin) + res = await registry_async_retry_client_auth.get("subjects", auth=admin) assert res.status_code == 200 assert [cavesubject, carpetsubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions") + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", expected_response_code=401) assert res.status_code == 401 - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin) assert res.status_code == 200 assert [sc_id] == res.json() - res = await registry_async_client_auth.get("subjects", auth=aladdin) + res = await registry_async_retry_client_auth.get("subjects", auth=aladdin) assert res.status_code == 200 assert [cavesubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=aladdin) + res = await registry_async_retry_client_auth.get( + f"subjects/{quote(carpetsubject)}/versions", auth=aladdin, expected_response_code=403 + ) assert res.status_code == 403 - res = await registry_async_client_auth.get("subjects", auth=reader) + res = await registry_async_retry_client_auth.get("subjects", auth=reader) assert res.status_code == 200 assert [carpetsubject] == res.json() - res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader) + res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader) assert res.status_code == 200 assert [1] == res.json() -async def test_sr_ids(registry_async_client_auth: Client) -> None: +async def test_sr_ids(registry_async_retry_client_auth: RetryRestClient) -> None: cavesubject = new_random_name("cave-") carpetsubject = new_random_name("carpet-") - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin ) assert res.status_code == 200 avro_sc_id = res.json()["id"] assert avro_sc_id >= 0 - res = await registry_async_client_auth.post( + res = await registry_async_retry_client_auth.post( f"subjects/{quote(carpetsubject)}/versions", json={"schemaType": "JSON", "schema": schema_jsonschema_json}, auth=admin, @@ -170,22 +188,26 @@ async def test_sr_ids(registry_async_client_auth: Client) -> None: jsonschema_sc_id = res.json()["id"] assert jsonschema_sc_id >= 0 - res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin) assert res.status_code == 200 - res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=aladdin) + res = await registry_async_retry_client_auth.get( + f"schemas/ids/{jsonschema_sc_id}", auth=aladdin, expected_response_code=404 + ) assert res.status_code == 404 assert {"error_code": 40403, "message": "Schema not found"} == res.json() - res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader, expected_response_code=404) assert res.status_code == 404 assert {"error_code": 40403, "message": "Schema not found"} == res.json() - res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader) + res = await registry_async_retry_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader) assert res.status_code == 200 -async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: +async def test_sr_auth_forwarding( + registry_async_auth_pair: List[str], registry_async_retry_client_auth: RetryRestClient +) -> None: auth = requests.auth.HTTPBasicAuth("admin", "admin") # Test primary/replica forwarding with global config setting @@ -193,7 +215,7 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: max_tries, counter = 5, 0 wait_time = 0.5 for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]: - resp = requests.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth) + resp = await registry_async_retry_client_auth.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth) assert resp.ok while True: assert counter < max_tries, "Compat update not propagated" @@ -213,7 +235,9 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None: # Test that Kafka REST API works when configured with Schema Registry requiring authorization -async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaAdminClient) -> None: +async def test_rest_api_with_sr_auth( + rest_async_client_registry_auth: RetryRestClient, admin_client: KafkaAdminClient +) -> None: client = rest_async_client_registry_auth topic = new_topic(admin_client, prefix="cave-rest-") diff --git a/tests/integration/utils/rest_client.py b/tests/integration/utils/rest_client.py new file mode 100644 index 000000000..05539a34f --- /dev/null +++ b/tests/integration/utils/rest_client.py @@ -0,0 +1,81 @@ +""" +karapace - Test rest client with retries + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiohttp import BasicAuth +from collections.abc import Mapping +from karapace.client import Client, Headers, Path, Result +from karapace.typing import JsonData +from tenacity import retry, stop_after_attempt, wait_fixed +from typing import Final + +RETRY_WAIT_SECONDS: Final = 0.5 + + +class UnexpectedResponseStatus(Exception): + pass + + +class RetryRestClient: + def __init__(self, client: Client): + self._karapace_client = client + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def get( + self, + path: Path, + json: JsonData | None = None, + headers: Headers | None = None, + auth: BasicAuth | None = None, + params: Mapping[str, str] | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.get(path, headers=headers, json=json, auth=auth, params=params) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def delete( + self, + path: Path, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.delete(path=path, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def post( + self, + path: Path, + json: JsonData, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.post(path=path, json=json, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response + + @retry(stop=stop_after_attempt(5), wait=wait_fixed(RETRY_WAIT_SECONDS)) + async def put( + self, + path: Path, + json: JsonData, + headers: Headers | None = None, + auth: BasicAuth | None = None, + expected_response_code: int = 200, + ) -> Result: + response: Result = await self._karapace_client.put(path=path, json=json, headers=headers, auth=auth) + if response.status_code != expected_response_code: + raise UnexpectedResponseStatus(f"Unexpected status code: {response!r}") + return response