Skip to content

Commit

Permalink
chore: add retry to tests requiring forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Sep 19, 2024
1 parent 8f8d50c commit 9051154
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 54 deletions.
11 changes: 11 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
from tests.integration.utils.rest_client import RetryRestClient
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
Expand Down Expand Up @@ -576,6 +577,11 @@ async def fixture_registry_async_client(
await client.close()


@pytest.fixture(scope="function", name="registry_async_retry_client")
async def fixture_registry_async_retry_client(registry_async_client: Client) -> RetryRestClient:
return RetryRestClient(registry_async_client)


@pytest.fixture(scope="function", name="credentials_folder")
def fixture_credentials_folder() -> str:
integration_test_folder = os.path.dirname(__file__)
Expand Down Expand Up @@ -715,6 +721,11 @@ async def fixture_registry_async_client_auth(
await client.close()


@pytest.fixture(scope="function", name="registry_async_retry_client_auth")
async def fixture_registry_async_retry_client_auth(registry_async_client_auth: Client) -> RetryRestClient:
return RetryRestClient(registry_async_client_auth)


@pytest.fixture(scope="function", name="registry_async_auth_pair")
async def fixture_registry_async_auth_pair(
request: SubRequest,
Expand Down
29 changes: 18 additions & 11 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from karapace.coordinator.master_coordinator import MasterCoordinator
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import PortRangeInclusive
from tests.integration.utils.rest_client import RetryRestClient
from tests.utils import new_random_name

import asyncio
import json
import pytest
import requests


async def init_admin(config):
Expand Down Expand Up @@ -195,7 +195,10 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR
await mc.close()


async def test_schema_request_forwarding(registry_async_pair):
async def test_schema_request_forwarding(
registry_async_pair,
registry_async_retry_client: RetryRestClient,
) -> None:
master_url, slave_url = registry_async_pair
max_tries, counter = 5, 0
wait_time = 0.5
Expand All @@ -209,11 +212,11 @@ async def test_schema_request_forwarding(registry_async_pair):
else:
path = "config"
for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]:
resp = requests.put(f"{slave_url}/{path}", json={"compatibility": compat})
resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat})
assert resp.ok
while True:
assert counter < max_tries, "Compat update not propagated"
resp = requests.get(f"{master_url}/{path}")
resp = await registry_async_retry_client.get(f"{master_url}/{path}")
if not resp.ok:
print(f"Invalid http status code: {resp.status_code}")
continue
Expand All @@ -232,14 +235,16 @@ async def test_schema_request_forwarding(registry_async_pair):

# New schema updates, last compatibility is None
for s in [schema, other_schema]:
resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)})
resp = await registry_async_retry_client.post(
f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}
)
assert resp.ok
data = resp.json()
assert "id" in data, data
counter = 0
while True:
assert counter < max_tries, "Subject schema data not propagated yet"
resp = requests.get(f"{master_url}/subjects/{subject}/versions")
resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions")
if not resp.ok:
print(f"Invalid http status code: {resp.status_code}")
counter += 1
Expand All @@ -255,12 +260,14 @@ async def test_schema_request_forwarding(registry_async_pair):
break

# Schema deletions
resp = requests.delete(f"{slave_url}/subjects/{subject}/versions/1")
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1")
assert resp.ok
counter = 0
while True:
assert counter < max_tries, "Subject version deletion not propagated yet"
resp = requests.get(f"{master_url}/subjects/{subject}/versions/1")
resp = await registry_async_retry_client.get(
f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404
)
if resp.ok:
print(f"Subject {subject} still has version 1 on master")
counter += 1
Expand All @@ -270,16 +277,16 @@ async def test_schema_request_forwarding(registry_async_pair):
break

