Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix subject name strategy config and add validation flag #735

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,11 @@ Keys to take special care are the ones needed to configure Kafka and advertised_
- ``runtime``
- Runtime directory for the ``protoc`` protobuf schema parser and code generator
* - ``name_strategy``
- ``subject_name``
- ``topic_name``
- Name strategy to use when storing schemas from the kafka rest proxy service
* - ``name_strategy_validation``
- ``true``
- If enabled, validate that given schema is registered under used name strategy when producing messages from Kafka Rest
* - ``master_election_strategy``
- ``lowest``
- Decides on what basis the Karapace cluster master is chosen (only relevant in a multi node setup)
Expand Down
2 changes: 1 addition & 1 deletion container/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ RUN groupadd --system karapace \
&& chown --recursive karapace:karapace /opt/karapace /var/log/karapace

# Install protobuf compiler.
ARG PROTOBUF_COMPILER_VERSION="3.12.4-1"
ARG PROTOBUF_COMPILER_VERSION="3.12.4-1+deb11u1"
RUN apt-get update \
&& apt-get install --assume-yes --no-install-recommends \
protobuf-compiler=$PROTOBUF_COMPILER_VERSION \
Expand Down
18 changes: 18 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Config(TypedDict):
session_timeout_ms: int
karapace_rest: bool
karapace_registry: bool
name_strategy: str
name_strategy_validation: bool
master_election_strategy: str
protobuf_runtime_directory: str

Expand Down Expand Up @@ -142,6 +144,8 @@ class ConfigDefaults(Config, total=False):
"session_timeout_ms": 10000,
"karapace_rest": False,
"karapace_registry": False,
"name_strategy": "topic_name",
"name_strategy_validation": True,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
}
Expand All @@ -158,6 +162,13 @@ class ElectionStrategy(Enum):
lowest = "lowest"


@unique
class NameStrategy(Enum):
topic_name = "topic_name"
record_name = "record_name"
topic_record_name = "topic_record_name"


def parse_env_value(value: str) -> str | int | bool:
# we only have ints, strings and bools in the config
try:
Expand Down Expand Up @@ -256,6 +267,13 @@ def validate_config(config: Config) -> None:
f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}"
) from None

name_strategy = config["name_strategy"]
try:
NameStrategy(name_strategy)
except ValueError:
valid_strategies = [strategy.value for strategy in NameStrategy]
raise InvalidConfiguration(f"Invalid name strategy: {name_strategy}, valid values are {valid_strategies}") from None

if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None:
raise InvalidConfiguration(
"Using 'rest_authorization' requires configuration value for 'sasl_bootstrap_uri' to be set"
Expand Down
2 changes: 1 addition & 1 deletion karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def subject_not_included(schema: TypedSchema, subjects: List[Subject]) -> bool:
need_new_call=subject_not_included,
)

if subject_not_included(parsed_schema, valid_subjects):
if self.config["name_strategy_validation"] and subject_not_included(parsed_schema, valid_subjects):
raise InvalidSchema()

