Skip to content

Commit

Permalink
Merge pull request #226 from aiven/no-eligible-master
Browse files Browse the repository at this point in the history
Handle gracefully if no node is master eligible

#226
  • Loading branch information
Augusto Hack authored May 27, 2021
2 parents 092703d + 31a3e93 commit 6633aca
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 28 deletions.
36 changes: 25 additions & 11 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from karapace import constants
from karapace.utils import KarapaceKafkaClient
from threading import Lock, Thread
from typing import Optional, Tuple

import json
import logging
Expand All @@ -30,7 +31,7 @@ class SchemaCoordinator(BaseCoordinator):
hostname = None
port = None
scheme = None
master = None
are_we_master = None
master_url = None
master_eligibility = True
log = logging.getLogger("SchemaCoordinator")
Expand All @@ -49,24 +50,33 @@ def group_protocols(self):

def _perform_assignment(self, leader_id, protocol, members):
self.log.info("Creating assignment: %r, protocol: %r, members: %r", leader_id, protocol, members)
self.master = None
self.are_we_master = None
error = NO_ERROR
urls = {}
fallback_urls = {}
for member_id, member_data in members:
member_identity = json.loads(member_data.decode("utf8"))
if member_identity["master_eligibility"] is True:
urls[get_identity_url(member_identity["scheme"], member_identity["host"],
member_identity["port"])] = (member_id, member_data)
self.master_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = urls[self.master_url]
else:
fallback_urls[get_identity_url(member_identity["scheme"], member_identity["host"],
member_identity["port"])] = (member_id, member_data)
if len(urls) > 0:
chosen_url = sorted(urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = urls[chosen_url]
else:
# The protocol guarantees there is at least one member, so if urls is empty then fallback_urls cannot be empty
chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = fallback_urls[chosen_url]
member_identity = json.loads(member_data.decode("utf8"))
identity = self.get_identity(
host=member_identity["host"],
port=member_identity["port"],
scheme=member_identity["scheme"],
json_encode=False,
)
self.log.info("Chose: %r with url: %r as the master", schema_master_id, self.master_url)
self.log.info("Chose: %r with url: %r as the master", schema_master_id, chosen_url)

assignments = {}
for member_id, member_data in members:
Expand All @@ -90,12 +100,16 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b
host=member_identity["host"],
port=member_identity["port"],
)
if member_assignment["master"] == member_id:
# The Kafka protocol may assign us to be master, but if we are not master eligible then we are not really the master
if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
self.master_url = master_url
self.master = True
self.are_we_master = True
elif not member_identity["master_eligibility"]:
self.master_url = None
self.are_we_master = False
else:
self.master_url = master_url
self.master = False
self.are_we_master = False
return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes)

def _on_join_follower(self):
Expand Down Expand Up @@ -157,10 +171,10 @@ def init_schema_coordinator(self):
self.sc.master_eligibility = self.config["master_eligibility"]
self.lock.release() # self.sc now exists, we get to release the lock

def get_master_info(self):
def get_master_info(self) -> Tuple[bool, Optional[str]]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
with self.lock:
return self.sc.master, self.sc.master_url
return self.sc.are_we_master, self.sc.master_url

def close(self):
self.log.info("Closing master_coordinator")
Expand All @@ -179,7 +193,7 @@ def run(self):

self.sc.ensure_active_group()
self.sc.poll_heartbeat()
self.log.debug("We're master: %r: master_uri: %r", self.sc.master, self.sc.master_url)
self.log.debug("We're master: %r: master_uri: %r", self.sc.are_we_master, self.sc.master_url)
time.sleep(min(_hb_interval, self.sc.time_to_next_heartbeat()))
except: # pylint: disable=bare-except
self.log.exception("Exception in master_coordinator")
Expand Down
9 changes: 5 additions & 4 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,12 @@ def handle_messages(self):
self.ready = True
add_offsets = False
if self.master_coordinator is not None:
master, _ = self.master_coordinator.get_master_info()
# keep old behavior for True. When master is False, then we are a follower, so we should not accept direct
# writes anyway. When master is None, then this particular node is waiting for a stable value, so any
are_we_master, _ = self.master_coordinator.get_master_info()
# keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct
# writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any
# messages off the topic are writes performed by another node
if master is True:
# Also if master_eligibility is disabled by configuration, disable writes too
if are_we_master is True:
add_offsets = True

for _, msgs in raw_msgs.items():
Expand Down
22 changes: 11 additions & 11 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from karapace.rapu import HTTPRequest
from karapace.schema_reader import InvalidSchema, KafkaSchemaReader, SchemaType, TypedSchema
from karapace.utils import json_encode
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Tuple

