From 41f8224af53a0faef1e6c6345050f54ac8b382e8 Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Mon, 24 May 2021 16:47:45 +0300 Subject: [PATCH 1/3] Handle gracefully if no node is master eligible Karapace configuration allows configuring node to not be eligible for master. Handle gracefully ie. read-only mode if all nodes are configured non-eligible for master. --- karapace/master_coordinator.py | 31 +++++++++++----- karapace/schema_reader.py | 9 ++--- karapace/schema_registry_apis.py | 38 ++++++++++---------- tests/integration/test_master_coordinator.py | 29 +++++++++++++-- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index a2f993155..01afb5311 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -30,7 +30,8 @@ class SchemaCoordinator(BaseCoordinator): hostname = None port = None scheme = None - master = None + are_we_master = None + is_master_eligible = None master_url = None master_eligibility = True log = logging.getLogger("SchemaCoordinator") @@ -49,16 +50,25 @@ def group_protocols(self): def _perform_assignment(self, leader_id, protocol, members): self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members) - self.master = None + self.are_we_master = None error = NO_ERROR urls = {} + fallback_urls = {} for member_id, member_data in members: member_identity = json.loads(member_data.decode("utf8")) if member_identity["master_eligibility"] is True: urls[get_identity_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = (member_id, member_data) - self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] - schema_master_id, member_data = urls[self.master_url] + else: + fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"], + member_identity["port"])] = (member_id, member_data) + if len(urls) > 0: + self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = urls[self.master_url] + else: + # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be + self.master_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = fallback_urls[self.master_url] member_identity = json.loads(member_data.decode("utf8")) identity = self.get_identity( host=member_identity["host"], @@ -92,10 +102,12 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b ) if member_assignment["master"] == member_id: self.master_url = master_url - self.master = True + self.are_we_master = True + self.is_master_eligible = member_identity["master_eligibility"] else: self.master_url = master_url - self.master = False + self.are_we_master = False + self.is_master_eligible = member_identity["master_eligibility"] return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) def _on_join_follower(self): @@ -160,7 +172,7 @@ def init_schema_coordinator(self): def get_master_info(self): """Return whether we're the master, and the actual master url that can be used if we're not""" with self.lock: - return self.sc.master, self.sc.master_url + return self.sc.are_we_master, self.sc.is_master_eligible, self.sc.master_url def close(self): self.log.info("Closing master_coordinator") @@ -179,7 +191,10 @@ def run(self): self.sc.ensure_active_group() self.sc.poll_heartbeat() - self.log.debug("We're master: %r: master_uri: %r", self.sc.master, self.sc.master_url) + self.log.debug( + "We're master: %r (eligible master: %r): master_uri: %r", self.sc.are_we_master, + self.sc.is_master_eligible, self.sc.master_url + ) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except self.log.exception("Exception in master_coordinator") diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index e51f01421..8c36f01ed 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -239,11 +239,12 @@ def handle_messages(self): self.ready = True add_offsets = False if self.master_coordinator is not None: - master, _ = self.master_coordinator.get_master_info() - # keep old behavior for True. When master is False, then we are a follower, so we should not accept direct - # writes anyway. When master is None, then this particular node is waiting for a stable value, so any + are_we_master, is_master_eligible, _ = self.master_coordinator.get_master_info() + # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct + # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any # messages off the topic are writes performed by another node - if master is True: + # Also if master_elibility is disabled by configuration, disable writes too + if are_we_master is True and is_master_eligible is True: add_offsets = True for _, msgs in raw_msgs.items(): diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index ec76ce539..e22295a55 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -397,10 +397,10 @@ async def config_set(self, content_type, *, request): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, master_url = await self.get_master() - if are_we_master: + are_we_master, is_master_eligible, master_url = await self.get_master() + if are_we_master and is_master_eligible: self.send_config_message(compatibility_level=compatibility_level, subject=None) - elif are_we_master is None: + elif not is_master_eligible: self.no_master_error(content_type) else: url = f"{master_url}/config" @@ -448,10 +448,10 @@ async def config_subject_set(self, content_type, *, request, subject): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, master_url = await self.get_master() - if are_we_master: + are_we_master, is_master_eligible, master_url = await self.get_master() + if are_we_master and is_master_eligible: self.send_config_message(compatibility_level=compatibility_level, subject=subject) - elif are_we_master is None: + elif not is_master_eligible: self.no_master_error(content_type) else: url = f"{master_url}/config/{subject}" @@ -494,11 +494,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent async def subject_delete(self, content_type, *, subject, request: HTTPRequest): permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, master_url = await self.get_master() - if are_we_master: + are_we_master, is_master_eligible, master_url = await self.get_master() + if are_we_master and is_master_eligible: async with self.schema_lock: await self._subject_delete_local(content_type, subject, permanent) - elif are_we_master is None: + elif not is_master_eligible: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" @@ -598,11 +598,11 @@ async def subject_version_delete(self, content_type, *, subject, version, reques version = int(version) permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, master_url = await self.get_master() - if are_we_master: + are_we_master, is_master_eligible, master_url = await self.get_master() + if are_we_master and is_master_eligible: async with self.schema_lock: await self._subject_version_delete_local(content_type, subject, version, permanent) - elif are_we_master is None: + elif not is_master_eligible: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" @@ -635,13 +635,13 @@ async def subject_versions_list(self, content_type, *, subject): async def get_master(self): async with self.master_lock: while True: - master, master_url = self.mc.get_master_info() - if master is None: - self.log.info("No master set: %r, url: %r", master, master_url) + are_we_master, is_master_eligible, master_url = self.mc.get_master_info() + if are_we_master is None: + self.log.info("No master set: %r, url: %r", are_we_master, master_url) elif self.ksr.ready is False: self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready) else: - return master, master_url + return are_we_master, is_master_eligible, master_url await asyncio.sleep(1.0) def _validate_schema_request_body(self, content_type, body) -> None: @@ -745,11 +745,11 @@ async def subject_post(self, content_type, *, subject, request): self._validate_schema_request_body(content_type, body) self._validate_schema_type(content_type, body) self._validate_schema_key(content_type, body) - are_we_master, master_url = await self.get_master() - if are_we_master: + are_we_master, is_master_eligible, master_url = await self.get_master() + if are_we_master and is_master_eligible: async with self.schema_lock: await self.write_new_schema_local(subject, body, content_type) - elif are_we_master is None: + elif not is_master_eligible: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions" diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index a70dcaa7e..dd11557a8 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -32,12 +32,12 @@ def is_master(mc: MasterCoordinator) -> bool: This takes care of a race condition were the flag `master` is set but `master_url` is not yet set. """ - return bool(mc.sc and mc.sc.master and mc.sc.master_url) + return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url) def has_master(mc: MasterCoordinator) -> bool: """ True if `mc` has a master. """ - return bool(mc.sc and not mc.sc.master and mc.sc.master_url) + return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url) @pytest.mark.timeout(60) # Github workflows need a bit of extra time @@ -91,6 +91,31 @@ def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None: assert slave.sc.master_url == master_url +def test_no_eligible_master(kafka_servers: KafkaServers) -> None: + client_id = new_random_name("master_selection_") + group_id = new_random_name("group_id") + + config_aa = set_config_defaults({ + "advertised_hostname": "127.0.0.1", + "bootstrap_uri": kafka_servers.bootstrap_servers, + "client_id": client_id, + "group_id": group_id, + "port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]), + "master_eligibility": False, + }) + + with closing(init_admin(config_aa)) as mc: + # Wait for the election to happen, ie. flag is not None + while not mc.sc or mc.sc.are_we_master is None: + time.sleep(0.3) + + # Make sure the end configuration is as expected + master_url = f'http://{mc.config["host"]}:{mc.config["port"]}' + assert mc.sc.master_url == master_url + assert mc.sc.are_we_master is True + assert mc.sc.is_master_eligible is False + + async def test_schema_request_forwarding(registry_async_pair): master_url, slave_url = registry_async_pair max_tries, counter = 5, 0 From acf76107d329392a832090f4659e232d7bea9b8f Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Wed, 26 May 2021 09:22:39 +0300 Subject: [PATCH 2/3] Add some typing, rename eligible master flag for clarification --- karapace/master_coordinator.py | 15 +++++---- karapace/schema_reader.py | 4 +-- karapace/schema_registry_apis.py | 34 ++++++++++---------- tests/integration/test_master_coordinator.py | 4 +-- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 01afb5311..35029b131 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -11,6 +11,7 @@ from karapace import constants from karapace.utils import KarapaceKafkaClient from threading import Lock, Thread +from typing import Tuple import json import logging @@ -31,7 +32,7 @@ class SchemaCoordinator(BaseCoordinator): port = None scheme = None are_we_master = None - is_master_eligible = None + has_eligible_master = None master_url = None master_eligibility = True log = logging.getLogger("SchemaCoordinator") @@ -100,14 +101,14 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b host=member_identity["host"], port=member_identity["port"], ) - if member_assignment["master"] == member_id: + # On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real + if member_assignment["master"] == member_id and member_identity["master_eligibility"]: self.master_url = master_url self.are_we_master = True - self.is_master_eligible = member_identity["master_eligibility"] else: self.master_url = master_url self.are_we_master = False - self.is_master_eligible = member_identity["master_eligibility"] + self.has_eligible_master = member_identity["master_eligibility"] return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) def _on_join_follower(self): @@ -169,10 +170,10 @@ def init_schema_coordinator(self): self.sc.master_eligibility = self.config["master_eligibility"] self.lock.release() # self.sc now exists, we get to release the lock - def get_master_info(self): + def get_master_info(self) -> Tuple[bool, bool, str]: """Return whether we're the master, and the actual master url that can be used if we're not""" with self.lock: - return self.sc.are_we_master, self.sc.is_master_eligible, self.sc.master_url + return self.sc.are_we_master, self.sc.has_eligible_master, self.sc.master_url def close(self): self.log.info("Closing master_coordinator") @@ -193,7 +194,7 @@ def run(self): self.sc.poll_heartbeat() self.log.debug( "We're master: %r (eligible master: %r): master_uri: %r", self.sc.are_we_master, - self.sc.is_master_eligible, self.sc.master_url + self.sc.has_eligible_master, self.sc.master_url ) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 8c36f01ed..0be270460 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -239,12 +239,12 @@ def handle_messages(self): self.ready = True add_offsets = False if self.master_coordinator is not None: - are_we_master, is_master_eligible, _ = self.master_coordinator.get_master_info() + are_we_master, has_eligible_master, _ = self.master_coordinator.get_master_info() # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any # messages off the topic are writes performed by another node # Also if master_elibility is disabled by configuration, disable writes too - if are_we_master is True and is_master_eligible is True: + if are_we_master is True and has_eligible_master is True: add_offsets = True for _, msgs in raw_msgs.items(): diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index e22295a55..76c521608 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -397,10 +397,10 @@ async def config_set(self, content_type, *, request): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, is_master_eligible, master_url = await self.get_master() - if are_we_master and is_master_eligible: + are_we_master, has_eligible_master, master_url = await self.get_master() + if are_we_master and has_eligible_master: self.send_config_message(compatibility_level=compatibility_level, subject=None) - elif not is_master_eligible: + elif not has_eligible_master: self.no_master_error(content_type) else: url = f"{master_url}/config" @@ -448,10 +448,10 @@ async def config_subject_set(self, content_type, *, request, subject): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, is_master_eligible, master_url = await self.get_master() - if are_we_master and is_master_eligible: + are_we_master, has_eligible_master, master_url = await self.get_master() + if are_we_master and has_eligible_master: self.send_config_message(compatibility_level=compatibility_level, subject=subject) - elif not is_master_eligible: + elif not has_eligible_master: self.no_master_error(content_type) else: url = f"{master_url}/config/{subject}" @@ -494,11 +494,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent async def subject_delete(self, content_type, *, subject, request: HTTPRequest): permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, is_master_eligible, master_url = await self.get_master() - if are_we_master and is_master_eligible: + are_we_master, has_eligible_master, master_url = await self.get_master() + if are_we_master and has_eligible_master: async with self.schema_lock: await self._subject_delete_local(content_type, subject, permanent) - elif not is_master_eligible: + elif not has_eligible_master: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" @@ -598,11 +598,11 @@ async def subject_version_delete(self, content_type, *, subject, version, reques version = int(version) permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, is_master_eligible, master_url = await self.get_master() - if are_we_master and is_master_eligible: + are_we_master, has_eligible_master, master_url = await self.get_master() + if are_we_master and has_eligible_master: async with self.schema_lock: await self._subject_version_delete_local(content_type, subject, version, permanent) - elif not is_master_eligible: + elif not has_eligible_master: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" @@ -635,13 +635,13 @@ async def subject_versions_list(self, content_type, *, subject): async def get_master(self): async with self.master_lock: while True: - are_we_master, is_master_eligible, master_url = self.mc.get_master_info() + are_we_master, has_eligible_master, master_url = self.mc.get_master_info() if are_we_master is None: self.log.info("No master set: %r, url: %r", are_we_master, master_url) elif self.ksr.ready is False: self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready) else: - return are_we_master, is_master_eligible, master_url + return are_we_master, has_eligible_master, master_url await asyncio.sleep(1.0) def _validate_schema_request_body(self, content_type, body) -> None: @@ -745,11 +745,11 @@ async def subject_post(self, content_type, *, subject, request): self._validate_schema_request_body(content_type, body) self._validate_schema_type(content_type, body) self._validate_schema_key(content_type, body) - are_we_master, is_master_eligible, master_url = await self.get_master() - if are_we_master and is_master_eligible: + are_we_master, has_eligible_master, master_url = await self.get_master() + if are_we_master and has_eligible_master: async with self.schema_lock: await self.write_new_schema_local(subject, body, content_type) - elif not is_master_eligible: + elif not has_eligible_master: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions" diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index dd11557a8..56b0f2981 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -112,8 +112,8 @@ def test_no_eligible_master(kafka_servers: KafkaServers) -> None: # Make sure the end configuration is as expected master_url = f'http://{mc.config["host"]}:{mc.config["port"]}' assert mc.sc.master_url == master_url - assert mc.sc.are_we_master is True - assert mc.sc.is_master_eligible is False + assert mc.sc.are_we_master is False + assert mc.sc.has_eligible_master is False async def test_schema_request_forwarding(registry_async_pair): From 31a3e93cb9c66a1f47b3517a2c8f9f7c6f6ef311 Mon Sep 17 00:00:00 2001 From: Tommi Vainikainen Date: Wed, 26 May 2021 12:42:47 +0300 Subject: [PATCH 3/3] Remove explicit master eligibility flag and utilize optional master_url --- karapace/master_coordinator.py | 26 +++++++------- karapace/schema_reader.py | 4 +-- karapace/schema_registry_apis.py | 38 ++++++++++---------- tests/integration/test_master_coordinator.py | 4 +-- 4 files changed, 34 insertions(+), 38 deletions(-) diff --git a/karapace/master_coordinator.py b/karapace/master_coordinator.py index 35029b131..8d1b8545d 100644 --- a/karapace/master_coordinator.py +++ b/karapace/master_coordinator.py @@ -11,7 +11,7 @@ from karapace import constants from karapace.utils import KarapaceKafkaClient from threading import Lock, Thread -from typing import Tuple +from typing import Optional, Tuple import json import logging @@ -32,7 +32,6 @@ class SchemaCoordinator(BaseCoordinator): port = None scheme = None are_we_master = None - has_eligible_master = None master_url = None master_eligibility = True log = logging.getLogger("SchemaCoordinator") @@ -64,12 +63,12 @@ def _perform_assignment(self, leader_id, protocol, members): fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = (member_id, member_data) if len(urls) > 0: - self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] - schema_master_id, member_data = urls[self.master_url] + chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = urls[chosen_url] else: # Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be - self.master_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] - schema_master_id, member_data = fallback_urls[self.master_url] + chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0] + schema_master_id, member_data = fallback_urls[chosen_url] member_identity = json.loads(member_data.decode("utf8")) identity = self.get_identity( host=member_identity["host"], @@ -77,7 +76,7 @@ def _perform_assignment(self, leader_id, protocol, members): scheme=member_identity["scheme"], json_encode=False, ) - self.log.info("Chose: %r with url: %r as the master", schema_master_id, self.master_url) + self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url) assignments = {} for member_id, member_data in members: @@ -105,10 +104,12 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b if member_assignment["master"] == member_id and member_identity["master_eligibility"]: self.master_url = master_url self.are_we_master = True + elif not member_identity["master_eligibility"]: + self.master_url = None + self.are_we_master = False else: self.master_url = master_url self.are_we_master = False - self.has_eligible_master = member_identity["master_eligibility"] return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes) def _on_join_follower(self): @@ -170,10 +171,10 @@ def init_schema_coordinator(self): self.sc.master_eligibility = self.config["master_eligibility"] self.lock.release() # self.sc now exists, we get to release the lock - def get_master_info(self) -> Tuple[bool, bool, str]: + def get_master_info(self) -> Tuple[bool, Optional[str]]: """Return whether we're the master, and the actual master url that can be used if we're not""" with self.lock: - return self.sc.are_we_master, self.sc.has_eligible_master, self.sc.master_url + return self.sc.are_we_master, self.sc.master_url def close(self): self.log.info("Closing master_coordinator") @@ -192,10 +193,7 @@ def run(self): self.sc.ensure_active_group() self.sc.poll_heartbeat() - self.log.debug( - "We're master: %r (eligible master: %r): master_uri: %r", self.sc.are_we_master, - self.sc.has_eligible_master, self.sc.master_url - ) + self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url) time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat())) except: # pylint: disable=bare-except self.log.exception("Exception in master_coordinator") diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 0be270460..676395461 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -239,12 +239,12 @@ def handle_messages(self): self.ready = True add_offsets = False if self.master_coordinator is not None: - are_we_master, has_eligible_master, _ = self.master_coordinator.get_master_info() + are_we_master, _ = self.master_coordinator.get_master_info() # keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct # writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any # messages off the topic are writes performed by another node # Also if master_elibility is disabled by configuration, disable writes too - if are_we_master is True and has_eligible_master is True: + if are_we_master is True: add_offsets = True for _, msgs in raw_msgs.items(): diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 76c521608..fc6b1eb37 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -10,7 +10,7 @@ from karapace.rapu import HTTPRequest from karapace.schema_reader import InvalidSchema, KafkaSchemaReader, SchemaType, TypedSchema from karapace.utils import json_encode -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple import argparse import asyncio @@ -397,10 +397,10 @@ async def config_set(self, content_type, *, request): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, has_eligible_master, master_url = await self.get_master() - if are_we_master and has_eligible_master: + are_we_master, master_url = await self.get_master() + if are_we_master: self.send_config_message(compatibility_level=compatibility_level, subject=None) - elif not has_eligible_master: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/config" @@ -448,10 +448,10 @@ async def config_subject_set(self, content_type, *, request, subject): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) - are_we_master, has_eligible_master, master_url = await self.get_master() - if are_we_master and has_eligible_master: + are_we_master, master_url = await self.get_master() + if are_we_master: self.send_config_message(compatibility_level=compatibility_level, subject=subject) - elif not has_eligible_master: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/config/{subject}" @@ -494,11 +494,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent async def subject_delete(self, content_type, *, subject, request: HTTPRequest): permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, has_eligible_master, master_url = await self.get_master() - if are_we_master and has_eligible_master: + are_we_master, master_url = await self.get_master() + if are_we_master: async with self.schema_lock: await self._subject_delete_local(content_type, subject, permanent) - elif not has_eligible_master: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}?permanent={permanent}" @@ -598,11 +598,11 @@ async def subject_version_delete(self, content_type, *, subject, version, reques version = int(version) permanent = request.query.get("permanent", "false").lower() == "true" - are_we_master, has_eligible_master, master_url = await self.get_master() - if are_we_master and has_eligible_master: + are_we_master, master_url = await self.get_master() + if are_we_master: async with self.schema_lock: await self._subject_version_delete_local(content_type, subject, version, permanent) - elif not has_eligible_master: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}" @@ -632,16 +632,16 @@ async def subject_versions_list(self, content_type, *, subject): subject_data = self._subject_get(subject, content_type) self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK) - async def get_master(self): + async def get_master(self) -> Tuple[bool, Optional[str]]: async with self.master_lock: while True: - are_we_master, has_eligible_master, master_url = self.mc.get_master_info() + are_we_master, master_url = self.mc.get_master_info() if are_we_master is None: self.log.info("No master set: %r, url: %r", are_we_master, master_url) elif self.ksr.ready is False: self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready) else: - return are_we_master, has_eligible_master, master_url + return are_we_master, master_url await asyncio.sleep(1.0) def _validate_schema_request_body(self, content_type, body) -> None: @@ -745,11 +745,11 @@ async def subject_post(self, content_type, *, subject, request): self._validate_schema_request_body(content_type, body) self._validate_schema_type(content_type, body) self._validate_schema_key(content_type, body) - are_we_master, has_eligible_master, master_url = await self.get_master() - if are_we_master and has_eligible_master: + are_we_master, master_url = await self.get_master() + if are_we_master: async with self.schema_lock: await self.write_new_schema_local(subject, body, content_type) - elif not has_eligible_master: + elif not master_url: self.no_master_error(content_type) else: url = f"{master_url}/subjects/{subject}/versions" diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 56b0f2981..6d95a972d 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -110,10 +110,8 @@ def test_no_eligible_master(kafka_servers: KafkaServers) -> None: time.sleep(0.3) # Make sure the end configuration is as expected - master_url = f'http://{mc.config["host"]}:{mc.config["port"]}' - assert mc.sc.master_url == master_url assert mc.sc.are_we_master is False - assert mc.sc.has_eligible_master is False + assert mc.sc.master_url is None async def test_schema_request_forwarding(registry_async_pair):