Skip to content

Commit

Permalink
WIP: test if this fixes all
Browse files Browse the repository at this point in the history
  • Loading branch information
eliax1996 committed Nov 14, 2024
1 parent aa55eb3 commit d7cdd8a
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 17 deletions.
10 changes: 7 additions & 3 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
def config(self) -> Config:
return self._config

async def start(self) -> None:
def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
Expand Down Expand Up @@ -90,8 +90,12 @@ async def _async_loop(self) -> None:

self._sc = self.init_schema_coordinator()
while self._running:
if self._sc.ready():
break
# why do we need to close?
# we just need to keep running even when the schema registry its ready
# otherwise we cause a rebalance and a new election. This should run until
# karapace is restarted
# if self._sc.ready():
# break
await asyncio.sleep(0.5)
# todo: wait a condition variable or a lock.
LOG.info("Closing master_coordinator")
Expand Down
24 changes: 24 additions & 0 deletions src/karapace/coordinator/schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,29 @@ def are_we_master(self) -> bool | None:
The condition to resume being the master its:
no new messages are still to be processed + at least 5 seconds have passed since we were elected master
"""

cur_time = time.monotonic()

LOG.warning(
"""
state of the variables:
self.hostname %s
self._ready: %s
self._schema_reader_stopper.ready(): %s
self._are_we_master: %s
self._initial_election_sec %s
self._waiting_time_before_acting_as_master_ms %s
time.monotonic() %s
""",
self.hostname,
self._ready,
self._schema_reader_stopper.ready(),
self._are_we_master,
self._initial_election_sec,
self._waiting_time_before_acting_as_master_ms,
cur_time,
)

if self._are_we_master is None:
# `self._are_we_master` is `None` only during the perform of the assignment
# where we don't know if we are master yet (probably we should return false due to the
Expand Down Expand Up @@ -529,6 +552,7 @@ def reset_generation(self) -> None:
"""Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
"""
LOG.info("Resetting generation status")
self._are_we_master = False
self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID
self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID
Expand Down
9 changes: 8 additions & 1 deletion src/karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get_schemas(self, subject: Subject, *, include_deleted: bool = False) -> lis
return list(schema_versions.values())

async def start(self) -> None:
await self.mc.start()
self.mc.start()
self.schema_reader.start()
self.producer.initialize_karapace_producer()

Expand All @@ -98,9 +98,16 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str |
follower/primary state and primary url.
:return (bool, Optional[str]): returns the primary/follower state and primary url
"""
LOG.warning("checking stuff")
async with self._master_lock:
LOG.warning("aquired log")
while True:
LOG.warning("calling master info")
are_we_master, master_url = self.mc.get_master_info()
LOG.warning("done")
LOG.warning(
"Master %s is %s. Schema reader is ready %s", master_url, are_we_master, self.schema_reader.ready()
)
if are_we_master is None:
LOG.info("No master set: %r, url: %r", are_we_master, master_url)
elif not ignore_readiness and self.schema_reader.ready() is False:
Expand Down
9 changes: 7 additions & 2 deletions src/karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async def schema_registry_health(self) -> HealthCheck:
resp = {}
if self._auth is not None:
resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified
resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready
resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready()
if self.schema_registry.schema_reader.ready():
resp["schema_registry_startup_time_sec"] = (
self.schema_registry.schema_reader.last_check - self._process_start_time
Expand Down Expand Up @@ -686,17 +686,22 @@ async def config_subject_delete(

async def master_available(self, content_type: str, *, request: HTTPRequest) -> None:
are_we_master, master_url = await self.schema_registry.get_master()
self.log.info("are master %s, master url %s", are_we_master, master_url)

if (
self.schema_registry.schema_reader.master_coordinator._sc is not None # pylint: disable=protected-access
and self.schema_registry.schema_reader.master_coordinator._sc.is_master_assigned_to_myself() # pylint: disable=protected-access
):
self.r({"master_available": are_we_master}, content_type)

if master_url is None:
if master_url is None or f"{self.config['advertised_hostname']}:{self.config['advertised_port']}" in master_url:
self.r({"master_available": False}, content_type)
else:
url = f"{master_url}/master_available"
self.log.info("forwarding to %s", url)
self.log.info("Self advertising hostname: %s", self.config["advertised_hostname"])
self.log.info("Self advertising port: %s", self.config["advertised_port"])
await self._forward_if_not_ready_to_serve
await self._forward_request_remote(request=request, body={}, url=url, content_type=content_type, method="POST")

async def subjects_list(self, content_type: str, *, request: HTTPRequest, user: User | None = None) -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def set_not_ready(self) -> None:
async def init_admin(config):
mc = MasterCoordinator(config=config)
mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
await mc.start()
mc.start()
return mc


Expand Down
17 changes: 12 additions & 5 deletions tests/integration/test_schema_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def test_coordinator_workflow(
# Check if the initial group join is performed correctly with minimal
# setup

waiting_time_before_acting_as_master_sec = 5
waiting_time_before_acting_as_master_sec = 6
coordinator = SchemaCoordinator(
client,
AlwaysAvailableSchemaReaderStoppper(),
Expand All @@ -128,7 +128,8 @@ async def test_coordinator_workflow(
session_timeout_ms=10000,
heartbeat_interval_ms=500,
retry_backoff_ms=100,
waiting_time_before_acting_as_master_ms=waiting_time_before_acting_as_master_sec * 1000,
# removing 1 second to consider the network latency in the rest of the test
waiting_time_before_acting_as_master_ms=(waiting_time_before_acting_as_master_sec - 1) * 1000,
)
coordinator.start()
assert coordinator.coordinator_id is None
Expand All @@ -137,8 +138,8 @@ async def test_coordinator_workflow(
assert coordinator.coordinator_id is not None

assert not coordinator.are_we_master()
# the waiting_time_before_acting_as_master_ms
await asyncio.sleep(10)
# waiting a little bit more since the cluster needs to setup.
await asyncio.sleep(2 * waiting_time_before_acting_as_master_sec)
assert not coordinator.are_we_master(), "last fetch before being available as master"
assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master"

Expand Down Expand Up @@ -170,16 +171,22 @@ async def test_coordinator_workflow(
secondary_client = client if primary_selection_strategy == "highest" else client2

if primary == coordinator2:
# we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time, we cannot be sure.
# we need to disable the master for `waiting_time_before_acting_as_master_sec` seconds each time,
# a new node its elected as master.
# if the coordinator its `coordinator1` since isn't changed we don't have to wait
# for the `waiting_time_before_acting_as_master_sec` seconds.

# give time to the election to be forwarded to all the coordinators.
await asyncio.sleep(3)

assert (
not primary.are_we_master()
), "after a change in the coordinator we can act as a master until we wait for the required time"
assert not secondary.are_we_master(), "also the second cannot be immediately a master"
# after that time the primary can act as a master
await asyncio.sleep(waiting_time_before_acting_as_master_sec)
assert not primary.are_we_master(), "Last fetch before being available as master"
assert not secondary.are_we_master(), "secondary cannot be master"

assert primary.are_we_master()
assert not secondary.are_we_master()
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def test_regression_soft_delete_schemas_should_be_registered(
master_coordinator = MasterCoordinator(config=config)
master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
try:
await master_coordinator.start()
master_coordinator.start()
database = InMemoryDatabase()
offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
Expand Down Expand Up @@ -166,7 +166,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw(
master_coordinator = MasterCoordinator(config=config)
master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
try:
await master_coordinator.start()
master_coordinator.start()
database = InMemoryDatabase()
offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
Expand Down Expand Up @@ -271,7 +271,7 @@ async def test_key_format_detection(
master_coordinator = MasterCoordinator(config=config)
master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper())
try:
await master_coordinator.start()
master_coordinator.start()
key_formatter = KeyFormatter()
database = InMemoryDatabase()
offset_watcher = OffsetWatcher()
Expand Down
2 changes: 0 additions & 2 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,7 @@ async def repeat_until_master_is_available(client: Client) -> None:
while True:
res = await client.post("/master_available", json={})
reply = res.json()
breakpoint()
if reply is not None and "master_available" in reply and reply["master_available"] is True:
breakpoint()
break
time.sleep(1)

Expand Down

0 comments on commit d7cdd8a

Please sign in to comment.