Skip to content

Commit

Permalink
Remove explicit master eligibility flag and utilize optional master_url
Browse files Browse the repository at this point in the history
  • Loading branch information
tvainika committed May 26, 2021
1 parent acf7610 commit fd546f4
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 29 deletions.
11 changes: 6 additions & 5 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from karapace import constants
from karapace.utils import KarapaceKafkaClient
from threading import Lock, Thread
from typing import Tuple
from typing import Optional, Tuple

import json
import logging
Expand All @@ -32,7 +32,6 @@ class SchemaCoordinator(BaseCoordinator):
port = None
scheme = None
are_we_master = None
has_eligible_master = None
master_url = None
master_eligibility = True
log = logging.getLogger("SchemaCoordinator")
Expand Down Expand Up @@ -105,10 +104,12 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b
if member_assignment["master"] == member_id and member_identity["master_eligibility"]:
self.master_url = master_url
self.are_we_master = True
elif not member_identity["master_eligibility"]:
self.master_url = None
self.are_we_master = False
else:
self.master_url = master_url
self.are_we_master = False
self.has_eligible_master = member_identity["master_eligibility"]
return super(SchemaCoordinator, self)._on_join_complete(generation, member_id, protocol, member_assignment_bytes)

def _on_join_follower(self):
Expand Down Expand Up @@ -170,10 +171,10 @@ def init_schema_coordinator(self):
self.sc.master_eligibility = self.config["master_eligibility"]
self.lock.release() # self.sc now exists, we get to release the lock

def get_master_info(self) -> Tuple[bool, bool, str]:
def get_master_info(self) -> Tuple[bool, Optional[str]]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
with self.lock:
return self.sc.are_we_master, self.sc.has_eligible_master, self.sc.master_url
return self.sc.are_we_master, self.sc.master_url

def close(self):
self.log.info("Closing master_coordinator")
Expand Down
4 changes: 2 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ def handle_messages(self):
self.ready = True
add_offsets = False
if self.master_coordinator is not None:
are_we_master, has_eligible_master, _ = self.master_coordinator.get_master_info()
are_we_master, _ = self.master_coordinator.get_master_info()
# keep old behavior for True. When are_we_master is False, then we are a follower, so we should not accept direct
# writes anyway. When are_we_master is None, then this particular node is waiting for a stable value, so any
# messages off the topic are writes performed by another node
# Also if master_elibility is disabled by configuration, disable writes too
if are_we_master is True and has_eligible_master is True:
if are_we_master is True:
add_offsets = True

for _, msgs in raw_msgs.items():
Expand Down
38 changes: 19 additions & 19 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from karapace.rapu import HTTPRequest
from karapace.schema_reader import InvalidSchema, KafkaSchemaReader, SchemaType, TypedSchema
from karapace.utils import json_encode
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Tuple

import argparse
import asyncio
Expand Down Expand Up @@ -397,10 +397,10 @@ async def config_set(self, content_type, *, request):
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
are_we_master, master_url = await self.get_master()
if are_we_master:
self.send_config_message(compatibility_level=compatibility_level, subject=None)
elif not has_eligible_master:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config"
Expand Down Expand Up @@ -448,10 +448,10 @@ async def config_subject_set(self, content_type, *, request, subject):
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)

are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
are_we_master, master_url = await self.get_master()
if are_we_master:
self.send_config_message(compatibility_level=compatibility_level, subject=subject)
elif not has_eligible_master:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/config/{subject}"
Expand Down Expand Up @@ -494,11 +494,11 @@ async def _subject_delete_local(self, content_type: str, subject: str, permanent
async def subject_delete(self, content_type, *, subject, request: HTTPRequest):
permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
are_we_master, master_url = await self.get_master()
if are_we_master:
async with self.schema_lock:
await self._subject_delete_local(content_type, subject, permanent)
elif not has_eligible_master:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}?permanent={permanent}"
Expand Down Expand Up @@ -598,11 +598,11 @@ async def subject_version_delete(self, content_type, *, subject, version, reques
version = int(version)
permanent = request.query.get("permanent", "false").lower() == "true"

are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
are_we_master, master_url = await self.get_master()
if are_we_master:
async with self.schema_lock:
await self._subject_version_delete_local(content_type, subject, version, permanent)
elif not has_eligible_master:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions/{version}?permanent={permanent}"
Expand Down Expand Up @@ -632,16 +632,16 @@ async def subject_versions_list(self, content_type, *, subject):
subject_data = self._subject_get(subject, content_type)
self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK)

async def get_master(self):
async def get_master(self) -> Tuple[bool, Optional[str]]:
async with self.master_lock:
while True:
are_we_master, has_eligible_master, master_url = self.mc.get_master_info()
are_we_master, master_url = self.mc.get_master_info()
if are_we_master is None:
self.log.info("No master set: %r, url: %r", are_we_master, master_url)
elif self.ksr.ready is False:
self.log.info("Schema reader isn't ready yet: %r", self.ksr.ready)
else:
return are_we_master, has_eligible_master, master_url
return are_we_master, master_url
await asyncio.sleep(1.0)

def _validate_schema_request_body(self, content_type, body) -> None:
Expand Down Expand Up @@ -745,11 +745,11 @@ async def subject_post(self, content_type, *, subject, request):
self._validate_schema_request_body(content_type, body)
self._validate_schema_type(content_type, body)
self._validate_schema_key(content_type, body)
are_we_master, has_eligible_master, master_url = await self.get_master()
if are_we_master and has_eligible_master:
are_we_master, master_url = await self.get_master()
if are_we_master:
async with self.schema_lock:
await self.write_new_schema_local(subject, body, content_type)
elif not has_eligible_master:
elif not master_url:
self.no_master_error(content_type)
else:
url = f"{master_url}/subjects/{subject}/versions"
Expand Down
4 changes: 1 addition & 3 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
time.sleep(0.3)

# Make sure the end configuration is as expected
master_url = f'http://{mc.config["host"]}:{mc.config["port"]}'
assert mc.sc.master_url == master_url
assert mc.sc.are_we_master is False
assert mc.sc.has_eligible_master is False
assert mc.sc.master_url is None


async def test_schema_request_forwarding(registry_async_pair):
Expand Down

0 comments on commit fd546f4

Please sign in to comment.