# Subject deletion
resp = requests.get(f"{master_url}/subjects/")
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
assert resp.ok
data = resp.json()
assert subject in data
resp = requests.delete(f"{slave_url}/subjects/{subject}")
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}")
assert resp.ok
counter = 0
while True:
assert counter < max_tries, "Subject deletion not propagated yet"
resp = requests.get(f"{master_url}/subjects/")
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
if not resp.ok:
print("Could not retrieve subject list on master")
counter += 1
Expand Down
110 changes: 67 additions & 43 deletions tests/integration/test_schema_registry_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.schema_models import SchemaType, ValidatedTypedSchema
from tests.integration.utils.rest_client import RetryRestClient
from tests.utils import (
new_random_name,
new_topic,
Expand All @@ -29,139 +29,157 @@
reader = aiohttp.BasicAuth("reader", "secret")


async def test_sr_auth(registry_async_client_auth: Client) -> None:
async def test_sr_auth(registry_async_retry_client_auth: RetryRestClient) -> None:
subject = new_random_name("cave-")

res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json})
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401
)
assert res.status_code == 401

res = await registry_async_client_auth.post(
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
)
assert res.status_code == 200
sc_id = res.json()["id"]
assert sc_id >= 0

res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest")
res = await registry_async_retry_client_auth.get(
f"subjects/{quote(subject)}/versions/latest", expected_response_code=401
)
assert res.status_code == 401
res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin)
res = await registry_async_retry_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin)
assert res.status_code == 200
assert sc_id == res.json()["id"]
assert ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) == ValidatedTypedSchema.parse(
SchemaType.AVRO, res.json()["schema"]
)


async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None:
async def test_sr_auth_endpoints(registry_async_retry_client_auth: RetryRestClient) -> None:
"""Test endpoints for authorization"""

subject = new_random_name("any-")

res = await registry_async_client_auth.post(
f"compatibility/subjects/{quote(subject)}/versions/1", json={"schema": schema_avro_json}
res = await registry_async_retry_client_auth.post(
f"compatibility/subjects/{quote(subject)}/versions/1",
json={"schema": schema_avro_json},
expected_response_code=401,
)
assert res.status_code == 401

res = await registry_async_client_auth.get(f"config/{quote(subject)}")
res = await registry_async_retry_client_auth.get(f"config/{quote(subject)}", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.put(f"config/{quote(subject)}", json={"compatibility": "NONE"})
res = await registry_async_retry_client_auth.put(
f"config/{quote(subject)}",
json={"compatibility": "NONE"},
expected_response_code=401,
)
assert res.status_code == 401

res = await registry_async_client_auth.get("config")
res = await registry_async_retry_client_auth.get("config", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.put("config", json={"compatibility": "NONE"})
res = await registry_async_retry_client_auth.put("config", json={"compatibility": "NONE"}, expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.get("schemas/ids/1/versions")
res = await registry_async_retry_client_auth.get("schemas/ids/1/versions", expected_response_code=401)
assert res.status_code == 401

# This is an exception that does not require authorization
res = await registry_async_client_auth.get("schemas/types")
res = await registry_async_retry_client_auth.get("schemas/types")
assert res.status_code == 200

# but let's verify it answers normally if sending authorization header
res = await registry_async_client_auth.get("schemas/types", auth=admin)
res = await registry_async_retry_client_auth.get("schemas/types", auth=admin)
assert res.status_code == 200

res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json})
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401
)
assert res.status_code == 401

res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}/versions/1")
res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}/versions/1", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/schema")
res = await registry_async_retry_client_auth.get(
f"subjects/{quote(subject)}/versions/1/schema", expected_response_code=401
)
assert res.status_code == 401

res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/referencedby")
res = await registry_async_retry_client_auth.get(
f"subjects/{quote(subject)}/versions/1/referencedby", expected_response_code=401
)
assert res.status_code == 401

