From 0b8a2c616a280031257a5112668e330b04c811a3 Mon Sep 17 00:00:00 2001
From: Emmanuel Evbuomwan
Date: Wed, 27 Nov 2024 11:56:48 +0100
Subject: [PATCH] feat: run unit-tests in the cli docker container

- we replace `config.DEFAULTS` with the karapace container, which loads
  the default config

---
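Notes: unit tests now consume the session-scoped `karapace_container`
fixture added to tests/conftest.py instead of importing
`karapace.config.DEFAULTS`; per-test overrides go through
`Config.set_config_defaults`. A minimal sketch of the resulting pattern
(`test_example` is a hypothetical test, not part of this patch):

    from karapace.container import KarapaceContainer

    def test_example(karapace_container: KarapaceContainer) -> None:
        # Derive a per-test config from the container's base config.
        config = karapace_container.config().set_config_defaults({"topic_name": "example-topic"})
        assert config.topic_name == "example-topic"

The suite itself runs inside the cli container; extra pytest flags pass
through PYTEST_ARGS, e.g. `make unit-tests-in-docker PYTEST_ARGS="-k backup"`.
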
 GNUmakefile                                   | 11 ++-
 tests/conftest.py                             | 15 ++++
 tests/unit/backup/test_api.py                 | 52 +++++++-----
 .../test_rest_proxy_cluster_metadata_cache.py | 71 +++++++++-------
 tests/unit/protobuf/test_protoc.py            |  6 +-
 tests/unit/test_authentication.py             | 35 +++++---
 tests/unit/test_config.py                     | 28 +++----
 tests/unit/test_in_memory_database.py         |  6 +-
 tests/unit/test_kafka_error_handler.py        | 13 +--
 tests/unit/test_protobuf_serialization.py     | 36 ++++-----
 tests/unit/test_rapu.py                       | 12 +--
 tests/unit/test_rest_auth.py                  | 14 ++--
 tests/unit/test_schema_reader.py              | 39 +++++----
 tests/unit/test_schema_registry_api.py        | 81 +++++++++++--------
 tests/unit/test_serialization.py              | 38 ++++-----
 15 files changed, 270 insertions(+), 187 deletions(-)

diff --git a/GNUmakefile b/GNUmakefile
index 88c14daa9..b8305d65a 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -3,7 +3,9 @@ SHELL := /usr/bin/env bash
 VENV_DIR ?= $(CURDIR)/venv
 PIP ?= pip3 --disable-pip-version-check --no-input --require-virtualenv
 PYTHON ?= python3
-PYTHON_VERSION ?= 3.9
+PYTHON_VERSION ?= 3.10
+DOCKER_COMPOSE ?= docker compose
+KARAPACE-CLI ?= $(DOCKER_COMPOSE) -f container/compose.yml run karapace-cli
 
 define PIN_VERSIONS_COMMAND
 pip install pip-tools && \
@@ -102,3 +104,10 @@ schema:
 .PHONY: pin-requirements
 pin-requirements:
 	docker run -e CUSTOM_COMPILE_COMMAND='make pin-requirements' -it -v .:/karapace --security-opt label=disable python:$(PYTHON_VERSION)-bullseye /bin/bash -c "$(PIN_VERSIONS_COMMAND)"
+
+.PHONY: unit-tests-in-docker
+unit-tests-in-docker: export PYTEST_ARGS ?=
+unit-tests-in-docker:
+	rm -fr runtime/*
+	$(KARAPACE-CLI) $(PYTHON) -m pytest -s -vvv $(PYTEST_ARGS) tests/unit/
+	rm -fr runtime/*
diff --git a/tests/conftest.py b/tests/conftest.py
index d62663633..91fb0b02d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -3,7 +3,10 @@
 See LICENSE for details
 """
 from avro.compatibility import SchemaCompatibilityResult
+from karapace.config import KARAPACE_BASE_CONFIG_YAML_PATH
+from karapace.container import KarapaceContainer
 from pathlib import Path
+from schema_registry.container import SchemaRegistryContainer
 from tempfile import mkstemp
 from typing import Optional
 
@@ -179,3 +182,15 @@ def fixture_tmp_file():
     path = Path(str_path)
     yield path
     path.unlink()
+
+
+@pytest.fixture(name="karapace_container", scope="session")
+def fixture_karapace_container() -> KarapaceContainer:
+    container = KarapaceContainer()
+    container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True)
+    return container
+
+
+@pytest.fixture
+def schema_registry_container(karapace_container: KarapaceContainer) -> SchemaRegistryContainer:
+    return SchemaRegistryContainer(karapace_container=karapace_container)
diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py
index 983beb786..3df4d028d 100644
--- a/tests/unit/backup/test_api.py
+++ b/tests/unit/backup/test_api.py
@@ -5,7 +5,6 @@
 from __future__ import annotations
 
 from aiokafka.errors import KafkaError, TopicAlreadyExistsError
-from karapace import config
 from karapace.backup.api import (
     _admin,
     _consumer,
@@ -22,6 +21,7 @@ from karapace.backup.errors import BackupError, PartitionCountError
 from karapace.config import Config
 from karapace.constants import DEFAULT_SCHEMA_TOPIC
+from karapace.container import KarapaceContainer
 from karapace.kafka.consumer import KafkaConsumer, PartitionMetadata
 from karapace.kafka.producer import KafkaProducer
 from pathlib import Path
@@ -41,10 +41,12 @@
 class TestAdmin:
 
     @mock.patch("time.sleep", autospec=True)
     @patch_admin_new
-    def test_retries_on_kafka_error(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None:
+    def test_retries_on_kafka_error(
+        self, admin_new: MagicMock, sleep_mock: MagicMock, karapace_container: KarapaceContainer
+    ) -> None:
         admin_mock = admin_new.return_value
         admin_new.side_effect = [KafkaError("1"), KafkaError("2"), admin_mock]
-        with _admin(config.DEFAULTS) as admin:
+        with _admin(karapace_container.config()) as admin:
             assert admin is admin_mock
         assert sleep_mock.call_count == 2  # proof that we waited between retries
@@ -56,41 +58,48 @@ def test_reraises_unknown_exceptions(
         admin_new: MagicMock,
         sleep_mock: MagicMock,
         e: type[BaseException],
+        karapace_container: KarapaceContainer,
     ) -> None:
         admin_new.side_effect = e
-        with pytest.raises(e), _admin(config.DEFAULTS):
+        with pytest.raises(e), _admin(karapace_container.config()):
             pass
         assert sleep_mock.call_count == 0  # proof that we did not retry
 
 
 class TestHandleRestoreTopic:
     @patch_admin_new
-    def test_calls_admin_create_topics(self, admin_new: MagicMock) -> None:
+    def test_calls_admin_create_topics(self, admin_new: MagicMock, karapace_container: KarapaceContainer) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         topic_configs = {"cleanup.policy": "compact"}
-        _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs=topic_configs)
+        _maybe_create_topic(
+            DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs=topic_configs
+        )
         new_topic.assert_called_once_with(
             DEFAULT_SCHEMA_TOPIC,
             num_partitions=1,
-            replication_factor=config.DEFAULTS["replication_factor"],
+            replication_factor=karapace_container.config().replication_factor,
             config=topic_configs,
         )
 
     @patch_admin_new
-    def test_gracefully_handles_topic_already_exists_error(self, admin_new: MagicMock) -> None:
+    def test_gracefully_handles_topic_already_exists_error(
+        self, admin_new: MagicMock, karapace_container: KarapaceContainer
+    ) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         new_topic.side_effect = TopicAlreadyExistsError()
-        _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={})
+        _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs={})
         new_topic.assert_called_once()
 
     @patch_admin_new
-    def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None:
+    def test_retries_for_kafka_errors(self, admin_new: MagicMock, karapace_container: KarapaceContainer) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         new_topic.side_effect = [KafkaError("1"), KafkaError("2"), None]
         with mock.patch("time.sleep", autospec=True):
-            _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={})
+            _maybe_create_topic(
+                DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs={}
+            )
         assert new_topic.call_count == 3
@@ -98,17 +107,19 @@ def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None:
     def test_noop_for_custom_name_on_legacy_versions(
         self,
         admin_new: MagicMock,
+        karapace_container: KarapaceContainer,
     ) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         assert "custom-name" != DEFAULT_SCHEMA_TOPIC
         instruction = RestoreTopicLegacy(topic_name="custom-name", partition_count=1)
-        _handle_restore_topic_legacy(instruction, config.DEFAULTS)
+        _handle_restore_topic_legacy(instruction, karapace_container.config())
         new_topic.assert_not_called()
 
     @patch_admin_new
     def test_allows_custom_name_on_v3(
         self,
         admin_new: MagicMock,
+        karapace_container: KarapaceContainer,
     ) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         topic_name = "custom-name"
@@ -117,7 +128,7 @@ def test_allows_custom_name_on_v3(
         instruction = RestoreTopic(
             topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs=topic_configs
         )
-        _handle_restore_topic(instruction, config.DEFAULTS)
+        _handle_restore_topic(instruction, karapace_container.config())
 
         new_topic.assert_called_once_with(topic_name, num_partitions=1, replication_factor=2, config=topic_configs)
@@ -125,11 +136,12 @@ def test_allows_custom_name_on_v3(
     def test_skip_topic_creation(
         self,
         admin_new: MagicMock,
+        karapace_container: KarapaceContainer,
     ) -> None:
         new_topic: MagicMock = admin_new.return_value.new_topic
         _handle_restore_topic(
             RestoreTopic(topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs={}),
-            config.DEFAULTS,
+            karapace_container.config(),
             skip_topic_creation=True,
         )
         _handle_restore_topic_legacy(
@@ -137,7 +149,7 @@ def test_skip_topic_creation(
                 topic_name="custom-name",
                 partition_count=1,
             ),
-            config.DEFAULTS,
+            karapace_container.config(),
             skip_topic_creation=True,
         )
 
@@ -171,11 +183,12 @@ def test_auto_closing(
         client_class: type[KafkaConsumer | KafkaProducer],
         partitions_method: FunctionType,
         close_method_name: str,
+        karapace_container: KarapaceContainer,
     ) -> None:
         with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor:
             client_mock = client_ctor.return_value
             getattr(client_mock, partitions_method.__name__).return_value = self._partition_metadata()
-            with ctx_mng(config.DEFAULTS, "topic") as client:
+            with ctx_mng(karapace_container.config(), "topic") as client:
                 assert client is client_mock
         assert getattr(client_mock, close_method_name).call_count == 1
@@ -194,12 +207,13 @@ def test_raises_partition_count_error_for_unexpected_count(
         partitions_method: FunctionType,
         partition_count: int,
         close_method_name: str,
+        karapace_container: KarapaceContainer,
     ) -> None:
         with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor:
             client_mock = client_ctor.return_value
             getattr(client_mock, partitions_method.__name__).return_value = self._partition_metadata(partition_count)
             with pytest.raises(PartitionCountError):
-                with ctx_mng(config.DEFAULTS, "topic") as client:
+                with ctx_mng(karapace_container.config(), "topic") as client:
                     assert client == client_mock
         assert getattr(client_mock, close_method_name).call_count == 1
@@ -271,6 +285,6 @@ def test_returns_option_if_given(self) -> None:
         fake_config = cast(Config, {})
         assert normalize_topic_name("some-topic", fake_config) == "some-topic"
 
-    def test_defaults_to_config(self) -> None:
-        fake_config = cast(Config, {"topic_name": "default-topic"})
+    def test_defaults_to_config(self, karapace_container: KarapaceContainer) -> None:
+        fake_config = karapace_container.config().set_config_defaults({"topic_name": "default-topic"})
         assert normalize_topic_name(None, fake_config) == "default-topic"
diff --git a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py
index b47fb5e02..d1227fbc2 100644
--- a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py
+++ b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py
@@ -3,7 +3,8 @@
 Copyright (c) 2024 Aiven Ltd
 See LICENSE for details
 """
-from karapace.config import DEFAULTS
+
+from karapace.container import KarapaceContainer
 from karapace.kafka_rest_apis import UserRestProxy
 from karapace.serialization import SchemaRegistrySerializer
 from unittest.mock import patch
@@ -11,10 +12,10 @@
 import copy
 
 
-def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy:
-    configs = {**DEFAULTS, **{"admin_metadata_max_age": max_age_metadata}}
-    serializer = SchemaRegistrySerializer(configs)
-    return UserRestProxy(configs, 1, serializer, auth_expiry=None, verify_connection=False)
+def user_rest_proxy(karapace_container: KarapaceContainer, max_age_metadata: int = 5) -> UserRestProxy:
+    config = karapace_container.config().set_config_defaults({"admin_metadata_max_age": max_age_metadata})
+    serializer = SchemaRegistrySerializer(config=config)
+    return UserRestProxy(config, 1, serializer, auth_expiry=None, verify_connection=False)
 
 
 EMPTY_REPLY = {
@@ -158,8 +159,8 @@ def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy:
 }
 
 
-async def test_cache_is_evicted_after_expiration_global_initially() -> None:
-    proxy = user_rest_proxy()
+async def test_cache_is_evicted_after_expiration_global_initially(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container)
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY
     ) as mocked_cluster_metadata:
@@ -167,8 +168,8 @@ async def test_cache_is_evicted_after_expiration_global_initially() -> None:
     mocked_cluster_metadata.assert_called_once_with(None)  # "initially the metadata are always old"
 
 
-async def test_no_topic_means_all_metadata() -> None:
-    proxy = user_rest_proxy()
+async def test_no_topic_means_all_metadata(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container)
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY
     ) as mocked_cluster_metadata:
@@ -176,8 +177,8 @@ async def test_no_topic_means_all_metadata() -> None:
     mocked_cluster_metadata.assert_called_once_with(None)
 
 
-async def test_cache_is_evicted_after_expiration_global() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_cache_is_evicted_after_expiration_global(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY
@@ -187,8 +188,8 @@ async def test_cache_is_evicted_after_expiration_global() -> None:
     mocked_cluster_metadata.assert_called_once_with(None)  # "metadata old require a refresh"
 
 
-async def test_global_cache_is_used_for_single_topic() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_global_cache_is_used_for_single_topic(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST
@@ -214,8 +215,8 @@ async def test_global_cache_is_used_for_single_topic() -> None:
     ), "the result should still be cached since we marked it as ready at time 11 and we are at 14"
 
 
-async def test_cache_is_evicted_if_one_topic_is_expired() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_cache_is_evicted_if_one_topic_is_expired(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST
@@ -234,8 +235,8 @@ async def test_cache_is_evicted_if_one_topic_is_expired() -> None:
     assert mocked_cluster_metadata.call_count == 1, "topic_b should be evicted"
 
 
-async def test_cache_is_evicted_if_a_topic_was_never_queries() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_cache_is_evicted_if_a_topic_was_never_queries(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST
@@ -254,8 +255,8 @@ async def test_cache_is_evicted_if_a_topic_was_never_queries() -> None:
     assert mocked_cluster_metadata.call_count == 1, "topic_b is not present in the cache, should call the refresh"
 
 
-async def test_cache_is_used_if_topic_requested_is_updated() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_cache_is_used_if_topic_requested_is_updated(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST
@@ -272,8 +273,8 @@ async def test_cache_is_used_if_topic_requested_is_updated() -> None:
     assert mocked_cluster_metadata.call_count == 0, "topic_a cache its present, should be used"
 
 
-async def test_update_global_cache() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_global_cache(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     with patch(
         "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST
@@ -292,8 +293,8 @@ async def test_update_global_cache() -> None:
     assert mocked_cluster_metadata.call_count == 0, "should call the server since the cache its expired"
 
 
-async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_topic_cache_do_not_evict_all_the_global_cache(karapace_container: KarapaceContainer) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     proxy._cluster_metadata = ALL_TOPIC_REQUEST
     proxy._cluster_metadata_topic_birth = {"topic_a": 0, "topic_b": 200, "__consumer_offsets": 200}
@@ -317,8 +318,10 @@ async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None:
     ), "we should call the server since the previous time of caching for the topic_a was 0"
 
 
-async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data(
+    karapace_container: KarapaceContainer,
+) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     proxy._cluster_metadata_complete = True
     proxy._cluster_metadata = ALL_TOPIC_REQUEST
@@ -346,8 +349,10 @@ async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_
     ), "we should call the server since the previous time of caching for the topic_a was 0"
 
 
-async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data(
+    karapace_container: KarapaceContainer,
+) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     proxy._cluster_metadata_complete = True
     proxy._cluster_metadata = ALL_TOPIC_REQUEST
@@ -360,8 +365,10 @@ async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_repl
     assert not proxy._cluster_metadata_complete, "new replica data incoming, should update the global metadata next!"
 
 
-async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data(
+    karapace_container: KarapaceContainer,
+) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     proxy._cluster_metadata_complete = True
     proxy._cluster_metadata = ALL_TOPIC_REQUEST
@@ -374,8 +381,10 @@ async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_da
     assert not proxy._cluster_metadata_complete, "new topic data incoming, should update the global metadata next!"
 
 
-async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data() -> None:
-    proxy = user_rest_proxy(max_age_metadata=10)
+async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data(
+    karapace_container: KarapaceContainer,
+) -> None:
+    proxy = user_rest_proxy(karapace_container, max_age_metadata=10)
     proxy._global_metadata_birth = 0
     proxy._cluster_metadata_complete = True
     proxy._cluster_metadata = ALL_TOPIC_REQUEST
diff --git a/tests/unit/protobuf/test_protoc.py b/tests/unit/protobuf/test_protoc.py
index f044f1abe..d61648d9e 100644
--- a/tests/unit/protobuf/test_protoc.py
+++ b/tests/unit/protobuf/test_protoc.py
@@ -2,7 +2,7 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from karapace import config
+from karapace.container import KarapaceContainer
 from karapace.protobuf.io import calculate_class_name
 from karapace.protobuf.kotlin_wrapper import trim_margin
 
@@ -14,7 +14,7 @@
 log = logging.getLogger(__name__)
 
 
-def test_protoc() -> None:
+def test_protoc(karapace_container: KarapaceContainer) -> None:
     proto: str = """
     |syntax = "proto3";
     |package com.instaclustr.protobuf;
@@ -28,7 +28,7 @@ def test_protoc() -> None:
     """
     proto = trim_margin(proto)
 
-    directory = config.DEFAULTS["protobuf_runtime_directory"]
+    directory = karapace_container.config().protobuf_runtime_directory
     proto_name = calculate_class_name(str(proto))
     proto_path = f"{directory}/{proto_name}.proto"
     class_path = f"{directory}/{proto_name}_pb2.py"
diff --git a/tests/unit/test_authentication.py b/tests/unit/test_authentication.py
index 40abc5c01..9834865fb 100644
--- a/tests/unit/test_authentication.py
+++ b/tests/unit/test_authentication.py
@@ -4,8 +4,9 @@
 """
 from __future__ import annotations
 
+from collections.abc import Mapping
 from http import HTTPStatus
-from karapace.config import ConfigDefaults, set_config_defaults
+from karapace.container import KarapaceContainer
 from karapace.kafka_rest_apis.authentication import (
     get_auth_config_from_header,
     get_expiration_time_from_header,
@@ -13,6 +14,7 @@
     SimpleOauthTokenProvider,
 )
 from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
+from typing import Any
 
 import base64
 import datetime
@@ -31,11 +33,11 @@ def _assert_unauthorized_http_response(http_response: HTTPResponse) -> None:
     "auth_header",
     (None, "Digest foo=bar"),
 )
-def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(auth_header: str | None) -> None:
-    config = set_config_defaults({})
-
+def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(
+    karapace_container: KarapaceContainer, auth_header: str | None
+) -> None:
     with pytest.raises(HTTPResponse) as exc_info:
-        get_auth_config_from_header(auth_header, config)
+        get_auth_config_from_header(auth_header, karapace_container.config())
 
     _assert_unauthorized_http_response(exc_info.value)
@@ -66,9 +68,12 @@ def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(auth_
     ),
 )
 def test_get_auth_config_from_header(
-    auth_header: str, config_override: ConfigDefaults, expected_auth_config: ConfigDefaults
+    karapace_container: KarapaceContainer,
+    auth_header: str,
+    config_override: Mapping[str, Any],
+    expected_auth_config: Mapping[str, Any],
 ) -> None:
-    config = set_config_defaults(config_override)
+    config = karapace_container.config().set_config_defaults(new_config=config_override)
 
     auth_config = get_auth_config_from_header(auth_header, config)
     assert auth_config == expected_auth_config
@@ -109,9 +114,11 @@ def test_simple_oauth_token_provider_returns_configured_token_and_expiry() -> No
     assert token_provider.token_with_expiry() == (token, expiry_timestamp)
 
 
-def test_get_client_auth_parameters_from_config_sasl_plain() -> None:
-    config = set_config_defaults(
-        {"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}
+def test_get_client_auth_parameters_from_config_sasl_plain(
+    karapace_container: KarapaceContainer,
+) -> None:
+    config = karapace_container.config().set_config_defaults(
+        new_config={"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"},
     )
 
     client_auth_params = get_kafka_client_auth_parameters_from_config(config)
@@ -123,10 +130,14 @@ def test_get_client_auth_parameters_from_config_sasl_plain() -> None:
     }
 
 
-def test_get_client_auth_parameters_from_config_oauth() -> None:
+def test_get_client_auth_parameters_from_config_oauth(
+    karapace_container: KarapaceContainer,
+) -> None:
     expiry_timestamp = 1697013997
     token = jwt.encode({"exp": expiry_timestamp}, "secret")
-    config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token})
+    config = karapace_container.config().set_config_defaults(
+        new_config={"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token}
+    )
 
     client_auth_params = get_kafka_client_auth_parameters_from_config(config)
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index b8475e1c6..79ce7da78 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -4,55 +4,55 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from karapace.config import set_config_defaults
 from karapace.constants import DEFAULT_AIOHTTP_CLIENT_MAX_SIZE, DEFAULT_PRODUCER_MAX_REQUEST
+from karapace.container import KarapaceContainer
 
 
-def test_http_request_max_size() -> None:
-    config = set_config_defaults(
+def test_http_request_max_size(karapace_container: KarapaceContainer) -> None:
+    config = karapace_container.config().set_config_defaults(
         {
             "karapace_rest": False,
             "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024,
         }
     )
-    assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
+    assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
 
-    config = set_config_defaults(
+    config = karapace_container.config().set_config_defaults(
         {
             "karapace_rest": False,
             "http_request_max_size": 1024,
         }
     )
-    assert config["http_request_max_size"] == 1024
+    assert config.http_request_max_size == 1024
 
-    config = set_config_defaults(
+    config = karapace_container.config().set_config_defaults(
        {
             "karapace_rest": True,
         }
     )
-    assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
+    assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
 
-    config = set_config_defaults(
+    config = karapace_container.config().set_config_defaults(
         {
             "karapace_rest": True,
             "producer_max_request_size": 1024,
         }
     )
-    assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
+    assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
 
-    config = set_config_defaults(
+    config = karapace_container.config().set_config_defaults(
         {
             "karapace_rest": True,
             "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024,
         }
     )
-    assert config["http_request_max_size"] == DEFAULT_PRODUCER_MAX_REQUEST + 1024 + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
+    assert config.http_request_max_size == DEFAULT_PRODUCER_MAX_REQUEST + 1024 + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE
 
-    config = set_config_defaults(
+    config = karapace_container.config().set_config_defaults(
         {
             "karapace_rest": True,
             "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024,
             "http_request_max_size": 1024,
         }
     )
-    assert config["http_request_max_size"] == 1024
+    assert config.http_request_max_size == 1024
diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py
index a3720940d..2a0156567 100644
--- a/tests/unit/test_in_memory_database.py
+++ b/tests/unit/test_in_memory_database.py
@@ -7,8 +7,8 @@
 from collections import defaultdict
 from collections.abc import Iterable, Sequence
 from confluent_kafka.cimpl import KafkaError
-from karapace.config import DEFAULTS
 from karapace.constants import DEFAULT_SCHEMA_TOPIC
+from karapace.container import KarapaceContainer
 from karapace.in_memory_database import InMemoryDatabase, KarapaceDatabase, Subject, SubjectData
 from karapace.kafka.types import Timestamp
 from karapace.key_format import KeyFormatter
@@ -214,7 +214,7 @@ def compute_schema_id_to_subjects(
     return schema_id_to_duplicated_subjects
 
 
-def test_can_ingest_schemas_from_log() -> None:
+def test_can_ingest_schemas_from_log(karapace_container: KarapaceContainer) -> None:
     """
     Test for the consistency of a backup, this checks that each SchemaID its unique in the backup.
     The format of the log its the one obtained by running:
@@ -228,7 +228,7 @@ def test_can_ingest_schemas_from_log() -> None:
     database = WrappedInMemoryDatabase()
 
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=OffsetWatcher(),
         key_formatter=KeyFormatter(),
         master_coordinator=None,
diff --git a/tests/unit/test_kafka_error_handler.py b/tests/unit/test_kafka_error_handler.py
index 45e9fea1b..183205137 100644
--- a/tests/unit/test_kafka_error_handler.py
+++ b/tests/unit/test_kafka_error_handler.py
@@ -3,6 +3,7 @@
 See LICENSE for details
 """
 from _pytest.logging import LogCaptureFixture
+from karapace.container import KarapaceContainer
 from karapace.errors import CorruptKafkaRecordException
 from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation
 
@@ -12,11 +13,13 @@
 
 
 @pytest.fixture(name="kafka_error_handler")
-def fixture_kafka_error_handler() -> KafkaErrorHandler:
-    config = {
-        "kafka_schema_reader_strict_mode": False,
-        "kafka_retriable_errors_silenced": True,
-    }
+def fixture_kafka_error_handler(karapace_container: KarapaceContainer) -> KafkaErrorHandler:
+    config = karapace_container.config().set_config_defaults(
+        {
+            "kafka_schema_reader_strict_mode": False,
+            "kafka_retriable_errors_silenced": True,
+        }
+    )
     return KafkaErrorHandler(config=config)
diff --git a/tests/unit/test_protobuf_serialization.py b/tests/unit/test_protobuf_serialization.py
index ee2586d63..1cb013538 100644
--- a/tests/unit/test_protobuf_serialization.py
+++ b/tests/unit/test_protobuf_serialization.py
@@ -2,7 +2,7 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from karapace.config import read_config
+from karapace.container import KarapaceContainer
 from karapace.dependency import Dependency
 from karapace.protobuf.kotlin_wrapper import trim_margin
 from karapace.schema_models import ParsedTypedSchema, SchemaType, Versioner
@@ -11,11 +11,11 @@
     InvalidMessageHeader,
     InvalidMessageSchema,
     InvalidPayload,
+    SchemaRegistryClient,
     SchemaRegistrySerializer,
     START_BYTE,
 )
 from karapace.typing import Subject
-from pathlib import Path
 from tests.utils import schema_protobuf, test_fail_objects_protobuf, test_objects_protobuf
 from unittest.mock import call, Mock
 
@@ -27,16 +27,16 @@
 log = logging.getLogger(__name__)
 
 
-async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
-    with open(config_path, encoding="utf8") as handler:
-        config = read_config(handler)
-    serializer = SchemaRegistrySerializer(config=config)
+async def make_ser_deser(
+    karapace_container: KarapaceContainer, mock_client: SchemaRegistryClient
+) -> SchemaRegistrySerializer:
+    serializer = SchemaRegistrySerializer(config=karapace_container.config())
     await serializer.registry_client.close()
     serializer.registry_client = mock_client
     return serializer
 
 
-async def test_happy_flow(default_config_path: Path):
+async def test_happy_flow(karapace_container: KarapaceContainer):
     mock_protobuf_registry_client = Mock()
     schema_for_id_one_future = asyncio.Future()
     schema_for_id_one_future.set_result(
@@ -49,7 +49,7 @@ async def test_happy_flow(default_config_path: Path):
     )
     mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future
 
-    serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     assert len(serializer.ids_to_schemas) == 0
     schema = await serializer.get_schema_for_subject("top")
     for o in test_objects_protobuf:
@@ -62,7 +62,7 @@ async def test_happy_flow(default_config_path: Path):
     assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)]
 
 
-async def test_happy_flow_references(default_config_path: Path):
+async def test_happy_flow_references(karapace_container: KarapaceContainer):
     no_ref_schema_str = """
     |syntax = "proto3";
     |
@@ -117,7 +117,7 @@ async def test_happy_flow_references(default_config_path: Path):
     get_latest_schema_future.set_result((1, ref_schema, Versioner.V(1)))
     mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future
 
-    serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     assert len(serializer.ids_to_schemas) == 0
     schema = await serializer.get_schema_for_subject("top")
     for o in test_objects:
@@ -130,7 +130,7 @@ async def test_happy_flow_references(default_config_path: Path):
     assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)]
 
 
-async def test_happy_flow_references_two(default_config_path: Path):
+async def test_happy_flow_references_two(karapace_container: KarapaceContainer):
     no_ref_schema_str = """
     |syntax = "proto3";
     |
@@ -204,7 +204,7 @@ async def test_happy_flow_references_two(default_config_path: Path):
     get_latest_schema_future.set_result((1, ref_schema_two, Versioner.V(1)))
     mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future
 
-    serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     assert len(serializer.ids_to_schemas) == 0
     schema = await serializer.get_schema_for_subject("top")
     for o in test_objects:
@@ -217,7 +217,7 @@ async def test_happy_flow_references_two(default_config_path: Path):
     assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)]
 
 
-async def test_serialization_fails(default_config_path: Path):
+async def test_serialization_fails(karapace_container: KarapaceContainer):
     mock_protobuf_registry_client = Mock()
     get_latest_schema_future = asyncio.Future()
     get_latest_schema_future.set_result(
@@ -225,7 +225,7 @@ async def test_serialization_fails(default_config_path: Path):
     )
     mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future
 
-    serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     with pytest.raises(InvalidMessageSchema):
         schema = await serializer.get_schema_for_subject("top")
         await serializer.serialize(schema, test_fail_objects_protobuf[0])
@@ -240,10 +240,10 @@ async def test_serialization_fails(default_config_path: Path):
     assert mock_protobuf_registry_client.method_calls == [call.get_schema("top")]
 
 
-async def test_deserialization_fails(default_config_path: Path):
+async def test_deserialization_fails(karapace_container: KarapaceContainer):
     mock_protobuf_registry_client = Mock()
 
-    deserializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    deserializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     invalid_header_payload = struct.pack(">bII", 1, 500, 500)
     with pytest.raises(InvalidMessageHeader):
         await deserializer.deserialize(invalid_header_payload)
@@ -259,10 +259,10 @@ async def test_deserialization_fails(default_config_path: Path):
     assert mock_protobuf_registry_client.method_calls == [call.get_schema_for_id(500)]
 
 
-async def test_deserialization_fails2(default_config_path: Path):
+async def test_deserialization_fails2(karapace_container: KarapaceContainer):
     mock_protobuf_registry_client = Mock()
 
-    deserializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client)
+    deserializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client)
     invalid_header_payload = struct.pack(">bII", 1, 500, 500)
     with pytest.raises(InvalidMessageHeader):
         await deserializer.deserialize(invalid_header_payload)
diff --git a/tests/unit/test_rapu.py b/tests/unit/test_rapu.py
index cde68e2be..ba5c77e8c 100644
--- a/tests/unit/test_rapu.py
+++ b/tests/unit/test_rapu.py
@@ -5,7 +5,7 @@
 from _pytest.logging import LogCaptureFixture
 from aiohttp.client_exceptions import ClientConnectionError
 from aiohttp.web import Request
-from karapace.config import DEFAULTS
+from karapace.container import KarapaceContainer
 from karapace.karapace import KarapaceBase
 from karapace.rapu import HTTPRequest, REST_ACCEPT_RE, REST_CONTENT_TYPE_RE
 from karapace.statsd import StatsClient
@@ -167,12 +167,14 @@ def test_content_type_re():
 
 
 @pytest.mark.parametrize("connection_error", (ConnectionError(), ClientConnectionError()))
-async def test_raise_connection_error_handling(connection_error: BaseException) -> None:
+async def test_raise_connection_error_handling(
+    karapace_container: KarapaceContainer, connection_error: BaseException
+) -> None:
     request_mock = Mock(spec=Request)
     request_mock.read.side_effect = connection_error
     callback_mock = Mock()
 
-    app = KarapaceBase(config=DEFAULTS)
+    app = KarapaceBase(config=karapace_container.config())
 
     response = await app._handle_request(  # pylint: disable=protected-access
         request=request_mock,
@@ -185,8 +187,8 @@ async def test_raise_connection_error_handling(connection_error: BaseException)
     callback_mock.assert_not_called()
 
 
-async def test_close_by_app(caplog: LogCaptureFixture) -> None:
-    app = KarapaceBase(config=DEFAULTS)
+async def test_close_by_app(caplog: LogCaptureFixture, karapace_container: KarapaceContainer) -> None:
+    app = KarapaceBase(config=karapace_container.config())
     app.stats = Mock(spec=StatsClient)
 
     with caplog.at_level(logging.WARNING, logger="karapace.rapu"):
diff --git a/tests/unit/test_rest_auth.py b/tests/unit/test_rest_auth.py
index 86bb14b8a..ad2d54057 100644
--- a/tests/unit/test_rest_auth.py
+++ b/tests/unit/test_rest_auth.py
@@ -5,7 +5,7 @@
 """
 from __future__ import annotations
 
-from karapace.config import set_config_defaults
+from karapace.container import KarapaceContainer
 from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy
 from unittest.mock import call, Mock
 
@@ -34,8 +34,8 @@ def _create_mock_proxy(
     return proxy
 
 
-async def test_rest_proxy_janitor_expiring_credentials() -> None:
-    config = set_config_defaults(
+async def test_rest_proxy_janitor_expiring_credentials(karapace_container: KarapaceContainer) -> None:
+    config = karapace_container.config().set_config_defaults(
         {
             "rest_authorization": True,
             "sasl_bootstrap_uri": "localhost:9094",
@@ -92,8 +92,8 @@ async def test_rest_proxy_janitor_expiring_credentials() -> None:
     assert unused_proxy_expiring_later_than_tolerance.method_calls == [call.num_consumers(), call.aclose()]
 
 
-async def test_rest_proxy_janitor_default() -> None:
-    config = set_config_defaults(
+async def test_rest_proxy_janitor_default(karapace_container: KarapaceContainer) -> None:
+    config = karapace_container.config().set_config_defaults(
         {
             "rest_authorization": True,
             "sasl_bootstrap_uri": "localhost:9094",
@@ -148,8 +148,8 @@ async def test_rest_proxy_janitor_default() -> None:
     assert active_proxy_with_consumers.method_calls == [call.num_consumers()]
 
 
-async def test_rest_proxy_janitor_destructive() -> None:
-    config = set_config_defaults(
+async def test_rest_proxy_janitor_destructive(karapace_container: KarapaceContainer) -> None:
+    config = karapace_container.config().set_config_defaults(
         {
             "rest_authorization": True,
             "sasl_bootstrap_uri": "localhost:9094",
diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py
index d500c5b03..1134b6ae8 100644
--- a/tests/unit/test_schema_reader.py
+++ b/tests/unit/test_schema_reader.py
@@ -9,7 +9,7 @@
 from concurrent.futures import Future, ThreadPoolExecutor
 from confluent_kafka import Message
 from dataclasses import dataclass
-from karapace.config import DEFAULTS
+from karapace.container import KarapaceContainer
 from karapace.errors import CorruptKafkaRecordException, ShutdownException
 from karapace.in_memory_database import InMemoryDatabase
 from karapace.kafka.consumer import KafkaConsumer
@@ -154,7 +154,7 @@ class ReadinessTestCase(BaseTestCase):
         ),
     ],
 )
-def test_readiness_check(testcase: ReadinessTestCase) -> None:
+def test_readiness_check(testcase: ReadinessTestCase, karapace_container: KarapaceContainer) -> None:
     key_formatter_mock = Mock()
     consumer_mock = Mock()
     consumer_mock.consume.return_value = []
@@ -163,7 +163,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:
     offset_watcher = OffsetWatcher()
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=offset_watcher,
         key_formatter=key_formatter_mock,
         master_coordinator=None,
@@ -176,7 +176,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:
     assert schema_reader.ready() is testcase.expected
 
 
-def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
+def test_num_max_messages_to_consume_moved_to_one_after_ready(karapace_container: KarapaceContainer) -> None:
     key_formatter_mock = Mock()
     consumer_mock = Mock()
     consumer_mock.consume.return_value = []
@@ -185,7 +185,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
     offset_watcher = OffsetWatcher()
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=offset_watcher,
         key_formatter=key_formatter_mock,
         master_coordinator=None,
@@ -200,7 +200,9 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
     assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
 
 
-def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None:
+def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic(
+    karapace_container: KarapaceContainer,
+) -> None:
     key_formatter_mock = Mock(spec=KeyFormatter)
     consumer_mock = Mock(spec=KafkaConsumer)
 
@@ -230,7 +232,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
     offset_watcher = OffsetWatcher()
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=offset_watcher,
         key_formatter=key_formatter_mock,
         master_coordinator=None,
@@ -255,7 +257,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche
     assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
 
 
-def test_soft_deleted_schema_storing() -> None:
+def test_soft_deleted_schema_storing(karapace_container: KarapaceContainer) -> None:
     """This tests a case when _schemas has been compacted and only the soft deleted version of the schema is
     present.
     """
@@ -287,7 +289,7 @@ def test_soft_deleted_schema_storing() -> None:
     offset_watcher = OffsetWatcher()
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=offset_watcher,
         key_formatter=key_formatter_mock,
         master_coordinator=None,
@@ -302,14 +304,14 @@ def test_soft_deleted_schema_storing() -> None:
     assert soft_deleted_stored_schema is not None
 
 
-def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None:
+def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_container: KarapaceContainer) -> None:
     database_mock = Mock(spec=InMemoryDatabase)
     database_mock.find_subject.return_value = True
     database_mock.find_subject_schemas.return_value = {
         Version(1): "SchemaVersion"
     }  # `SchemaVersion` is an actual object, simplified for test
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=OffsetWatcher(),
         key_formatter=KeyFormatter(),
         master_coordinator=None,
@@ -376,7 +378,9 @@ class HealthCheckTestCase(BaseTestCase):
         ),
     ],
 )
-async def test_schema_reader_health_check(testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch) -> None:
+async def test_schema_reader_health_check(
+    testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch, karapace_container: KarapaceContainer
+) -> None:
     offset_watcher = OffsetWatcher()
     key_formatter_mock = Mock()
     admin_client_mock = Mock()
@@ -386,10 +390,10 @@ async def test_schema_reader_health_check(testcase: HealthCheckTestCase, monkeyp
         emtpy_future.set_exception(testcase.check_topic_error)
     else:
         emtpy_future.set_result(None)
-    admin_client_mock.describe_topics.return_value = {DEFAULTS["topic_name"]: emtpy_future}
+    admin_client_mock.describe_topics.return_value = {karapace_container.config().topic_name: emtpy_future}
 
     schema_reader = KafkaSchemaReader(
-        config=DEFAULTS,
+        config=karapace_container.config(),
         offset_watcher=offset_watcher,
         key_formatter=key_formatter_mock,
         master_coordinator=None,
@@ -415,7 +419,9 @@ class KafkaMessageHandlingErrorTestCase(BaseTestCase):
 
 
 @pytest.fixture(name="schema_reader_with_consumer_messages_factory")
-def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[tuple[list[Message]]], KafkaSchemaReader]:
+def fixture_schema_reader_with_consumer_messages_factory(
+    karapace_container: KarapaceContainer,
+) -> Callable[[tuple[list[Message]]], KafkaSchemaReader]:
     def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader:
         key_formatter_mock = Mock(spec=KeyFormatter)
         consumer_mock = Mock(spec=KafkaConsumer)
@@ -425,8 +431,7 @@ def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader:
         consumer_mock.get_watermark_offsets.return_value = (0, 4)
 
         # Update the config to run the schema reader in strict mode so errors can be raised
-        config = DEFAULTS.copy()
-        config["kafka_schema_reader_strict_mode"] = True
+        config = karapace_container.config().set_config_defaults({"kafka_schema_reader_strict_mode": True})
 
         offset_watcher = OffsetWatcher()
         schema_reader = KafkaSchemaReader(
diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py
index b4d87f35b..f21f47097 100644
--- a/tests/unit/test_schema_registry_api.py
+++ b/tests/unit/test_schema_registry_api.py
@@ -2,64 +2,79 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from aiohttp.test_utils import TestClient, TestServer
-from karapace.config import DEFAULTS, set_config_defaults
+from fastapi.exceptions import HTTPException
 from karapace.rapu import HTTPResponse
+from karapace.schema_models import SchemaType, ValidatedTypedSchema
 from karapace.schema_reader import KafkaSchemaReader
-from karapace.schema_registry import KarapaceSchemaRegistry
-from karapace.schema_registry_apis import KarapaceSchemaRegistryController
-from unittest.mock import ANY, AsyncMock, Mock, patch, PropertyMock
+from schema_registry.container import SchemaRegistryContainer
+from unittest.mock import Mock, patch, PropertyMock
 
 import asyncio
+import json
 import pytest
 
+TYPED_AVRO_SCHEMA = ValidatedTypedSchema.parse(
+    SchemaType.AVRO,
+    json.dumps(
+        {
+            "namespace": "io.aiven.data",
+            "name": "Test",
+            "type": "record",
+            "fields": [
+                {
+                    "name": "attr1",
+                    "type": ["null", "string"],
+                }
+            ],
+        }
+    ),
+)
 
-async def test_validate_schema_request_body() -> None:
-    controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS))
 
-    controller._validate_schema_request_body(  # pylint: disable=W0212
-        "application/json", {"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}
+async def test_validate_schema_request_body(schema_registry_container: SchemaRegistryContainer) -> None:
+    schema_registry_container.schema_registry_controller()._validate_schema_type(  # pylint: disable=W0212
+        {"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}
     )
 
-    with pytest.raises(HTTPResponse) as exc_info:
-        controller._validate_schema_request_body(  # pylint: disable=W0212
-            "application/json",
-            {"schema": "{}", "schemaType": "JSON", "references": [], "unexpected_field_name": {}, "ruleSet": {}},
+    with pytest.raises(HTTPException) as exc_info:
+        schema_registry_container.schema_registry_controller()._validate_schema_type(  # pylint: disable=W0212
+            {"schema": "{}", "schemaType": "DOES_NOT_EXIST", "references": [], "unexpected_field_name": {}, "ruleSet": {}},
         )
 
-    assert exc_info.type is HTTPResponse
-    assert str(exc_info.value) == "HTTPResponse 422"
+    assert exc_info.type is HTTPException
+    assert str(exc_info.value) == "422: {'error_code': 422, 'message': 'Invalid schemaType DOES_NOT_EXIST'}"
 
 
-async def test_forward_when_not_ready() -> None:
-    with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class:
+async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryContainer) -> None:
+    with patch("karapace.container.KarapaceSchemaRegistry") as schema_registry_class:
         schema_reader_mock = Mock(spec=KafkaSchemaReader)
-        ready_property_mock = PropertyMock(return_value=lambda: False)
-        schema_registry = AsyncMock(spec=KarapaceSchemaRegistry)
+        ready_property_mock = PropertyMock(return_value=False)
         type(schema_reader_mock).ready = ready_property_mock
-        schema_registry.schema_reader = schema_reader_mock
-        schema_registry_class.return_value = schema_registry
+        schema_registry_class.schema_reader = schema_reader_mock
 
-        schema_registry.get_master.return_value = (False, "http://primary-url")
+        schema_registry_class.schemas_get.return_value = TYPED_AVRO_SCHEMA
+        schema_registry_class.get_master.return_value = (False, "http://primary-url")
 
         close_future_result = asyncio.Future()
         close_future_result.set_result(True)
         close_func = Mock()
         close_func.return_value = close_future_result
-        schema_registry.close = close_func
+        schema_registry_class.close = close_func
+
+        schema_registry_container.karapace_container().schema_registry = schema_registry_class
+        controller = schema_registry_container.schema_registry_controller()
+        controller.schema_registry = schema_registry_class
 
-        controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS))
         mock_forward_func_future = asyncio.Future()
         mock_forward_func_future.set_exception(HTTPResponse({"mock": "response"}))
         mock_forward_func = Mock()
         mock_forward_func.return_value = mock_forward_func_future
         controller._forward_request_remote = mock_forward_func  # pylint: disable=protected-access
 
-        test_server = TestServer(controller.app)
-        async with TestClient(test_server) as client:
-            await client.get("/schemas/ids/1", headers={"Content-Type": "application/json"})
-
-        ready_property_mock.assert_called_once()
-        schema_registry.get_master.assert_called_once()
-        mock_forward_func.assert_called_once_with(
-            request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET"
-        )
+        assert await controller.schemas_get(
+            schema_id=1,
+            include_subjects=False,
+            fetch_max_id=False,
+            format_serialized="",
+            user=None,
+            authorizer=None,
+        )
diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py
index a21d3bc00..041df44ab 100644
--- a/tests/unit/test_serialization.py
+++ b/tests/unit/test_serialization.py
@@ -2,8 +2,7 @@
 Copyright (c) 2023 Aiven Ltd
 See LICENSE for details
 """
-from karapace.client import Path
-from karapace.config import DEFAULTS, read_config
+from karapace.container import KarapaceContainer
 from karapace.schema_models import SchemaType, ValidatedTypedSchema, Versioner
 from karapace.serialization import (
     flatten_unions,
@@ -12,6 +11,7 @@
     InvalidMessageHeader,
     InvalidMessageSchema,
     InvalidPayload,
+    SchemaRegistryClient,
     SchemaRegistrySerializer,
     START_BYTE,
     write_value,
@@ -109,16 +109,16 @@
 )
 
 
-async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
-    with open(config_path, encoding="utf8") as handler:
-        config = read_config(handler)
-    serializer = SchemaRegistrySerializer(config=config)
+async def make_ser_deser(
+    karapace_container: KarapaceContainer, mock_client: SchemaRegistryClient
+) -> SchemaRegistrySerializer:
+    serializer = SchemaRegistrySerializer(config=karapace_container.config())
     await serializer.registry_client.close()
     serializer.registry_client = mock_client
     return serializer
 
 
-async def test_happy_flow(default_config_path: Path):
+async def test_happy_flow(karapace_container: KarapaceContainer):
     mock_registry_client = Mock()
     get_latest_schema_future = asyncio.Future()
     get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1)))
@@ -127,7 +127,7 @@
     schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")]))
     mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future
 
-    serializer = await make_ser_deser(default_config_path, mock_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_registry_client)
     assert len(serializer.ids_to_schemas) == 0
     schema = await serializer.get_schema_for_subject(Subject("top"))
     for o in test_objects_avro:
@@ -213,7 +213,7 @@ def test_flatten_unions_map() -> None:
     assert flatten_unions(typed_schema.schema, record) == flatten_record
 
 
-def test_avro_json_write_invalid() -> None:
+def test_avro_json_write_invalid(karapace_container: KarapaceContainer) -> None:
     schema = {
         "namespace": "io.aiven.data",
         "name": "Test",
@@ -236,10 +236,10 @@ def test_avro_json_write_invalid() -> None:
 
     for record in records:
         with pytest.raises(avro.errors.AvroTypeException):
-            write_value(DEFAULTS, typed_schema, bio, record)
+            write_value(karapace_container.config(), typed_schema, bio, record)
 
 
-def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> None:
+def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions(karapace_container: KarapaceContainer) -> None:
     """Backwards compatibility test for Avro data using JSON encoding.
 
     The initial behavior of the API was incorrect, and it accept data with
@@ -299,24 +299,24 @@ def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> No
     buffer_a = io.BytesIO()
     buffer_b = io.BytesIO()
-    write_value(DEFAULTS, typed_schema, buffer_a, properly_tagged_encoding_a)
-    write_value(DEFAULTS, typed_schema, buffer_b, missing_tag_encoding_a)
+    write_value(karapace_container.config(), typed_schema, buffer_a, properly_tagged_encoding_a)
+    write_value(karapace_container.config(), typed_schema, buffer_b, missing_tag_encoding_a)
     assert buffer_a.getbuffer() == buffer_b.getbuffer()
 
     buffer_a = io.BytesIO()
     buffer_b = io.BytesIO()
-    write_value(DEFAULTS, typed_schema, buffer_a, properly_tagged_encoding_b)
-    write_value(DEFAULTS, typed_schema, buffer_b, missing_tag_encoding_b)
+    write_value(karapace_container.config(), typed_schema, buffer_a, properly_tagged_encoding_b)
+    write_value(karapace_container.config(), typed_schema, buffer_b, missing_tag_encoding_b)
     assert buffer_a.getbuffer() == buffer_b.getbuffer()
 
 
-async def test_serialization_fails(default_config_path: Path):
+async def test_serialization_fails(karapace_container: KarapaceContainer):
     mock_registry_client = Mock()
     get_latest_schema_future = asyncio.Future()
     get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1)))
     mock_registry_client.get_schema.return_value = get_latest_schema_future
 
-    serializer = await make_ser_deser(default_config_path, mock_registry_client)
+    serializer = await make_ser_deser(karapace_container, mock_registry_client)
     with pytest.raises(InvalidMessageSchema):
         schema = await serializer.get_schema_for_subject(Subject("topic"))
         await serializer.serialize(schema, {"foo": "bar"})
@@ -324,13 +324,13 @@ async def test_serialization_fails(default_config_path: Path):
     assert mock_registry_client.method_calls == [call.get_schema("topic")]
 
 
-async def test_deserialization_fails(default_config_path: Path):
+async def test_deserialization_fails(karapace_container: KarapaceContainer):
     mock_registry_client = Mock()
     schema_for_id_one_future = asyncio.Future()
     schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")]))
     mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future
 
-    deserializer = await make_ser_deser(default_config_path, mock_registry_client)
+    deserializer = await make_ser_deser(karapace_container, mock_registry_client)
     invalid_header_payload = struct.pack(">bII", 1, 500, 500)
     with pytest.raises(InvalidMessageHeader):
         await deserializer.deserialize(invalid_header_payload)