Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle gracefully if no node is master eligible #226

Merged
merged 3 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from karapace import constants
from karapace.utils import KarapaceKafkaClient
from threading import Lock, Thread
from typing import Tuple

import json
import logging
Expand All @@ -30,7 +31,8 @@ class SchemaCoordinator(BaseCoordinator):
hostname = None
port = None
scheme = None
master = None
are_we_master = None
has_eligible_master = None
master_url = None
master_eligibility = True
log = logging.getLogger("SchemaCoordinator")
Expand All @@ -49,16 +51,25 @@ def group_protocols(self):

def _perform_assignment(self, leader_id, protocol, members):
self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members)
self.master = None
self.are_we_master = None
error = NO_ERROR
urls = {}
fallback_urls = {}
tvainika marked this conversation as resolved.
Show resolved Hide resolved
for member_id, member_data in members:
hackaugusto marked this conversation as resolved.
Show resolved Hide resolved
member_identity = json.loads(member_data.decode("utf8"))
if member_identity["master_eligibility"] is True:
urls[get_identity_url(member_identity["scheme"], member_identity["host"],
member_identity["port"])] = (member_id, member_data)
self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
hackaugusto marked this conversation as resolved.
Show resolved Hide resolved
schema_master_id, member_data = urls[self.master_url]
else:
fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"],
member_identity["port"])] = (member_id, member_data)
if len(urls) > 0:
self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = urls[self.master_url]
else:
# Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be
self.master_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = fallback_urls[self.master_url]
member_identity = json.loads(member_data.decode("utf8"))
identity = self.get_identity(
host=member_identity["host"],
Expand Down Expand Up @@ -90,12 +101,14 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b
host=member_identity["host"],
port=member_identity["port"],
)
if member_assignment["master"] == member_id:
# On Kafka protocol we can be assigned to be master, but if not master eligible, then we're not master for real
if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
self.master_url = master_url
self.master = True
self.are_we_master = True
else:
self.master_url = master_url
self.master = False
self.are_we_master = False
self.has_eligible_master = member_identity["master_eligibility"]
return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes)

def _on_join_follower(self):
Expand Down Expand Up @@ -157,10 +170,10 @@ def init_schema_coordinator(self):
self.sc.master_eligibility = self.config["master_eligibility"]
self.lock.release() # self.sc now exists, we get to release the lock

def get_master_info(self):
def get_master_info(self) -> Tuple[bool, bool, str]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
with self.lock:
return self.sc.master, self.sc.master_url
return self.sc.are_we_master, self.sc.has_eligible_master, self.sc.master_url

def close(self):
self.log.info("Closing master_coordinator")
Expand All @@ -179,7 +192,10 @@ def run(self):

self.sc.ensure_active_group()
self.sc.poll_heartbeat()
self.log.debug("We're master: %r: master_uri: %r", self.sc.master, self.sc.master_url)
self.log.debug(
"We're master: %r (eligible master: %r): master_uri: %r", self.sc.are_we_master,
self.sc.has_eligible_master, self.sc.master_url
)
time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat()))
except: # pylint: disable=bare-except
self.log.exception("Exception in master_coordinator")
Expand Down
9 changes: 5 additions & 4 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,12 @@ def handle_messages(self):
self.ready = True
add_offsets = False
if self.master_coordinator is not None:
master, _ = self.master_coordinator.get_master_info()
# keep old behavior for True. When master is False, then we are a follower, so we should not accept direct
# writes anyway. When master is None, then this particular node is waiting for a stable value, so any
are_we_master, has_eligible_master, _ = self.master_coordinator.get_master_info()
# keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct
# writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any
# messages off the topic are writes performed by another node
if master is True:
# Also if master_elibility is disabled by configuration, disable writes too
if are_we_master is True and has_eligible_master is True:
add_offsets = True