res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}")
res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.get("mode")
res = await registry_async_retry_client_auth.get("mode", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.get(f"mode/{quote(subject)}")
res = await registry_async_retry_client_auth.get(f"mode/{quote(subject)}", expected_response_code=401)
assert res.status_code == 401


async def test_sr_list_subjects(registry_async_client_auth: Client) -> None:
async def test_sr_list_subjects(registry_async_retry_client_auth: RetryRestClient) -> None:
cavesubject = new_random_name("cave-")
carpetsubject = new_random_name("carpet-")

res = await registry_async_client_auth.post(
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
)
assert res.status_code == 200
sc_id = res.json()["id"]
assert sc_id >= 0

res = await registry_async_client_auth.post(
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(carpetsubject)}/versions", json={"schema": schema_avro_json}, auth=admin
)
assert res.status_code == 200

res = await registry_async_client_auth.get("subjects", auth=admin)
res = await registry_async_retry_client_auth.get("subjects", auth=admin)
assert res.status_code == 200
assert [cavesubject, carpetsubject] == res.json()

res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions")
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", expected_response_code=401)
assert res.status_code == 401

res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin)
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin)
assert res.status_code == 200
assert [sc_id] == res.json()

res = await registry_async_client_auth.get("subjects", auth=aladdin)
res = await registry_async_retry_client_auth.get("subjects", auth=aladdin)
assert res.status_code == 200
assert [cavesubject] == res.json()

res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=aladdin)
res = await registry_async_retry_client_auth.get(
f"subjects/{quote(carpetsubject)}/versions", auth=aladdin, expected_response_code=403
)
assert res.status_code == 403

res = await registry_async_client_auth.get("subjects", auth=reader)
res = await registry_async_retry_client_auth.get("subjects", auth=reader)
assert res.status_code == 200
assert [carpetsubject] == res.json()

res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader)
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader)
assert res.status_code == 200
assert [1] == res.json()


async def test_sr_ids(registry_async_client_auth: Client) -> None:
async def test_sr_ids(registry_async_retry_client_auth: RetryRestClient) -> None:
cavesubject = new_random_name("cave-")
carpetsubject = new_random_name("carpet-")

res = await registry_async_client_auth.post(
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
)
assert res.status_code == 200
avro_sc_id = res.json()["id"]
assert avro_sc_id >= 0

res = await registry_async_client_auth.post(
res = await registry_async_retry_client_auth.post(
f"subjects/{quote(carpetsubject)}/versions",
json={"schemaType": "JSON", "schema": schema_jsonschema_json},
auth=admin,
Expand All @@ -170,30 +188,34 @@ async def test_sr_ids(registry_async_client_auth: Client) -> None:
jsonschema_sc_id = res.json()["id"]
assert jsonschema_sc_id >= 0

res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin)
res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin)
assert res.status_code == 200

res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=aladdin)
res = await registry_async_retry_client_auth.get(
f"schemas/ids/{jsonschema_sc_id}", auth=aladdin, expected_response_code=404
)
assert res.status_code == 404
assert {"error_code": 40403, "message": "Schema not found"} == res.json()

res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader)
res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader, expected_response_code=404)
assert res.status_code == 404
assert {"error_code": 40403, "message": "Schema not found"} == res.json()

res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader)
res = await registry_async_retry_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader)
assert res.status_code == 200


async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None:
async def test_sr_auth_forwarding(
registry_async_auth_pair: List[str], registry_async_retry_client_auth: RetryRestClient
) -> None:
auth = requests.auth.HTTPBasicAuth("admin", "admin")

# Test primary/replica forwarding with global config setting
primary_url, replica_url = registry_async_auth_pair
max_tries, counter = 5, 0
wait_time = 0.5
for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]:
resp = requests.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth)
resp = await registry_async_retry_client_auth.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth)
assert resp.ok
while True:
assert counter < max_tries, "Compat update not propagated"
Expand All @@ -213,7 +235,9 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None:


# Test that Kafka REST API works when configured with Schema Registry requiring authorization
async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaAdminClient) -> None:
async def test_rest_api_with_sr_auth(
rest_async_client_registry_auth: RetryRestClient, admin_client: KafkaAdminClient
) -> None:
client = rest_async_client_registry_auth

topic = new_topic(admin_client, prefix="cave-rest-")
Expand Down
Loading

0 comments on commit 9051154

Please sign in to comment.