Skip to content

Commit

Permalink
WIP: test if this fixes all
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 13, 2024
1 parent 4f4edd7 commit 9f3d8bc
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
def config(self) -> Config:
return self._config

async def start(self) -> None:
def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
Expand Down
28 changes: 27 additions & 1 deletion src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,27 @@ def are_we_master(self) -> bool | None:
The condition to resume being the master its:
no new messages are still to be processed + at least 5 seconds have passed since we were elected master
"""

cur_time = time.monotonic()

LOG.warning(
"""
state of the variables:
self._ready: %s
self._schema_reader_stopper.ready(): %s
self._are_we_master: %s
self._initial_election_sec %s
self._waiting_time_before_acting_as_master_ms %s
time.monotonic() %s
""",
self._ready,
self._schema_reader_stopper.ready(),
self._are_we_master,
self._initial_election_sec,
self._waiting_time_before_acting_as_master_ms,
cur_time,
)

if self._are_we_master is None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet (probably we should return false due to the
Expand Down Expand Up @@ -529,7 +550,12 @@ def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
"""
self._are_we_master = False
LOG.info("Resetting generation status")
# this is called immediately after the election, we shouldn't reset this
# until a new node its elected aka the other path where a new node its elected
# otherwise this its called at each round and we keep not counting the 5 seconds
# required before the election.
# self._are_we_master = False
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
self.request_rejoin()
Expand Down
9 changes: 8 additions & 1 deletion src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis
return list(schema_versions.values())

async def start(self) -> None:
await self.mc.start()
self.mc.start()
self.schema_reader.start()
self.producer.initialize_karapace_producer()

Expand All @@ -98,9 +98,16 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
follower/primary state and primary url.
:return (bool, Optional[str]): returns the primary/follower state and primary url
"""
LOG.warning("checking stuff")
async with self._master_lock:
LOG.warning("aquired log")
while True:
LOG.warning("calling master info")
are_we_master, master_url = self.mc.get_master_info()
LOG.warning("done")
LOG.warning(
"Master %s is %s. Schema reader is ready %s", master_url, are_we_master, self.schema_reader.ready()
)
if are_we_master is None:
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
elif not ignore_readiness and self.schema_reader.ready() is False:
Expand Down
7 changes: 6 additions & 1 deletion src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,17 +686,22 @@ async def config_subject_delete(

async def master_available(self, content_type: str, *, request: HTTPRequest) -> None:
are_we_master, master_url = await self.schema_registry.get_master()
self.log.info("are master %s, master url %s", are_we_master, master_url)

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
self.r({"master_available": are_we_master}, content_type)

if master_url is None:
if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
self.r({"master_available": False}, content_type)
else:
url = f"{master_url}/master_available"
self.log.info("forwarding to %s", url)
self.log.info("Self advertising hostname: %s", self.config["advertised_hostname"])
self.log.info("Self advertising port: %s", self.config["advertised_port"])
await self._forward_if_not_ready_to_serve
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def set_not_ready(self) -> None:
async def init_admin(config):
mc = MasterCoordinator(config=config)
mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
await mc.start()
mc.start()
return mc


Expand Down
2 changes: 0 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,7 @@ async def repeat_until_master_is_available(client: Client) -> None:
while True:
res = await client.post("/master_available", json={})
reply = res.json()
breakpoint()
if reply is not None and "master_available" in reply and reply["master_available"] is True:
breakpoint()
break
time.sleep(1)

Expand Down

0 comments on commit 9f3d8bc

Please sign in to comment.