diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py
index 8d0e9b738..25f88d1ed 100644
--- a/src/karapace/coordinator/master_coordinator.py
+++ b/src/karapace/coordinator/master_coordinator.py
@@ -58,7 +58,7 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
     def config(self) -> Config:
         return self._config

-    async def start(self) -> None:
+    def start(self) -> None:
         self._thread.start()

     def _start_loop(self) -> None:
@@ -90,10 +90,13 @@ async def _async_loop(self) -> None:
         self._sc = self.init_schema_coordinator()
         while self._running:
-            if self._sc.ready():
-                break
+            # Why do we need to close here at all?
+            # We just need to keep running even when the schema registry is ready,
+            # otherwise we would cause a rebalance and a new election.
+            # This loop should run until karapace is restarted.
+            # if self._sc.ready():
+            #     break
             await asyncio.sleep(0.5)
-            # todo: wait a condition variable or a lock.

         LOG.info("Closing master_coordinator")
         if self._sc:
             await self._sc.close()
@@ -171,4 +174,3 @@ def get_master_info(self) -> tuple[bool | None, str | None]:

     async def close(self) -> None:
         self._running = False
-        # todo set the condition variable or lock.
diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py
index 7fd662590..058318578 100644
--- a/src/karapace/coordinator/schema_coordinator.py
+++ b/src/karapace/coordinator/schema_coordinator.py
@@ -209,8 +209,7 @@ def are_we_master(self) -> bool | None:
         """
         if self._are_we_master is None:
             # `self._are_we_master` is `None` only during the perform of the assignment
-            # where we don't know if we are master yet (probably we should return false due to the
-            # new logic of waiting for a while before enabling the writes), todo: check me later.
+            # where we don't know if we are master yet
             LOG.warning("No new elections performed yet.")
             return None

@@ -529,6 +528,7 @@ def reset_generation(self) -> None:
         """Coordinator did not recognize either generation or member_id. Will need to
         re-join the group.
         """
+        LOG.info("Resetting generation status")
         self._are_we_master = False
         self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
         self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py
index 614b4a6ad..5c553b0db 100644
--- a/src/karapace/schema_registry.py
+++ b/src/karapace/schema_registry.py
@@ -58,12 +58,6 @@ def __init__(self, config: Config) -> None:
             master_coordinator=self.mc,
             database=self.database,
         )
-        # very ugly, left as a placeholder, since we have a bidirectional
-        # dependency it means that the two objects needs to be one (aka the
-        # mc should create the KafkaSchemaReader and inject the stopper inside
-        # the schema_coordinator. Left as it is to reason together to the implementation
-        # since semantically it's the same, after we agree on the solution proceeding with
-        # the refactor)
         self.mc.set_stoppper(self.schema_reader)

         self.schema_lock = asyncio.Lock()
@@ -81,7 +75,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis
         return list(schema_versions.values())

     async def start(self) -> None:
-        await self.mc.start()
+        self.mc.start()
         self.schema_reader.start()
         self.producer.initialize_karapace_producer()

diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py
index 38aad1d82..b68be929e 100644
--- a/src/karapace/schema_registry_apis.py
+++ b/src/karapace/schema_registry_apis.py
@@ -84,7 +84,6 @@ class SchemaErrorMessages(Enum):

 class KarapaceSchemaRegistryController(KarapaceBase):
     def __init__(self, config: Config) -> None:
-        # the `not_ready_handler` its wrong, its not expecting an async method the receiver.
         super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve)

         self._auth: HTTPAuthorizer | None = None
@@ -104,7 +103,7 @@ async def schema_registry_health(self) -> HealthCheck:
         resp = {}
         if self._auth is not None:
             resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
-        resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready
+        resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready()
         if self.schema_registry.schema_reader.ready():
             resp["schema_registry_startup_time_sec"] = (
                 self.schema_registry.schema_reader.last_check - self._process_start_time
@@ -686,6 +685,7 @@ async def config_subject_delete(

     async def master_available(self, content_type: str, *, request: HTTPRequest) -> None:
         are_we_master, master_url = await self.schema_registry.get_master()
+        self.log.info("are master %s, master url %s", are_we_master, master_url)
         if (
             self.schema_registry.schema_reader.master_coordinator._sc is not None  # pylint: disable=protected-access
@@ -693,10 +693,11 @@ async def master_available(self, content_type: str, *, request: HTTPRequest) ->
         ):
             self.r({"master_available": are_we_master}, content_type)

-        if master_url is None:
+        if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
             self.r({"master_available": False}, content_type)
         else:
             url = f"{master_url}/master_available"
+            await self._forward_if_not_ready_to_serve
             await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

     async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 89b5a74f5..583008b27 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -283,6 +283,7 @@ async def fixture_rest_async(
             "bootstrap_uri": kafka_servers.bootstrap_servers,
             # Use non-default max request size for REST producer.
             "producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
+            "waiting_time_before_acting_as_master_ms": 300,
         }
     )
     write_config(config_path, config)
@@ -357,6 +358,7 @@ async def fixture_rest_async_novalidation(
             # Use non-default max request size for REST producer.
"producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES, "name_strategy_validation": False, # This should be only difference from rest_async + "waiting_time_before_acting_as_master_ms": 300, } ) write_config(config_path, config) @@ -430,6 +432,7 @@ async def fixture_rest_async_registry_auth( "registry_port": registry.port, "registry_user": "admin", "registry_password": "admin", + "waiting_time_before_acting_as_master_ms": 300, } ) rest = KafkaRest(config=config) diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 62cc4392d..c4c8c95a8 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -28,7 +28,7 @@ def set_not_ready(self) -> None: async def init_admin(config): mc = MasterCoordinator(config=config) mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) - await mc.start() + mc.start() return mc diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 15c432d10..485e962dd 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -115,7 +115,7 @@ async def test_coordinator_workflow( # Check if the initial group join is performed correctly with minimal # setup - waiting_time_before_acting_as_master_sec = 5 + waiting_time_before_acting_as_master_sec = 6 coordinator = SchemaCoordinator( client, AlwaysAvailableSchemaReaderStoppper(), @@ -128,7 +128,8 @@ async def test_coordinator_workflow( session_timeout_ms=10000, heartbeat_interval_ms=500, retry_backoff_ms=100, - waiting_time_before_acting_as_master_ms=waiting_time_before_acting_as_master_sec * 1000, + # removing 1 second to consider the network latency in the rest of the test + waiting_time_before_acting_as_master_ms=(waiting_time_before_acting_as_master_sec - 1) * 1000, ) coordinator.start() assert coordinator.coordinator_id is None @@ -137,8 +138,8 @@ async def test_coordinator_workflow( assert coordinator.coordinator_id is not None assert not coordinator.are_we_master() - # the waiting_time_before_acting_as_master_ms - await asyncio.sleep(10) + # waiting a little bit more since the cluster needs to setup. + await asyncio.sleep(2 * waiting_time_before_acting_as_master_sec) assert not coordinator.are_we_master(), "last fetch before being available as master" assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master" @@ -170,9 +171,14 @@ async def test_coordinator_workflow( secondary_client = client if primary_selection_strategy == "highest" else client2 if primary == coordinator2: - # we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, we cannot be sure. + # we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, + # a new node its elected as master. # if the coordinator its `coordinator1` since isn't changed we don't have to wait # for the `waiting_time_before_acting_as_master_sec` seconds. + + # give time to the election to be forwarded to all the coordinators. 
+        await asyncio.sleep(3)
+
         assert (
             not primary.are_we_master()
         ), "after a change in the coordinator we can act as a master until we wait for the required time"
@@ -180,6 +186,7 @@
         # after that time the primary can act as a master
         await asyncio.sleep(waiting_time_before_acting_as_master_sec)
         assert not primary.are_we_master(), "Last fetch before being available as master"
+        assert not secondary.are_we_master(), "secondary cannot be master"

     assert primary.are_we_master()
     assert not secondary.are_we_master()
diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py
index c5516951a..a1cf382b4 100644
--- a/tests/integration/test_schema_reader.py
+++ b/tests/integration/test_schema_reader.py
@@ -73,7 +73,7 @@ async def test_regression_soft_delete_schemas_should_be_registered(
     master_coordinator = MasterCoordinator(config=config)
     master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
     try:
-        await master_coordinator.start()
+        master_coordinator.start()
         database = InMemoryDatabase()
         offset_watcher = OffsetWatcher()
         schema_reader = KafkaSchemaReader(
@@ -166,7 +166,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw(
     master_coordinator = MasterCoordinator(config=config)
     master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
     try:
-        await master_coordinator.start()
+        master_coordinator.start()
         database = InMemoryDatabase()
         offset_watcher = OffsetWatcher()
         schema_reader = KafkaSchemaReader(
@@ -271,7 +271,7 @@ async def test_key_format_detection(
     master_coordinator = MasterCoordinator(config=config)
     master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
     try:
-        await master_coordinator.start()
+        master_coordinator.start()
         key_formatter = KeyFormatter()
         database = InMemoryDatabase()
         offset_watcher = OffsetWatcher()
diff --git a/tests/utils.py b/tests/utils.py
index 5c6d8507b..d5b757245 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -324,9 +324,7 @@ async def repeat_until_master_is_available(client: Client) -> None:

     while True:
         res = await client.post("/master_available", json={})
         reply = res.json()
-        breakpoint()
         if reply is not None and "master_available" in reply and reply["master_available"] is True:
-            breakpoint()
             break
         time.sleep(1)
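Note on the `master_available` change above: the new condition compares the reported master URL against this node's own `advertised_hostname`:`advertised_port` so a request is never forwarded back to the node that received it. The sketch below is not part of the patch; the helper name `should_forward_to_master` and its standalone shape are hypothetical, while in the actual handler the check lives inline in `KarapaceSchemaRegistryController.master_available`.

    def should_forward_to_master(master_url: str | None, advertised_hostname: str, advertised_port: int) -> bool:
        """Return True only when a remote master exists and it is not this node."""
        if master_url is None:
            # No master elected yet: answer master_available=False instead of forwarding.
            return False
        own_address = f"{advertised_hostname}:{advertised_port}"
        # Forwarding to our own advertised address would only bounce the request
        # back to this node, so treat "the master is us" like "no remote master".
        return own_address not in master_url

    # Example: a node advertised as karapace-1:8081 must not forward to itself.
    assert should_forward_to_master("http://karapace-1:8081", "karapace-1", 8081) is False
    assert should_forward_to_master("http://karapace-2:8081", "karapace-1", 8081) is True
    assert should_forward_to_master(None, "karapace-1", 8081) is False

The same guard is why the integration fixtures set `waiting_time_before_acting_as_master_ms` to a small value (300 ms): while the election has not settled, the node reports `master_available: False` rather than forwarding, and only acts as master after the configured waiting period.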