diff --git a/.github/workflows/container-smoke-test.yml b/.github/workflows/container-smoke-test.yml index cced926bd..b875f51c3 100644 --- a/.github/workflows/container-smoke-test.yml +++ b/.github/workflows/container-smoke-test.yml @@ -37,3 +37,6 @@ jobs: - name: Smoke test REST proxy run: bin/smoke-test-rest.sh + + - name: Smoke test REST proxy standalone + run: bin/smoke-test-rest-proxy.sh diff --git a/bin/smoke-test-rest-proxy.sh b/bin/smoke-test-rest-proxy.sh new file mode 100755 index 000000000..1dcc265f5 --- /dev/null +++ b/bin/smoke-test-rest-proxy.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +retries=5 + +for ((i = 0; i <= retries; i++)); do + response=$(curl --silent --verbose --fail http://localhost:8086/topics) + + if [[ $response == '["_schemas","__consumer_offsets"]' ]]; then + echo "Ok!" + break + fi + + if ((i == retries)); then + echo "Still failing after $i retries, giving up." + exit 1 + fi + + echo "Smoke test failed, retrying in 5 seconds ..." + sleep 5 +done diff --git a/container/compose.yml b/container/compose.yml index fa2c53265..500e4d0c5 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -111,6 +111,24 @@ services: KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + karapace-rest-new: + image: ghcr.io/aiven-open/karapace:develop + build: + context: .. + dockerfile: container/Dockerfile + entrypoint: + - python3 + - -m + - rest_proxy + - /opt/karapace/rest.config.json + depends_on: + - kafka + - karapace-registry + volumes: + - ./rest.config.json:/opt/karapace/rest.config.json + ports: + - "8086:8086" + prometheus: image: prom/prometheus volumes: diff --git a/container/rest.config.json b/container/rest.config.json new file mode 100644 index 000000000..c3bed9d2f --- /dev/null +++ b/container/rest.config.json @@ -0,0 +1,15 @@ +{ + "rest": 1, + "advertised_hostname": "karapace-rest-proxy", + "bootstrap_uri": "kafka:29092", + "registry_host": "karapace-registry", + "registry_port": 8081, + "host": "0.0.0.0", + "port": 8086, + "admin_metadata_max_age": 0, + "log_level": "DEBUG", + "statsd_host": "statsd-exporter", + "statsd_port": 8125, + "kafka_schema_reader_strict_mode": false, + "kafka_retriable_errors_silenced": true +} diff --git a/mypy.ini b/mypy.ini index c4ef8efd1..26963e571 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,10 +65,10 @@ ignore_errors = True [mypy-karapace.serialization] ignore_errors = True -[mypy-karapace.kafka_rest_apis.consumer_manager] +[mypy-rest_proxy.consumer_manager] ignore_errors = True -[mypy-karapace.kafka_rest_apis] +[mypy-rest_proxy] ignore_errors = True # Third-party libraries with no stubs available. Before adding libraries here, diff --git a/src/karapace/karapace_all.py b/src/karapace/karapace_all.py index ccdb96915..049d07c9c 100644 --- a/src/karapace/karapace_all.py +++ b/src/karapace/karapace_all.py @@ -9,10 +9,10 @@ from karapace import version as karapace_version from karapace.config import Config, read_config from karapace.instrumentation.prometheus import PrometheusInstrumentation -from karapace.kafka_rest_apis import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistryController from karapace.utils import DebugAccessLogger +from rest_proxy import KafkaRest import argparse import logging diff --git a/src/karapace/kafka_rest_apis/__init__.py b/src/rest_proxy/__init__.py similarity index 99% rename from src/karapace/kafka_rest_apis/__init__.py rename to src/rest_proxy/__init__.py index 10675fb23..acf23cd55 100644 --- a/src/karapace/kafka_rest_apis/__init__.py +++ b/src/rest_proxy/__init__.py @@ -23,14 +23,6 @@ from karapace.errors import InvalidSchema from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import AsyncKafkaProducer -from karapace.kafka_rest_apis.authentication import ( - get_auth_config_from_header, - get_expiration_time_from_header, - get_kafka_client_auth_parameters_from_config, -) -from karapace.kafka_rest_apis.consumer_manager import ConsumerManager -from karapace.kafka_rest_apis.error_codes import RESTErrorCodes -from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache from karapace.karapace import KarapaceBase from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE from karapace.schema_models import TypedSchema, ValidatedTypedSchema @@ -44,6 +36,14 @@ ) from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType from karapace.utils import convert_to_int, json_encode +from rest_proxy.authentication import ( + get_auth_config_from_header, + get_expiration_time_from_header, + get_kafka_client_auth_parameters_from_config, +) +from rest_proxy.consumer_manager import ConsumerManager +from rest_proxy.error_codes import RESTErrorCodes +from rest_proxy.schema_cache import TopicSchemaCache from typing import Callable, TypedDict import asyncio diff --git a/src/rest_proxy/__main__.py b/src/rest_proxy/__main__.py new file mode 100644 index 000000000..2b0775131 --- /dev/null +++ b/src/rest_proxy/__main__.py @@ -0,0 +1,93 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiohttp.web_log import AccessLogger +from contextlib import closing +from karapace import version as karapace_version +from karapace.config import Config, read_config +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from karapace.utils import DebugAccessLogger +from rest_proxy import KafkaRest +from typing import Final + +import argparse +import logging +import sys + +PROGRAM_NAME: Final[str] = "karapace_rest_proxy" + + +def _configure_logging(*, config: Config) -> None: + log_level = config.get("log_level", "DEBUG") + log_format = config.get("log_format", "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s") + + root_handler: logging.Handler | None = None + log_handler = config.get("log_handler", None) + if "systemd" == log_handler: + from systemd import journal + + root_handler = journal.JournalHandler(SYSLOG_IDENTIFIER="karapace") + elif "stdout" == log_handler or log_handler is None: + root_handler = logging.StreamHandler(stream=sys.stdout) + else: + logging.basicConfig(level=logging.INFO, format=log_format) + logging.getLogger().setLevel(log_level) + logging.warning("Log handler %s not recognized, root handler not set.", log_handler) + + if root_handler is not None: + root_handler.setFormatter(logging.Formatter(log_format)) + root_handler.setLevel(log_level) + root_handler.set_name(name="karapace") + logging.root.addHandler(root_handler) + + logging.root.setLevel(log_level) + + if config.get("access_logs_debug") is True: + config["access_log_class"] = DebugAccessLogger + logging.getLogger("aiohttp.access").setLevel(logging.DEBUG) + else: + config["access_log_class"] = AccessLogger + + +def main() -> int: + parser = argparse.ArgumentParser( + prog=PROGRAM_NAME, + description="Karapace rest proxy: exposes an API over common Kafka operations, your Kafka essentials in one tool", + ) + parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__) + parser.add_argument("config_file", help="configuration file path", type=argparse.FileType()) + arg = parser.parse_args() + + with closing(arg.config_file): + config = read_config(arg.config_file) + + logging.log(logging.INFO, "\n%s\\Co %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) + + _configure_logging(config=config) + + app = KafkaRest(config=config) + + logging.log(logging.INFO, "\n%s\nStarting %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) + + config_without_secrets = {} + for key, value in config.items(): + if "password" in key: + value = "****" + config_without_secrets[key] = value + logging.log(logging.DEBUG, "Config %r", config_without_secrets) + + try: + PrometheusInstrumentation.setup_metrics(app=app) + app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` + except Exception as ex: # pylint: disable-broad-except + app.stats.unexpected_exception(ex=ex, where=f"{PROGRAM_NAME}_main") + raise + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/karapace/kafka_rest_apis/authentication.py b/src/rest_proxy/authentication.py similarity index 100% rename from src/karapace/kafka_rest_apis/authentication.py rename to src/rest_proxy/authentication.py diff --git a/src/karapace/kafka_rest_apis/consumer_manager.py b/src/rest_proxy/consumer_manager.py similarity index 99% rename from src/karapace/kafka_rest_apis/consumer_manager.py rename to src/rest_proxy/consumer_manager.py index 809478f4c..6c5b116a3 100644 --- a/src/karapace/kafka_rest_apis/consumer_manager.py +++ b/src/rest_proxy/consumer_manager.py @@ -19,11 +19,11 @@ from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import AsyncKafkaConsumer from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, Timestamp -from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config -from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.karapace import empty_response, KarapaceBase from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer from karapace.utils import convert_to_int, json_decode, JSONDecodeError +from rest_proxy.authentication import get_kafka_client_auth_parameters_from_config +from rest_proxy.error_codes import RESTErrorCodes from struct import error as UnpackError from urllib.parse import urljoin diff --git a/src/karapace/kafka_rest_apis/error_codes.py b/src/rest_proxy/error_codes.py similarity index 100% rename from src/karapace/kafka_rest_apis/error_codes.py rename to src/rest_proxy/error_codes.py diff --git a/src/karapace/kafka_rest_apis/schema_cache.py b/src/rest_proxy/schema_cache.py similarity index 100% rename from src/karapace/kafka_rest_apis/schema_cache.py rename to src/rest_proxy/schema_cache.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1673445ba..3e68b5968 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -19,8 +19,8 @@ from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer -from karapace.kafka_rest_apis import KafkaRest from pathlib import Path +from rest_proxy import KafkaRest from tests.conftest import KAFKA_VERSION from tests.integration.utils.cluster import RegistryDescription, RegistryEndpoint, start_schema_registry_cluster from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index ee504366b..6d406d04b 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -9,9 +9,9 @@ from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer -from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX from karapace.schema_type import SchemaType from karapace.version import __version__ +from rest_proxy import KafkaRest, SUBJECT_VALID_POSTFIX from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( new_random_name, diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index f0003dbdd..357601fab 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -2,7 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS +from rest_proxy.consumer_manager import KNOWN_FORMATS from tests.utils import ( consumer_valid_payload, new_consumer, diff --git a/tests/unit/kafka_rest_apis/__init__.py b/tests/unit/rest_proxy/__init__.py similarity index 100% rename from tests/unit/kafka_rest_apis/__init__.py rename to tests/unit/rest_proxy/__init__.py diff --git a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py b/tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py similarity index 99% rename from tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py rename to tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py index b47fb5e02..bc40c783a 100644 --- a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py +++ b/tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py @@ -4,8 +4,8 @@ See LICENSE for details """ from karapace.config import DEFAULTS -from karapace.kafka_rest_apis import UserRestProxy from karapace.serialization import SchemaRegistrySerializer +from rest_proxy import UserRestProxy from unittest.mock import patch import copy diff --git a/tests/unit/test_authentication.py b/tests/unit/test_authentication.py index 40abc5c01..e07b00384 100644 --- a/tests/unit/test_authentication.py +++ b/tests/unit/test_authentication.py @@ -6,13 +6,13 @@ from http import HTTPStatus from karapace.config import ConfigDefaults, set_config_defaults -from karapace.kafka_rest_apis.authentication import ( +from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE +from rest_proxy.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, get_kafka_client_auth_parameters_from_config, SimpleOauthTokenProvider, ) -from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE import base64 import datetime diff --git a/tests/unit/test_rest_auth.py b/tests/unit/test_rest_auth.py index 86bb14b8a..aee1b4b5e 100644 --- a/tests/unit/test_rest_auth.py +++ b/tests/unit/test_rest_auth.py @@ -6,7 +6,7 @@ from __future__ import annotations from karapace.config import set_config_defaults -from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy +from rest_proxy import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy from unittest.mock import call, Mock import asyncio