rest-proxy: split out into standalone module #993

Draft
wants to merge 1 commit into
base: main
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[run]
branch = true
relative_files = true
source = src/karapace
source = src/karapace,src/rest_proxy
3 changes: 3 additions & 0 deletions .github/workflows/container-smoke-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ jobs:

- name: Smoke test REST proxy
run: bin/smoke-test-rest.sh

- name: Smoke test REST proxy standalone
run: bin/smoke-test-rest-proxy.sh
20 changes: 20 additions & 0 deletions bin/smoke-test-rest-proxy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

retries=5

for ((i = 0; i <= retries; i++)); do
  response=$(curl --silent --verbose --fail http://localhost:8083/topics)

  if [[ $response == '["_schemas","__consumer_offsets"]' ]]; then
Contributor

Would __consumer_offsets alone be enough here? The _schemas topic is specific to the schema registry.

Contributor Author

I think testing against _schemas adds a little bit more coverage as that topic is created from application code and truly verifies the flow.

Contributor Author

Also this is existing code/logic.

    echo "Ok!"
    break
nosahama marked this conversation as resolved.
  fi

  if ((i == retries)); then
    echo "Still failing after $i retries, giving up."
    exit 1
  fi

  echo "Smoke test failed, retrying in 5 seconds ..."
  sleep 5
done
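
The exact string comparison in the script above depends on the order in which the proxy lists topics. As an illustration only, here is a minimal sketch of an order-independent check, written in Python rather than bash. The helper names `topics_ready` and `wait_for_topics` are hypothetical and not part of this PR, and the fetch function is injected so the sketch does not assume any particular HTTP client.

```python
from __future__ import annotations

import json
import time
from typing import Callable, Iterable


def topics_ready(response_body: str, required: Iterable[str]) -> bool:
    """Return True when every required topic appears in the JSON topic list."""
    try:
        topics = json.loads(response_body)
    except json.JSONDecodeError:
        return False
    if not isinstance(topics, list):
        return False
    return all(name in topics for name in required)


def wait_for_topics(
    fetch: Callable[[], str],
    required: Iterable[str],
    retries: int = 5,
    delay: float = 5.0,
) -> bool:
    """Call fetch() up to retries + 1 times, like the loop in the shell script."""
    required = list(required)
    for attempt in range(retries + 1):
        try:
            body = fetch()
        except OSError:
            # Treat connection failures like an empty response and retry.
            body = ""
        if topics_ready(body, required):
            return True
        if attempt < retries:
            time.sleep(delay)
    return False
```

With this shape, checking only `__consumer_offsets` or both topics becomes a matter of what is passed as `required`, which is the point debated in the review thread.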
28 changes: 28 additions & 0 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,34 @@ services:
      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

  karapace-rest-proxy:
    image: ghcr.io/aiven-open/karapace:develop
    build:
      context: ..
      dockerfile: container/Dockerfile
    entrypoint:
      - /bin/bash
      - /opt/karapace/start.sh
      - rest_proxy
    depends_on:
      - kafka
      - karapace-registry
    ports:
      - "8083:8083"
    environment:
      KARAPACE_PORT: 8083
      KARAPACE_HOST: 0.0.0.0
      KARAPACE_ADVERTISED_HOSTNAME: karapace-rest-proxy
      KARAPACE_BOOTSTRAP_URI: kafka:29092
      KARAPACE_REGISTRY_HOST: karapace-registry
      KARAPACE_REGISTRY_PORT: 8081
      KARAPACE_ADMIN_METADATA_MAX_AGE: 0
      KARAPACE_LOG_LEVEL: WARNING
      KARAPACE_STATSD_HOST: statsd-exporter
      KARAPACE_STATSD_PORT: 8125
      KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
      KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true

  prometheus:
    image: prom/prometheus
    volumes:
16 changes: 16 additions & 0 deletions container/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@ rest)
  echo "Starting Karapace REST API"
  exec python3 -m karapace.karapace_all /opt/karapace/rest.config.json
  ;;
rest_proxy)
Contributor

The case rest) is not required anymore.

  # Reexport variables for compatibility
  [[ -n ${KARAPACE_REST_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REST_ADVERTISED_HOSTNAME}"
  [[ -n ${KARAPACE_REST_BOOTSTRAP_URI+isset} ]] && export KARAPACE_BOOTSTRAP_URI="${KARAPACE_REST_BOOTSTRAP_URI}"
  [[ -n ${KARAPACE_REST_REGISTRY_HOST+isset} ]] && export KARAPACE_REGISTRY_HOST="${KARAPACE_REST_REGISTRY_HOST}"
  [[ -n ${KARAPACE_REST_REGISTRY_PORT+isset} ]] && export KARAPACE_REGISTRY_PORT="${KARAPACE_REST_REGISTRY_PORT}"
  [[ -n ${KARAPACE_REST_HOST+isset} ]] && export KARAPACE_HOST="${KARAPACE_REST_HOST}"
  [[ -n ${KARAPACE_REST_PORT+isset} ]] && export KARAPACE_PORT="${KARAPACE_REST_PORT}"
  [[ -n ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE+isset} ]] && export KARAPACE_ADMIN_METADATA_MAX_AGE="${KARAPACE_REST_ADMIN_METADATA_MAX_AGE}"
  [[ -n ${KARAPACE_REST_LOG_LEVEL+isset} ]] && export KARAPACE_LOG_LEVEL="${KARAPACE_REST_LOG_LEVEL}"
  export KARAPACE_REST=1
  echo "{}" >/opt/karapace/rest.config.json

  echo "Starting Karapace REST API"
  exec python3 -m rest_proxy /opt/karapace/rest.config.json
  ;;
registry)
  # Reexport variables for compatibility
  [[ -n ${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME}"
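
The `${VAR+isset}` tests in the `rest_proxy)` case copy each `KARAPACE_REST_*` variable to its unprefixed `KARAPACE_*` name whenever the prefixed variable is set, even to an empty string. The same mapping can be sketched in Python to make the behaviour explicit. This is an illustration only: `reexport` and `REST_COMPAT_SUFFIXES` are hypothetical names, not part of the PR, and the function works on a plain mapping instead of the real process environment.

```python
from __future__ import annotations

from typing import Mapping

# Suffixes copied from the rest_proxy) case in container/start.sh.
REST_COMPAT_SUFFIXES = (
    "ADVERTISED_HOSTNAME",
    "BOOTSTRAP_URI",
    "REGISTRY_HOST",
    "REGISTRY_PORT",
    "HOST",
    "PORT",
    "ADMIN_METADATA_MAX_AGE",
    "LOG_LEVEL",
)


def reexport(environ: Mapping[str, str], legacy_prefix: str = "KARAPACE_REST_") -> dict[str, str]:
    """Return a copy of environ with legacy-prefixed values copied to KARAPACE_*."""
    result = dict(environ)
    for suffix in REST_COMPAT_SUFFIXES:
        legacy_name = legacy_prefix + suffix
        # `in` is true for set-but-empty values, matching ${VAR+isset} in bash.
        if legacy_name in environ:
            result["KARAPACE_" + suffix] = environ[legacy_name]
    return result


env = {"KARAPACE_REST_PORT": "8083", "KARAPACE_PORT": "8081", "KARAPACE_REST_HOST": ""}
updated = reexport(env)
print(updated["KARAPACE_PORT"])  # the legacy value overrides the existing one
```

A prefixed variable always wins over an unprefixed one that is already set, which is why the compose service above can use the plain `KARAPACE_*` names directly.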
4 changes: 2 additions & 2 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ ignore_errors = True
[mypy-karapace.serialization]
ignore_errors = True

[mypy-karapace.kafka_rest_apis.consumer_manager]
[mypy-rest_proxy.consumer_manager]
ignore_errors = True

[mypy-karapace.kafka_rest_apis]
[mypy-rest_proxy]
ignore_errors = True

# Third-party libraries with no stubs available. Before adding libraries here,
2 changes: 1 addition & 1 deletion src/karapace/karapace_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from karapace import version as karapace_version
from karapace.config import Config, read_config
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.kafka_rest_apis import KafkaRest
from karapace.rapu import RestApp
from karapace.schema_registry_apis import KarapaceSchemaRegistryController
from karapace.utils import DebugAccessLogger
from rest_proxy import KafkaRest

import argparse
import logging
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@
from karapace.errors import InvalidSchema
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import AsyncKafkaProducer
from karapace.kafka_rest_apis.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
)
from karapace.kafka_rest_apis.consumer_manager import ConsumerManager
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache
from karapace.karapace import KarapaceBase
from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE
from karapace.schema_models import TypedSchema, ValidatedTypedSchema
Expand All @@ -44,6 +36,14 @@
)
from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType
from karapace.utils import convert_to_int, json_encode
from rest_proxy.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
)
from rest_proxy.consumer_manager import ConsumerManager
from rest_proxy.error_codes import RESTErrorCodes
from rest_proxy.schema_cache import TopicSchemaCache
from typing import Callable, TypedDict

import asyncio
93 changes: 93 additions & 0 deletions src/rest_proxy/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from aiohttp.web_log import AccessLogger
from contextlib import closing
from karapace import version as karapace_version
from karapace.config import Config, read_config
from karapace.instrumentation.prometheus import PrometheusInstrumentation
from karapace.utils import DebugAccessLogger
from rest_proxy import KafkaRest
from typing import Final

import argparse
import logging
import sys

PROGRAM_NAME: Final[str] = "karapace_rest_proxy"


def _configure_logging(*, config: Config) -> None:
    log_level = config.get("log_level", "DEBUG")
    log_format = config.get("log_format", "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s")

    root_handler: logging.Handler | None = None
    log_handler = config.get("log_handler", None)
    if "systemd" == log_handler:
        from systemd import journal

        root_handler = journal.JournalHandler(SYSLOG_IDENTIFIER="karapace")
    elif "stdout" == log_handler or log_handler is None:
        root_handler = logging.StreamHandler(stream=sys.stdout)
    else:
        logging.basicConfig(level=logging.INFO, format=log_format)
        logging.getLogger().setLevel(log_level)
        logging.warning("Log handler %s not recognized, root handler not set.", log_handler)

    if root_handler is not None:
        root_handler.setFormatter(logging.Formatter(log_format))
        root_handler.setLevel(log_level)
        root_handler.set_name(name="karapace")
        logging.root.addHandler(root_handler)

    logging.root.setLevel(log_level)

    if config.get("access_logs_debug") is True:
        config["access_log_class"] = DebugAccessLogger
        logging.getLogger("aiohttp.access").setLevel(logging.DEBUG)
    else:
        config["access_log_class"] = AccessLogger


def main() -> int:
    parser = argparse.ArgumentParser(
        prog=PROGRAM_NAME,
        description="Karapace rest proxy: exposes an API over common Kafka operations, your Kafka essentials in one tool",
    )
    parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__)
    parser.add_argument("config_file", help="configuration file path", type=argparse.FileType())
    arg = parser.parse_args()

    with closing(arg.config_file):
        config = read_config(arg.config_file)
Comment on lines +64 to +65
Contributor

If you aren't binding the value returned by closing(), I don't think you are closing it correctly.
Shouldn't it be:

with closing(foo) as bar:
   config = read_config(bar)

?

Contributor Author

I can take a look, once again, this is existing code 😉
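
For reference on the question above: `contextlib.closing` calls `close()` on the object it was given when the `with` block exits, whether or not the result is bound with `as`. The sketch below shows both forms, using `io.StringIO` as a stand-in for the opened config file. The stand-in and the helper name `read_all` are assumptions made for the example, not code from the PR.

```python
from contextlib import closing
import io


def read_all(stream: io.StringIO, bind: bool) -> str:
    """Read a stream inside closing(), with or without an `as` binding."""
    if bind:
        with closing(stream) as handle:
            # `handle` is the same object as `stream`.
            return handle.read()
    with closing(stream):
        return stream.read()


unbound = io.StringIO('{"port": 8083}')
bound = io.StringIO('{"port": 8083}')
read_all(unbound, bind=False)
read_all(bound, bind=True)
print(unbound.closed, bound.closed)  # prints "True True"
```

Both forms close the stream, so the `as` binding is a readability choice rather than a correctness fix.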


    logging.log(logging.INFO, "\n%s\\Co %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50))

    _configure_logging(config=config)

    app = KafkaRest(config=config)

    logging.log(logging.INFO, "\n%s\nStarting %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50))

    config_without_secrets = {}
    for key, value in config.items():
        if "password" in key:
            value = "****"
Comment on lines +77 to +78
Contributor

This is dangerous: you should keep the information about a field's secrecy next to where the field is defined. Otherwise, if tomorrow we add another field such as sentry_api_token, it isn't filtered, and there is no hint of this indirect dependency.
I suggest adding the filtering to the object itself. You could add a method like this to the Config object:


SENSITIVE_FIELDS: Final = ("registry_password", "ssl_password", "sasl_plain_password", "sasl_oauth_token")

@staticmethod
def is_field_a_secret(field_name: str) -> bool:
   if field_name not in Config:
      raise ValueError(f"{field_name} is not known")
   return field_name in SENSITIVE_FIELDS

That way the filtering logic is "near" the place where a property is added, removed, or renamed. It becomes a property of the data structure rather than a piece of coupled code sitting in another file, which a developer would have to know to check before changing something far away.

PS: I think the current logic doesn't cover sasl_oauth_token.

Contributor Author

Once again, this is existing code 😉
I'll think through your suggestions and either add the fixes here or create follow-up items 👍
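
The suggestion in this thread, keeping the list of secret fields next to the config definition instead of matching on the substring "password", can be sketched as follows. This is a minimal illustration, not the PR's implementation: the field names are taken from the review comment and may not match Karapace's actual configuration keys, and `is_secret`, `redact`, and `REDACTED` are hypothetical names.

```python
from __future__ import annotations

from typing import Any, Final, Mapping

# Field names as listed in the review comment (assumed, not verified).
SENSITIVE_FIELDS: Final = frozenset(
    {"registry_password", "ssl_password", "sasl_plain_password", "sasl_oauth_token"}
)
REDACTED: Final = "****"


def is_secret(field_name: str) -> bool:
    """Return True when the field must never be written to logs."""
    return field_name in SENSITIVE_FIELDS


def redact(config: Mapping[str, Any]) -> dict[str, Any]:
    """Return a copy of config that is safe to log."""
    return {key: (REDACTED if is_secret(key) else value) for key, value in config.items()}


sample = {"port": 8083, "ssl_password": "hunter2", "sasl_oauth_token": "abc"}
print(redact(sample))
```

Unlike the substring check in `main()`, this version also masks `sasl_oauth_token`, at the cost of having to extend `SENSITIVE_FIELDS` whenever a new secret field is introduced.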

        config_without_secrets[key] = value
    logging.log(logging.DEBUG, "Config %r", config_without_secrets)

    try:
        PrometheusInstrumentation.setup_metrics(app=app)
        app.run()  # `close` will be called by the callback `close_by_app` set by `KarapaceBase`
    except Exception as ex:  # pylint: disable=broad-except
        app.stats.unexpected_exception(ex=ex, where=f"{PROGRAM_NAME}_main")
        raise

    return 0


if __name__ == "__main__":
    sys.exit(main())
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
from karapace.kafka.common import translate_from_kafkaerror
from karapace.kafka.consumer import AsyncKafkaConsumer
from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, Timestamp
from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config
from karapace.kafka_rest_apis.error_codes import RESTErrorCodes
from karapace.karapace import empty_response, KarapaceBase
from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer
from karapace.utils import convert_to_int, json_decode, JSONDecodeError
from rest_proxy.authentication import get_kafka_client_auth_parameters_from_config
from rest_proxy.error_codes import RESTErrorCodes
from struct import error as UnpackError
from urllib.parse import urljoin

2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer
from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer
from karapace.kafka_rest_apis import KafkaRest
from pathlib import Path
from rest_proxy import KafkaRest
from tests.conftest import KAFKA_VERSION
from tests.integration.utils.cluster import RegistryDescription, RegistryEndpoint, start_schema_registry_cluster
from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig
2 changes: 1 addition & 1 deletion tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from karapace.client import Client
from karapace.kafka.admin import KafkaAdminClient
from karapace.kafka.producer import KafkaProducer
from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX
from karapace.schema_type import SchemaType
from karapace.version import __version__
from rest_proxy import KafkaRest, SUBJECT_VALID_POSTFIX
from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES
from tests.utils import (
new_random_name,
2 changes: 1 addition & 1 deletion tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS
from rest_proxy.consumer_manager import KNOWN_FORMATS
from tests.utils import (
consumer_valid_payload,
new_consumer,
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
See LICENSE for details
"""
from karapace.config import DEFAULTS
from karapace.kafka_rest_apis import UserRestProxy
from karapace.serialization import SchemaRegistrySerializer
from rest_proxy import UserRestProxy
from unittest.mock import patch

import copy
4 changes: 2 additions & 2 deletions tests/unit/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

from http import HTTPStatus
from karapace.config import ConfigDefaults, set_config_defaults
from karapace.kafka_rest_apis.authentication import (
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE
from rest_proxy.authentication import (
get_auth_config_from_header,
get_expiration_time_from_header,
get_kafka_client_auth_parameters_from_config,
SimpleOauthTokenProvider,
)
from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE

import base64
import datetime
2 changes: 1 addition & 1 deletion tests/unit/test_rest_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

from karapace.config import set_config_defaults
from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy
from rest_proxy import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy
from unittest.mock import call, Mock

import asyncio