Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
edsoncsouza authored Dec 1, 2024
2 parents 4b5a4e0 + 366b59c commit a7b8e50
Show file tree
Hide file tree
Showing 24 changed files with 534 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/container.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
run: |
echo is_release=${{ contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT
echo is_dev=${{ ! contains(github.ref, 'refs/tags/') }} | tee -a $GITHUB_OUTPUT
echo version=$(git describe --always --tags) | tee -a $GITHUB_OUTPUT
echo version=$(git describe --tags | cut -d '-' -f -2 | sed 's/-/.dev/g') | tee -a $GITHUB_OUTPUT
# QEMU is used to set up VMs for building non-x86_64 images.
- name: Set up QEMU
Expand Down
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,11 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
* - ``log_format``
- ``%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s``
- Log format
* - ``waiting_time_before_acting_as_master_ms``
- ``5000``
- The time that a master wait before becoming an active master if at the previous round of election wasn't the master (in that case the waiting time its skipped).
Should be an upper bound of the time required for a master to write a message in the kafka topic + the time required from a node in the cluster to consume the
Log of messages. If the value its too low there is the risk under high load of producing different schemas with the ID.


Authentication and authorization of Karapace Schema Registry REST API
Expand Down
12 changes: 10 additions & 2 deletions src/karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,20 @@ def _handle_restore_topic(
instruction: RestoreTopic,
config: Config,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
if skip_topic_creation:
return
repl_factor = instruction.replication_factor
if override_replication_factor is not None:
LOG.info(
"Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
)
repl_factor = override_replication_factor
if not _maybe_create_topic(
config=config,
name=instruction.topic_name,
replication_factor=instruction.replication_factor,
replication_factor=repl_factor,
topic_configs=instruction.topic_configs,
):
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
Expand Down Expand Up @@ -426,6 +433,7 @@ def restore_backup(
backup_location: ExistingFile,
topic_name: TopicName,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
"""Restores a backup from the specified location into the configured topic.
Expand Down Expand Up @@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, RestoreTopic):
_handle_restore_topic(instruction, config, skip_topic_creation)
_handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, ProducerSend):
if producer is None:
Expand Down
10 changes: 10 additions & 0 deletions src/karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
),
)

parser_restore.add_argument(
"--override-replication-factor",
help=(
"Override the replication factor that is save in the backup. This is needed when restoring a backup from a"
"downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups."
),
type=int,
)

return parser.parse_args()


Expand Down Expand Up @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
override_replication_factor=args.override_replication_factor,
)
except BackupDataRestorationError:
traceback.print_exc()
Expand Down
2 changes: 2 additions & 0 deletions src/karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class Config(TypedDict):
kafka_schema_reader_strict_mode: bool
kafka_retriable_errors_silenced: bool
use_protobuf_formatter: bool
waiting_time_before_acting_as_master_ms: int

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -163,6 +164,7 @@ class ConfigDefaults(Config, total=False):
"kafka_schema_reader_strict_mode": False,
"kafka_retriable_errors_silenced": True,
"use_protobuf_formatter": False,
"waiting_time_before_acting_as_master_ms": 5000,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
83 changes: 68 additions & 15 deletions src/karapace/coordinator/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,42 @@
from karapace.config import Config
from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS
from karapace.typing import SchemaReaderStoppper
from threading import Thread
from typing import Final

import asyncio
import logging
import time

__all__ = ("MasterCoordinator",)


LOG = logging.getLogger(__name__)


class MasterCoordinator:
"""Handles primary election"""
"""Handles primary election
The coordination is run in own dedicated thread, under stress situation the main
eventloop could have queue of items to work and having own thread will give more
runtime for the coordination tasks as Python intrepreter will switch the active
thread by the configured thread switch interval. Default interval in CPython is
5 milliseconds.
"""

def __init__(self, config: Config) -> None:
super().__init__()
self._config: Final = config
self._kafka_client: AIOKafkaClient | None = None
self._running = True
self._sc: SchemaCoordinator | None = None
self._closing = asyncio.Event()
self._thread: Thread = Thread(target=self._start_loop, daemon=True)
self._loop: asyncio.AbstractEventLoop | None = None
self._schema_reader_stopper: SchemaReaderStoppper | None = None

def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None:
self._schema_reader_stopper = schema_reader_stopper

@property
def schema_coordinator(self) -> SchemaCoordinator | None:
Expand All @@ -41,7 +58,18 @@ def schema_coordinator(self) -> SchemaCoordinator | None:
def config(self) -> Config:
return self._config

async def start(self) -> None:
def start(self) -> None:
self._thread.start()

def _start_loop(self) -> None:
# we should avoid the reassignment otherwise we leak resources
assert self._loop is None, "Loop already started"
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._loop.create_task(self._async_loop())
self._loop.run_forever()

async def _async_loop(self) -> None:
self._kafka_client = self.init_kafka_client()
# Wait until schema coordinator is ready.
# This probably needs better synchronization than plain waits.
Expand All @@ -61,10 +89,22 @@ async def start(self) -> None:
await asyncio.sleep(0.5)

self._sc = self.init_schema_coordinator()
while True:
if self._sc.ready():
return
await asyncio.sleep(0.5)

# keeping the thread sleeping until it die.
# we need to keep the schema_coordinator running
# it contains the `heartbeat` and coordination logic.
await self._closing.wait()

LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
while self._loop is not None and not self._loop.is_closed():
self._loop.stop()
if not self._loop.is_running():
self._loop.close()
time.sleep(0.5)
if self._kafka_client:
await self._kafka_client.close()

def init_kafka_client(self) -> AIOKafkaClient:
ssl_context = create_ssl_context(
Expand All @@ -90,15 +130,18 @@ def init_kafka_client(self) -> AIOKafkaClient:

def init_schema_coordinator(self) -> SchemaCoordinator:
assert self._kafka_client is not None
assert self._schema_reader_stopper is not None
schema_coordinator = SchemaCoordinator(
client=self._kafka_client,
schema_reader_stopper=self._schema_reader_stopper,
election_strategy=self._config.get("master_election_strategy", "lowest"),
group_id=self._config["group_id"],
hostname=self._config["advertised_hostname"],
master_eligibility=self._config["master_eligibility"],
port=self._config["advertised_port"],
scheme=self._config["advertised_protocol"],
session_timeout_ms=self._config["session_timeout_ms"],
waiting_time_before_acting_as_master_ms=self._config["waiting_time_before_acting_as_master_ms"],
)
schema_coordinator.start()
return schema_coordinator
Expand All @@ -107,7 +150,7 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:
assert self._sc is not None
generation = self._sc.generation if self._sc is not None else OffsetCommitRequest.DEFAULT_GENERATION_ID
return SchemaCoordinatorStatus(
is_primary=self._sc.are_we_master if self._sc is not None else None,
is_primary=self._sc.are_we_master() if self._sc is not None else None,
is_primary_eligible=self._config["master_eligibility"],
primary_url=self._sc.master_url if self._sc is not None else None,
is_running=True,
Expand All @@ -116,12 +159,22 @@ def get_coordinator_status(self) -> SchemaCoordinatorStatus:

def get_master_info(self) -> tuple[bool | None, str | None]:
"""Return whether we're the master, and the actual master url that can be used if we're not"""
assert self._sc is not None
return self._sc.are_we_master, self._sc.master_url
if not self._sc:
return False, None

if not self._sc.ready():
# we should wait for a while after we have been elected master, we should also consume
# all the messages in the log before proceeding, check the doc of `self._sc.are_we_master`
# for more details
return False, None

return self._sc.are_we_master(), self._sc.master_url

def __send_close_event(self) -> None:
self._closing.set()

async def close(self) -> None:
LOG.info("Closing master_coordinator")
if self._sc:
await self._sc.close()
if self._kafka_client:
await self._kafka_client.close()
LOG.info("Sending the close signal to the master coordinator thread")
if self._loop is None:
raise ValueError("Cannot stop the loop before `.start()` is called")
self._loop.call_soon_threadsafe(self.__send_close_event)
Loading

0 comments on commit a7b8e50

Please sign in to comment.