Skip to content

Commit

Permalink
Merge pull request #981 from Aiven-Open/jjaakola-aiven-validate-rest-…
Browse files Browse the repository at this point in the history
…proxy-subscription-param-types

fix: validate REST Proxy subscription param types
  • Loading branch information
davide-armand authored Oct 17, 2024
2 parents 2c55c61 + 4c41189 commit 60b5db6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ def _illegal_state_fail(message: str, content_type: str) -> None:
message=message,
)

@staticmethod
def _unprocessable_entity(*, message: str, content_type: str) -> None:
ConsumerManager._assert(
cond=False,
code=HTTPStatus.UNPROCESSABLE_ENTITY,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
content_type=content_type,
message=message,
)

# external api below
# CONSUMER
async def create_consumer(self, group_name: str, request_data: dict, content_type: str):
Expand Down Expand Up @@ -318,7 +328,11 @@ async def set_subscription(self, internal_name: tuple[str, str], content_type: s
LOG.info("Updating subscription for %s", internal_name)
self._assert_consumer_exists(internal_name, content_type)
topics = request_data.get("topics", [])
if topics and not isinstance(topics, list):
self._unprocessable_entity(message="Topics is expected to be list of strings", content_type=content_type)
topics_pattern = request_data.get("topic_pattern")
if topics_pattern and not isinstance(topics_pattern, str):
self._unprocessable_entity(message="Topic patterns is expected to be a string", content_type=content_type)
if not (topics or topics_pattern):
self._illegal_state_fail(
message="Neither topic_pattern nor topics are present in request", content_type=content_type
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_rest_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ async def test_subscription(rest_async_client, admin_client, producer, trail):
res = await rest_async_client.post(assign_path, headers=REST_HEADERS["json"], json=assign_payload)
assert res.status_code == 409, "Expecting status code 409 on assign after subscribe on the same consumer instance"

# topics parameter is expected to be array, 4xx error returned
res = await rest_async_client.post(sub_path, json={"topics": topic_name}, headers=REST_HEADERS["json"])
assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param"

# topic pattern parameter is expected to be a string, 4xx error returned
res = await rest_async_client.post(
sub_path, json={"topic_pattern": ["not", "a", "string"]}, headers=REST_HEADERS["json"]
)
assert res.status_code == 422, "Expecting status code 422 on subscription update with invalid topics param"


@pytest.mark.parametrize("trail", ["", "/"])
async def test_seek(rest_async_client, admin_client, trail):
Expand Down

0 comments on commit 60b5db6

Please sign in to comment.