From d7cdd8a4253288d1780b410ba4dd9cc3bd7fee01 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Wed, 13 Nov 2024 16:50:49 +0100 Subject: [PATCH] WIP: test if this fixes all --- .../coordinator/master_coordinator.py | 10 +++++--- .../coordinator/schema_coordinator.py | 24 +++++++++++++++++++ src/karapace/schema_registry.py | 9 ++++++- src/karapace/schema_registry_apis.py | 9 +++++-- tests/integration/test_master_coordinator.py | 2 +- tests/integration/test_schema_coordinator.py | 17 +++++++++---- tests/integration/test_schema_reader.py | 6 ++--- tests/utils.py | 2 -- 8 files changed, 62 insertions(+), 17 deletions(-) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index 8d0e9b738..ed96b5cc8 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -58,7 +58,7 @@ def schema_coordinator(self) -> SchemaCoordinator | None: def config(self) -> Config: return self._config - async def start(self) -> None: + def start(self) -> None: self._thread.start() def _start_loop(self) -> None: @@ -90,8 +90,12 @@ async def _async_loop(self) -> None: self._sc = self.init_schema_coordinator() while self._running: - if self._sc.ready(): - break + # why do we need to close? + # we just need to keep running even when the schema registry its ready + # otherwise we cause a rebalance and a new election. This should run until + # karapace is restarted + # if self._sc.ready(): + # break await asyncio.sleep(0.5) # todo: wait a condition variable or a lock. LOG.info("Closing master_coordinator") diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py index 7fd662590..df5d2ff85 100644 --- a/src/karapace/coordinator/schema_coordinator.py +++ b/src/karapace/coordinator/schema_coordinator.py @@ -207,6 +207,29 @@ def are_we_master(self) -> bool | None: The condition to resume being the master its: no new messages are still to be processed + at least 5 seconds have passed since we were elected master """ + + cur_time = time.monotonic() + + LOG.warning( + """ + state of the variables: + self.hostname %s + self._ready: %s + self._schema_reader_stopper.ready(): %s + self._are_we_master: %s + self._initial_election_sec %s + self._waiting_time_before_acting_as_master_ms %s + time.monotonic() %s + """, + self.hostname, + self._ready, + self._schema_reader_stopper.ready(), + self._are_we_master, + self._initial_election_sec, + self._waiting_time_before_acting_as_master_ms, + cur_time, + ) + if self._are_we_master is None: # `self._are_we_master` is `None` only during the perform of the assignment # where we don't know if we are master yet (probably we should return false due to the @@ -529,6 +552,7 @@ def reset_generation(self) -> None: """Coordinator did not recognize either generation or member_id. Will need to re-join the group. """ + LOG.info("Resetting generation status") self._are_we_master = False self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 614b4a6ad..fd88bbd50 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -81,7 +81,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis return list(schema_versions.values()) async def start(self) -> None: - await self.mc.start() + self.mc.start() self.schema_reader.start() self.producer.initialize_karapace_producer() @@ -98,9 +98,16 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | follower/primary state and primary url. :return (bool, Optional[str]): returns the primary/follower state and primary url """ + LOG.warning("checking stuff") async with self._master_lock: + LOG.warning("aquired log") while True: + LOG.warning("calling master info") are_we_master, master_url = self.mc.get_master_info() + LOG.warning("done") + LOG.warning( + "Master %s is %s. Schema reader is ready %s", master_url, are_we_master, self.schema_reader.ready() + ) if are_we_master is None: LOG.info("No master set: %r, url: %r", are_we_master, master_url) elif not ignore_readiness and self.schema_reader.ready() is False: diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 38aad1d82..483e0df16 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -104,7 +104,7 @@ async def schema_registry_health(self) -> HealthCheck: resp = {} if self._auth is not None: resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified - resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready + resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready() if self.schema_registry.schema_reader.ready(): resp["schema_registry_startup_time_sec"] = ( self.schema_registry.schema_reader.last_check - self._process_start_time @@ -686,6 +686,7 @@ async def config_subject_delete( async def master_available(self, content_type: str, *, request: HTTPRequest) -> None: are_we_master, master_url = await self.schema_registry.get_master() + self.log.info("are master %s, master url %s", are_we_master, master_url) if ( self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access @@ -693,10 +694,14 @@ async def master_available(self, content_type: str, *, request: HTTPRequest) -> ): self.r({"master_available": are_we_master}, content_type) - if master_url is None: + if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url: self.r({"master_available": False}, content_type) else: url = f"{master_url}/master_available" + self.log.info("forwarding to %s", url) + self.log.info("Self advertising hostname: %s", self.config["advertised_hostname"]) + self.log.info("Self advertising port: %s", self.config["advertised_port"]) + await self._forward_if_not_ready_to_serve await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST") async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None: diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 62cc4392d..c4c8c95a8 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -28,7 +28,7 @@ def set_not_ready(self) -> None: async def init_admin(config): mc = MasterCoordinator(config=config) mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) - await mc.start() + mc.start() return mc diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 15c432d10..485e962dd 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -115,7 +115,7 @@ async def test_coordinator_workflow( # Check if the initial group join is performed correctly with minimal # setup - waiting_time_before_acting_as_master_sec = 5 + waiting_time_before_acting_as_master_sec = 6 coordinator = SchemaCoordinator( client, AlwaysAvailableSchemaReaderStoppper(), @@ -128,7 +128,8 @@ async def test_coordinator_workflow( session_timeout_ms=10000, heartbeat_interval_ms=500, retry_backoff_ms=100, - waiting_time_before_acting_as_master_ms=waiting_time_before_acting_as_master_sec * 1000, + # removing 1 second to consider the network latency in the rest of the test + waiting_time_before_acting_as_master_ms=(waiting_time_before_acting_as_master_sec - 1) * 1000, ) coordinator.start() assert coordinator.coordinator_id is None @@ -137,8 +138,8 @@ async def test_coordinator_workflow( assert coordinator.coordinator_id is not None assert not coordinator.are_we_master() - # the waiting_time_before_acting_as_master_ms - await asyncio.sleep(10) + # waiting a little bit more since the cluster needs to setup. + await asyncio.sleep(2 * waiting_time_before_acting_as_master_sec) assert not coordinator.are_we_master(), "last fetch before being available as master" assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master" @@ -170,9 +171,14 @@ async def test_coordinator_workflow( secondary_client = client if primary_selection_strategy == "highest" else client2 if primary == coordinator2: - # we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, we cannot be sure. + # we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, + # a new node its elected as master. # if the coordinator its `coordinator1` since isn't changed we don't have to wait # for the `waiting_time_before_acting_as_master_sec` seconds. + + # give time to the election to be forwarded to all the coordinators. + await asyncio.sleep(3) + assert ( not primary.are_we_master() ), "after a change in the coordinator we can act as a master until we wait for the required time" @@ -180,6 +186,7 @@ async def test_coordinator_workflow( # after that time the primary can act as a master await asyncio.sleep(waiting_time_before_acting_as_master_sec) assert not primary.are_we_master(), "Last fetch before being available as master" + assert not secondary.are_we_master(), "secondary cannot be master" assert primary.are_we_master() assert not secondary.are_we_master() diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index c5516951a..a1cf382b4 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -73,7 +73,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( master_coordinator = MasterCoordinator(config=config) master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() database = InMemoryDatabase() offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( @@ -166,7 +166,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw( master_coordinator = MasterCoordinator(config=config) master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() database = InMemoryDatabase() offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( @@ -271,7 +271,7 @@ async def test_key_format_detection( master_coordinator = MasterCoordinator(config=config) master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: - await master_coordinator.start() + master_coordinator.start() key_formatter = KeyFormatter() database = InMemoryDatabase() offset_watcher = OffsetWatcher() diff --git a/tests/utils.py b/tests/utils.py index 5c6d8507b..d5b757245 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -324,9 +324,7 @@ async def repeat_until_master_is_available(client: Client) -> None: while True: res = await client.post("/master_available", json={}) reply = res.json() - breakpoint() if reply is not None and "master_available" in reply and reply["master_available"] is True: - breakpoint() break time.sleep(1)