import argparse
import asyncio
Expand Down Expand Up @@ -390,7 +390,7 @@ async def config_set(self, content_type, *, request):
are_we_master, master_url = await self.get_master()
if are_we_master:
self.send_config_message(compatibility_level=compatibility_level, subject=None)
elif are_we_master is None:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
Expand Down Expand Up @@ -441,7 +441,7 @@ async def config_subject_set(self, content_type, *, request, subject):
are_we_master, master_url = await self.get_master()
if are_we_master:
self.send_config_message(compatibility_level=compatibility_level, subject=subject)
elif are_we_master is None:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
Expand Down Expand Up @@ -488,7 +488,7 @@ async def subject_delete(self, content_type, *, subject, request: HTTPRequest):
if are_we_master:
async with self.schema_lock:
await self._subject_delete_local(content_type, subject, permanent)
elif are_we_master is None:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
Expand Down Expand Up @@ -592,7 +592,7 @@ async def subject_version_delete(self, content_type, *, subject, version, reques
if are_we_master:
async with self.schema_lock:
await self._subject_version_delete_local(content_type, subject, version, permanent)
elif are_we_master is None:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
Expand Down Expand Up @@ -622,16 +622,16 @@ async def subject_versions_list(self, content_type, *, subject):
subject_data = self._subject_get(subject, content_type)
self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK)

async def get_master(self):
async def get_master(self) -> Tuple[bool, Optional[str]]:
async with self.master_lock:
while True:
master, master_url = self.mc.get_master_info()
if master is None:
self.log.info("No master set: %r, url: %r", master, master_url)
are_we_master, master_url = self.mc.get_master_info()
if are_we_master is None:
self.log.info("No master set: %r, url: %r", are_we_master, master_url)
elif self.ksr.ready is False:
self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready)
else:
return master, master_url
return are_we_master, master_url
await asyncio.sleep(1.0)

def _validate_schema_request_body(self, content_type, body) -> None:
Expand Down Expand Up @@ -739,7 +739,7 @@ async def subject_post(self, content_type, *, subject, request):
if are_we_master:
async with self.schema_lock:
await self.write_new_schema_local(subject, body, content_type)
elif are_we_master is None:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
Expand Down
27 changes: 25 additions & 2 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ def is_master(mc: MasterCoordinator) -> bool:
This takes care of a race condition where the flag `are_we_master` is set but
`master_url` is not yet set.
"""
return bool(mc.sc and mc.sc.master and mc.sc.master_url)
return bool(mc.sc and mc.sc.are_we_master and mc.sc.master_url)


def has_master(mc: MasterCoordinator) -> bool:
""" True if `mc` has a master. """
return bool(mc.sc and not mc.sc.master and mc.sc.master_url)
return bool(mc.sc and not mc.sc.are_we_master and mc.sc.master_url)


@pytest.mark.timeout(60) # Github workflows need a bit of extra time
Expand Down Expand Up @@ -91,6 +91,29 @@ def test_master_selection(kafka_servers: KafkaServers, strategy: str) -> None:
assert slave.sc.master_url == master_url


@pytest.mark.timeout(60)  # Github workflows need a bit of extra time
def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
    """Check the coordinator state when its only configured node is not master eligible.

    A single coordinator is started with ``master_eligibility`` set to False.
    Once the election has produced a value, the node must report that it is
    not the master and must not expose any master url.
    """
    client_id = new_random_name("master_selection_")
    group_id = new_random_name("group_id")

    config_aa = set_config_defaults({
        "advertised_hostname": "127.0.0.1",
        "bootstrap_uri": kafka_servers.bootstrap_servers,
        "client_id": client_id,
        "group_id": group_id,
        "port": get_random_port(port_range=TESTS_PORT_RANGE, blacklist=[]),
        "master_eligibility": False,
    })

    with closing(init_admin(config_aa)) as mc:
        # Wait for the election to happen, i.e. the flag is no longer None.
        # The timeout mark on this test bounds the poll if the election never completes.
        while not mc.sc or mc.sc.are_we_master is None:
            time.sleep(0.3)

        # Make sure the end configuration is as expected
        assert mc.sc.are_we_master is False
        assert mc.sc.master_url is None


async def test_schema_request_forwarding(registry_async_pair):
master_url, slave_url = registry_async_pair
max_tries, counter = 5, 0
Expand Down

0 comments on commit 6633aca

Please sign in to comment.