return schema_id
Expand Down
5 changes: 2 additions & 3 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,6 @@ class SchemaRegistrySerializer:
def __init__(
self,
config: dict,
name_strategy: str = "topic_name",
**cfg, # pylint: disable=unused-argument
) -> None:
self.config = config
self.state_lock = asyncio.Lock()
Expand All @@ -245,7 +243,8 @@ def __init__(
else:
registry_url = f"http://{self.config['registry_host']}:{self.config['registry_port']}"
registry_client = SchemaRegistryClient(registry_url, session_auth=session_auth)
self.subject_name_strategy = NAME_STRATEGIES[name_strategy]
name_strategy = config.get("name_strategy", "topic_name")
self.subject_name_strategy = NAME_STRATEGIES.get(name_strategy, topic_name_strategy)
self.registry_client: Optional[SchemaRegistryClient] = registry_client
self.ids_to_schemas: Dict[int, TypedSchema] = {}
self.ids_to_subjects: MutableMapping[int, List[Subject]] = TTLCache(maxsize=10000, ttl=600)
Expand Down
74 changes: 74 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,80 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
await client.close()


@pytest.fixture(scope="function", name="rest_async_novalidation")
async def fixture_rest_async_novalidation(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
# the election process. Without proper configuration for the listeners that
# won't work and will cause test failures.
rest_url = request.config.getoption("rest_url")
if rest_url:
yield None
return

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults(
{
"admin_metadata_max_age": 2,
"bootstrap_uri": kafka_servers.bootstrap_servers,
# Use non-default max request size for REST producer.
"producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
"name_strategy_validation": False, # This should be only difference from rest_async
}
)
write_config(config_path, config)
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
rest.serializer.registry_client.client = registry_async_client
try:
yield rest
finally:
await rest.close()


@pytest.fixture(scope="function", name="rest_async_novalidation_client")
async def fixture_rest_async_novalidationclient(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
rest_async_novalidation: KafkaRest,
aiohttp_client: AiohttpClient,
) -> AsyncIterator[Client]:
rest_url = request.config.getoption("rest_url")

# client and server_uri are incompatible settings.
if rest_url:
client = Client(server_uri=rest_url)
else:

async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
return await aiohttp_client(rest_async_novalidation.app)

client = Client(client_factory=get_client)

try:
# wait until the server is listening, otherwise the tests may fail
await repeat_until_successful_request(
client.get,
"brokers",
json_data=None,
headers=None,
error_msg="REST API is unreachable",
timeout=10,
sleep=0.3,
)
yield client
finally:
await client.close()


@pytest.fixture(scope="function", name="rest_async_registry_auth")
async def fixture_rest_async_registry_auth(
request: SubRequest,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,70 @@ async def test_publish_with_schema_id_of_another_subject(rest_async_client, regi
assert res.status_code == 200


async def test_publish_with_schema_id_of_another_subject_novalidation(
rest_async_novalidation_client, registry_async_client, admin_client
):
"""
Same as above but with name_strategy_validation disabled as config
"""
topic_name = new_topic(admin_client)
subject_1 = f"{topic_name}-value"
subject_2 = "some-other-subject-value"

await wait_for_topics(rest_async_novalidation_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
url = f"/topics/{topic_name}"

schema_1 = {
"type": "record",
"name": "Schema1",
"fields": [
{
"name": "name",
"type": "string",
},
],
}
schema_2 = {
"type": "record",
"name": "Schema2",
"fields": [
{
"name": "temperature",
"type": "int",
},
],
}

# Register schemas to get the ids
res = await registry_async_client.post(
f"subjects/{subject_1}/versions",
json={"schema": json.dumps(schema_1)},
)
assert res.status_code == 200
schema_1_id = res.json()["id"]

res = await registry_async_client.post(
f"subjects/{subject_2}/versions",
json={"schema": json.dumps(schema_2)},
)
assert res.status_code == 200
schema_2_id = res.json()["id"]

res = await rest_async_novalidation_client.post(
url,
json={"value_schema_id": schema_2_id, "records": [{"value": {"temperature": 25}}]},
headers=REST_HEADERS["avro"],
)
assert res.status_code == 200 # This is fine if name_strategy_validation is disabled

res = await rest_async_novalidation_client.post(
url,
json={"value_schema_id": schema_1_id, "records": [{"value": {"name": "Mr. Mustache"}}]},
headers=REST_HEADERS["avro"],
)
assert res.status_code == 200


async def test_brokers(rest_async_client):
res = await rest_async_client.get("/brokers")
assert res.ok
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_protobuf_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
with open(config_path, encoding="utf8") as handler:
config = read_config(handler)
serializer = SchemaRegistrySerializer(config_path=config_path, config=config)
serializer = SchemaRegistrySerializer(config=config)
await serializer.registry_client.close()
serializer.registry_client = mock_client
return serializer
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer:
with open(config_path, encoding="utf8") as handler:
config = read_config(handler)
serializer = SchemaRegistrySerializer(config_path=config_path, config=config)
serializer = SchemaRegistrySerializer(config=config)
await serializer.registry_client.close()
serializer.registry_client = mock_client
return serializer
Expand Down