Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: validate REST Proxy subscription param types #981

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ def _illegal_state_fail(message: str, content_type: str) -> None:
message=message,
)

@staticmethod
def _unprocessable_entity(*, message: str, content_type: str) -> None:
ConsumerManager._assert(
cond=False,
code=HTTPStatus.UNPROCESSABLE_ENTITY,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
content_type=content_type,
message=message,
)

# external api below
# CONSUMER
async def create_consumer(self, group_name: str, request_data: dict, content_type: str):
Expand Down Expand Up @@ -318,7 +328,11 @@ async def set_subscription(self, internal_name: tuple[str, str], content_type: s
LOG.info("Updating subscription for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
topics = request_data.get("topics", [])
if topics and not isinstance(topics, list):
self._unprocessable_entity(message="Topics is expected to be list of strings", content_type=content_type)
topics_pattern = request_data.get("topic_pattern")
if topics_pattern and not isinstance(topics_pattern, str):
self._unprocessable_entity(message="Topic patterns is expected to be a string", content_type=content_type)
if not (topics or topics_pattern):
self._illegal_state_fail(
message="Neither topic_pattern nor topics are present in request", content_type=content_type
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ async def test_subscription(rest_async_client, admin_client, producer, trail):
res = await rest_async_client.post(assign_path, headers=REST_HEADERS["json"], json=assign_payload)
assert res.status_code == 409, "Expecting status code 409 on assign after subscribe on the same consumer instance"

# topics parameter is expected to be array, 4xx error returned
res = await rest_async_client.post(sub_path, json={"topics": topic_name}, headers=REST_HEADERS["json"])
assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param"

# topic pattern parameter is expected to be a string, 4xx error returned
res = await rest_async_client.post(
sub_path, json={"topic_pattern": ["not", "a", "string"]}, headers=REST_HEADERS["json"]
)
assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param"


@pytest.mark.parametrize("trail", ["", "/"])
async def test_seek(rest_async_client, admin_client, trail):
Expand Down
Loading