for _, msgs in raw_msgs.items():
Expand Down
38 changes: 19 additions & 19 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,10 @@ async def config_set(self, content_type, *, request):
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, master_url = await self.get_master()
if are_we_master:
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
self.send_config_message(compatibility_level=compatibility_level, subject=None)
elif are_we_master is None:
elif not has_eligible_master:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
Expand Down Expand Up @@ -448,10 +448,10 @@ async def config_subject_set(self, content_type, *, request, subject):
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, master_url = await self.get_master()
if are_we_master:
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
self.send_config_message(compatibility_level=compatibility_level, subject=subject)
elif are_we_master is None:
elif not has_eligible_master:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
Expand Down Expand Up @@ -494,11 +494,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent
async def subject_delete(self, content_type, *, subject, request: HTTPRequest):
permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, master_url = await self.get_master()
if are_we_master:
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
async with self.schema_lock:
await self._subject_delete_local(content_type, subject, permanent)
elif are_we_master is None:
elif not has_eligible_master:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
Expand Down Expand Up @@ -598,11 +598,11 @@ async def subject_version_delete(self, content_type, *, subject, version, reques
version = int(version)
permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, master_url = await self.get_master()
if are_we_master:
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
async with self.schema_lock:
await self._subject_version_delete_local(content_type, subject, version, permanent)
elif are_we_master is None:
elif not has_eligible_master:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
Expand Down Expand Up @@ -635,13 +635,13 @@ async def subject_versions_list(self, content_type, *, subject):
async def get_master(self):
async with self.master_lock:
while True:
master, master_url = self.mc.get_master_info()
if master is None:
self.log.info("No master set: %r, url: %r", master, master_url)
are_we_master, has_eligible_master, master_url = self.mc.get_master_info()
if are_we_master is None:
self.log.info("No master set: %r, url: %r", are_we_master, master_url)
elif self.ksr.ready is False:
self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready)
else:
return master, master_url
return are_we_master, has_eligible_master, master_url
await asyncio.sleep(1.0)

def _validate_schema_request_body(self, content_type, body) -> None:
Expand Down Expand Up @@ -745,11 +745,11 @@ async def subject_post(self, content_type, *, subject, request):
self._validate_schema_request_body(content_type, body)
self._validate_schema_type(content_type, body)
self._validate_schema_key(content_type, body)
are_we_master, master_url = await self.get_master()
if are_we_master:
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
async with self.schema_lock:
await self.write_new_schema_local(subject, body, content_type)
elif are_we_master is None:
elif not has_eligible_master:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
Expand Down
29 changes: 27 additions & 2 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def is_master(mc: MasterCoordinator) -> bool:
This takes care of a race condition were the flag `master` is set but
`master_url` is not yet set.
"""
return bool(mc.sc and mc.sc.master and mc.sc.master_url)
return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url)


def has_master(mc: MasterCoordinator) -> bool:
""" True if `mc` has a master. """
return bool(mc.sc and not mc.sc.master and mc.sc.master_url)
return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url)


@pytest.mark.timeout(60) # Github workflows need a bit of extra time
Expand Down Expand Up @@ -91,6 +91,31 @@ def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None:
assert slave.sc.master_url == master_url


def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
client_id = new_random_name("master_selection_")
group_id = new_random_name("group_id")

config_aa = set_config_defaults({
"advertised_hostname": "127.0.0.1",
"bootstrap_uri": kafka_servers.bootstrap_servers,
"client_id": client_id,
"group_id": group_id,
"port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]),
"master_eligibility": False,
})

with closing(init_admin(config_aa)) as mc:
# Wait for the election to happen, ie. flag is not None
while not mc.sc or mc.sc.are_we_master is None:
time.sleep(0.3)

# Make sure the end configuration is as expected
master_url = f'http://{mc.config["host"]}:{mc.config["port"]}'
assert mc.sc.master_url == master_url
assert mc.sc.are_we_master is False
assert mc.sc.has_eligible_master is False


async def test_schema_request_forwarding(registry_async_pair):
master_url, slave_url = registry_async_pair
max_tries, counter = 5, 0
Expand Down