diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py
index a2f993155..8d1b8545d 100644
--- a/karapace/master_coordinator.py
+++ b/karapace/master_coordinator.py
@@ -11,6 +11,7 @@
 from karapace import constants
 from karapace.utils import KarapaceKafkaClient
 from threading import Lock, Thread
+from typing import Optional, Tuple
 
 import json
 import logging
@@ -30,7 +31,7 @@ class SchemaCoordinator(BaseCoordinator):
     hostname = None
     port = None
     scheme = None
-    master = None
+    are_we_master = None
     master_url = None
     master_eligibility = True
     log = logging.getLogger("SchemaCoordinator")
@@ -49,16 +50,25 @@ def group_protocols(self):
 
     def _perform_assignment(self, leader_id, protocol, members):
         self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members)
-        self.master = None
+        self.are_we_master = None
         error = NO_ERROR
         urls = {}
+        fallback_urls = {}
         for member_id, member_data in members:
             member_identity = json.loads(member_data.decode("utf8"))
             if member_identity["master_eligibility"] is True:
                 urls[get_identity_url(member_identity["scheme"], member_identity["host"],
                                       member_identity["port"])] = (member_id, member_data)
-        self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
-        schema_master_id, member_data = urls[self.master_url]
+            else:
+                fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"],
+                                               member_identity["port"])] = (member_id, member_data)
+        if len(urls) > 0:
+            chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
+            schema_master_id, member_data = urls[chosen_url]
+        else:
+            # The protocol guarantees at least one member, so if urls is empty, fallback_urls cannot be empty
+            chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0]
+            schema_master_id, member_data = fallback_urls[chosen_url]
         member_identity = json.loads(member_data.decode("utf8"))
         identity = self.get_identity(
             host=member_identity["host"],
@@ -66,7 +76,7 @@ def _perform_assignment(self, leader_id, protocol, members):
             scheme=member_identity["scheme"],
             json_encode=False,
         )
-        self.log.info("Chose: %r with url: %r as the master", schema_master_id, self.master_url)
+        self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url)
 
         assignments = {}
         for member_id, member_data in members:
@@ -90,12 +100,16 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b
                 host=member_identity["host"],
                 port=member_identity["port"],
             )
-            if member_assignment["master"] == member_id:
+            # The Kafka protocol can assign this node as master, but if it is not master eligible it must not act as one
+            if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
                 self.master_url = master_url
-                self.master = True
+                self.are_we_master = True
+            elif not member_identity["master_eligibility"]:
+                self.master_url = None
+                self.are_we_master = False
             else:
                 self.master_url = master_url
-                self.master = False
+                self.are_we_master = False
         return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes)
 
     def _on_join_follower(self):
@@ -157,10 +171,10 @@ def init_schema_coordinator(self):
         self.sc.master_eligibility = self.config["master_eligibility"]
         self.lock.release()  # self.sc now exists, we get to release the lock
 
-    def get_master_info(self):
+    def get_master_info(self) -> Tuple[bool, Optional[str]]:
         """Return whether we're the master, and the actual master url that can be used if we're not"""
         with self.lock:
-            return self.sc.master, self.sc.master_url
+            return self.sc.are_we_master, self.sc.master_url
 
     def close(self):
         self.log.info("Closing master_coordinator")
@@ -179,7 +193,7 @@ def run(self):
                 self.sc.ensure_active_group()
                 self.sc.poll_heartbeat()
-                self.log.debug("We're master: %r: master_uri: %r", self.sc.master, self.sc.master_url)
+                self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url)
                 time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat()))
         except:  # pylint: disable=bare-except
             self.log.exception("Exception in master_coordinator")
diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py
index e51f01421..676395461 100644
--- a/karapace/schema_reader.py
+++ b/karapace/schema_reader.py
@@ -239,11 +239,12 @@ def handle_messages(self):
             self.ready = True
         add_offsets = False
         if self.master_coordinator is not None:
-            master, _ = self.master_coordinator.get_master_info()
-            # keep old behavior for True. When master is False, then we are a follower, so we should not accept direct
-            # writes anyway. When master is None, then this particular node is waiting for a stable value, so any
+            are_we_master, _ = self.master_coordinator.get_master_info()
+            # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct
+            # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any
             # messages off the topic are writes performed by another node
-            if master is True:
+            # Also, if master_eligibility is disabled by configuration, writes are disabled too
+            if are_we_master is True:
                 add_offsets = True
 
         for _, msgs in raw_msgs.items():
diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index ec76ce539..fc6b1eb37 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -10,7 +10,7 @@
 from karapace.rapu import HTTPRequest
 from karapace.schema_reader import InvalidSchema, KafkaSchemaReader, SchemaType, TypedSchema
 from karapace.utils import json_encode
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Optional, Tuple
 
 import argparse
 import asyncio
@@ -400,7 +400,7 @@ async def config_set(self, content_type, *, request):
         are_we_master, master_url = await self.get_master()
         if are_we_master:
             self.send_config_message(compatibility_level=compatibility_level, subject=None)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/config"
@@ -451,7 +451,7 @@ async def config_subject_set(self, content_type, *, request, subject):
         are_we_master, master_url = await self.get_master()
         if are_we_master:
             self.send_config_message(compatibility_level=compatibility_level, subject=subject)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/config/{subject}"
@@ -498,7 +498,7 @@ async def subject_delete(self, content_type, *, subject, request: HTTPRequest):
         if are_we_master:
             async with self.schema_lock:
                 await self._subject_delete_local(content_type, subject, permanent)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/subjects/{subject}?permanent={permanent}"
@@ -602,7 +602,7 @@ async def subject_version_delete(self, content_type, *, subject, version, reques
         if are_we_master:
             async with self.schema_lock:
                 await self._subject_version_delete_local(content_type, subject, version, permanent)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
@@ -632,16 +632,16 @@ async def subject_versions_list(self, content_type, *, subject):
         subject_data = self._subject_get(subject, content_type)
         self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK)
 
-    async def get_master(self):
+    async def get_master(self) -> Tuple[bool, Optional[str]]:
         async with self.master_lock:
             while True:
-                master, master_url = self.mc.get_master_info()
-                if master is None:
-                    self.log.info("No master set: %r, url: %r", master, master_url)
+                are_we_master, master_url = self.mc.get_master_info()
+                if are_we_master is None:
+                    self.log.info("No master set: %r, url: %r", are_we_master, master_url)
                 elif self.ksr.ready is False:
                     self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready)
                 else:
-                    return master, master_url
+                    return are_we_master, master_url
                 await asyncio.sleep(1.0)
 
     def _validate_schema_request_body(self, content_type, body) -> None:
@@ -749,7 +749,7 @@ async def subject_post(self, content_type, *, subject, request):
         if are_we_master:
             async with self.schema_lock:
                 await self.write_new_schema_local(subject, body, content_type)
-        elif are_we_master is None:
+        elif not master_url:
             self.no_master_error(content_type)
         else:
             url = f"{master_url}/subjects/{subject}/versions"
diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py
index a70dcaa7e..6d95a972d 100644
--- a/tests/integration/test_master_coordinator.py
+++ b/tests/integration/test_master_coordinator.py
@@ -32,12 +32,12 @@ def is_master(mc: MasterCoordinator) -> bool:
     This takes care of a race condition were the flag `master` is set but
     `master_url` is not yet set.
     """
-    return bool(mc.sc and mc.sc.master and mc.sc.master_url)
+    return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url)
 
 
 def has_master(mc: MasterCoordinator) -> bool:
     """ True if `mc` has a master. """
-    return bool(mc.sc and not mc.sc.master and mc.sc.master_url)
+    return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url)
 
 
 @pytest.mark.timeout(60)  # Github workflows need a bit of extra time
@@ -91,6 +91,29 @@ def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None:
         assert slave.sc.master_url == master_url
 
 
+def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
+    client_id = new_random_name("master_selection_")
+    group_id = new_random_name("group_id")
+
+    config_aa = set_config_defaults({
+        "advertised_hostname": "127.0.0.1",
+        "bootstrap_uri": kafka_servers.bootstrap_servers,
+        "client_id": client_id,
+        "group_id": group_id,
+        "port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]),
+        "master_eligibility": False,
+    })
+
+    with closing(init_admin(config_aa)) as mc:
+        # Wait for the election to happen, i.e. the flag is not None
+        while not mc.sc or mc.sc.are_we_master is None:
+            time.sleep(0.3)
+
+        # Make sure the end state is as expected
+        assert mc.sc.are_we_master is False
+        assert mc.sc.master_url is None
+
+
 async def test_schema_request_forwarding(registry_async_pair):
     master_url, slave_url = registry_async_pair
     max_tries, counter = 5, 0
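
To illustrate the change in _perform_assignment above: the election now prefers master-eligible members and falls back to non-eligible ones only when no eligible member joined the group. Below is a minimal standalone sketch of that selection logic, assuming the member-identity JSON layout the coordinator exchanges ("scheme", "host", "port", "master_eligibility"); choose_master and the sample members are hypothetical illustrations, not Karapace API.

    import json
    from typing import Dict, List, Tuple

    def get_identity_url(scheme: str, host: str, port: int) -> str:
        return f"{scheme}://{host}:{port}"

    def choose_master(members: List[Tuple[str, bytes]], election_strategy: str = "lowest") -> Tuple[str, str]:
        """Prefer master-eligible members; fall back to non-eligible ones only when no eligible member exists."""
        urls: Dict[str, Tuple[str, bytes]] = {}
        fallback_urls: Dict[str, Tuple[str, bytes]] = {}
        for member_id, member_data in members:
            identity = json.loads(member_data.decode("utf8"))
            url = get_identity_url(identity["scheme"], identity["host"], identity["port"])
            if identity["master_eligibility"] is True:
                urls[url] = (member_id, member_data)
            else:
                fallback_urls[url] = (member_id, member_data)
        # The group protocol guarantees at least one member, so if urls is empty, fallback_urls cannot be.
        pool = urls if urls else fallback_urls
        # "lowest" picks the first URL in sort order, "highest" the last.
        chosen_url = sorted(pool, reverse=election_strategy.lower() == "highest")[0]
        return pool[chosen_url][0], chosen_url

    members = [
        ("m1", json.dumps({"scheme": "http", "host": "10.0.0.2", "port": 8081, "master_eligibility": False}).encode("utf8")),
        ("m2", json.dumps({"scheme": "http", "host": "10.0.0.1", "port": 8081, "master_eligibility": True}).encode("utf8")),
    ]
    print(choose_master(members))  # ('m2', 'http://10.0.0.1:8081'): the eligible member wins

Sorting the URL map keeps the election deterministic: every node computes the same winner from the same member list, with the election strategy only flipping the sort direction.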
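
Likewise, the REST handlers' switch from "elif are_we_master is None" to "elif not master_url" can be summarized by the decision below; route_request is a hypothetical helper, not Karapace API. Under the patch, when the elected member is not master eligible (which only happens when no eligible member exists), every node ends up with are_we_master=False and master_url=None, so keying the no-master error off the missing URL covers both that case and a still-pending election.

    from typing import Optional

    def route_request(are_we_master: bool, master_url: Optional[str]) -> str:
        if are_we_master:
            return "handle locally"  # this node is the elected master
        if not master_url:
            # Election still pending (are_we_master is None) or no master-eligible
            # node exists in the group: answer with the no-master error.
            return "no_master_error"
        return f"forward to {master_url}"  # a known master handles the write

    assert route_request(True, "http://10.0.0.1:8081") == "handle locally"
    assert route_request(False, None) == "no_master_error"
    assert route_request(False, "http://10.0.0.1:8081") == "forward to http://10.0.0.1:8081"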