Skip to content

Commit

Permalink
Merge pull request #803 from Aiven-Open/aiven-anton/handle-expected-e…
Browse files Browse the repository at this point in the history
…xceptions

Handle expected exceptions in topic and partition publish
  • Loading branch information
matyaskuti authored Feb 5, 2024
2 parents c6f0bb3 + 62098ac commit 7794fc5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
30 changes: 28 additions & 2 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,10 @@ def num_consumers(self) -> int:
return len(self.consumer_manager.consumers)

async def _maybe_create_async_producer(self) -> AsyncKafkaProducer:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
if self._async_producer is not None:
return self._async_producer

Expand Down Expand Up @@ -672,6 +676,10 @@ async def aclose(self) -> None:
self.consumer_manager = None

async def publish(self, topic: str, partition_id: Optional[str], content_type: str, request: HTTPRequest) -> None:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
formats: dict = request.content_type
data: dict = request.json
_ = await self.get_topic_info(topic, content_type)
Expand Down Expand Up @@ -725,11 +733,25 @@ async def publish(self, topic: str, partition_id: Optional[str], content_type: s

async def partition_publish(self, topic: str, partition_id: str, content_type: str, *, request: HTTPRequest) -> None:
log.debug("Executing partition publish on topic %s and partition %s", topic, partition_id)
await self.publish(topic, partition_id, content_type, request)
try:
await self.publish(topic, partition_id, content_type, request)
except (NoBrokersAvailable, AuthenticationFailedError):
KafkaRest.service_unavailable(
message="Service unavailable",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value,
)

async def topic_publish(self, topic: str, content_type: str, *, request: HTTPRequest) -> None:
log.debug("Executing topic publish on topic %s", topic)
await self.publish(topic, None, content_type, request)
try:
await self.publish(topic, None, content_type, request)
except (NoBrokersAvailable, AuthenticationFailedError):
KafkaRest.service_unavailable(
message="Service unavailable",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_SERVICE_UNAVAILABLE.value,
)

@staticmethod
def validate_partition_id(partition_id: str, content_type: str) -> int:
Expand Down Expand Up @@ -1038,6 +1060,10 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
)

async def produce_messages(self, *, topic: str, prepared_records: List) -> List:
"""
:raises NoBrokersAvailable:
:raises AuthenticationFailedError:
"""
producer = await self._maybe_create_async_producer()

produce_futures = []
Expand Down
1 change: 1 addition & 0 deletions karapace/kafka_rest_apis/error_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class RESTErrorCodes(Enum):
HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
HTTP_INTERNAL_SERVER_ERROR = HTTPStatus.INTERNAL_SERVER_ERROR.value
HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
HTTP_SERVICE_UNAVAILABLE = HTTPStatus.SERVICE_UNAVAILABLE.value
TOPIC_NOT_FOUND = 40401
PARTITION_NOT_FOUND = 40402
CONSUMER_NOT_FOUND = 40403
Expand Down
11 changes: 11 additions & 0 deletions karapace/karapace.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ def not_found(message: str, sub_code: int, content_type: str) -> NoReturn:
content_type=content_type, status=HTTPStatus.NOT_FOUND, body={"message": message, "error_code": sub_code}
)

@staticmethod
def service_unavailable(message: str, sub_code: int, content_type: str) -> NoReturn:
KarapaceBase.r(
content_type=content_type,
status=HTTPStatus.SERVICE_UNAVAILABLE,
body={
"message": message,
"error_code": sub_code,
},
)

async def root_get(self) -> NoReturn:
self.r({}, "application/json")

Expand Down

0 comments on commit 7794fc5

Please sign in to comment.