Skip to content

Commit

Permalink
Catch nats timeout errors and handle them better (#2630)
Browse files Browse the repository at this point in the history
* Catch nats timeout errors

* Add exception that we control at processing

* Add another one
  • Loading branch information
lferran authored Nov 14, 2024
1 parent d38cb26 commit e942e5e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 1 deletion.
4 changes: 4 additions & 0 deletions nucliadb/src/nucliadb/ingest/orm/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from typing import Optional

import aiohttp.client_exceptions
import nats.errors
import nats.js.errors

from nucliadb.common import datamanagers, locking
from nucliadb.common.cluster.settings import settings as cluster_settings
Expand Down Expand Up @@ -325,6 +327,8 @@ async def txn(
aiohttp.client_exceptions.ClientError,
ConflictError,
MaindbServerError,
nats.errors.NoRespondersError,
nats.js.errors.NoStreamResponseError,
): # pragma: no cover
# Unhandled exceptions here that should bubble and hard fail
# XXX We swallow too many exceptions here!
Expand Down
11 changes: 10 additions & 1 deletion nucliadb/src/nucliadb/writer/api/v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
from fastapi import HTTPException

from nucliadb_protos.writer_pb2 import BrokerMessage
from nucliadb_utils.transaction import MaxTransactionSizeExceededError, TransactionCommitTimeoutError
from nucliadb_utils.transaction import (
MaxTransactionSizeExceededError,
StreamingServerTimeoutError,
TransactionCommitTimeoutError,
)
from nucliadb_utils.utilities import get_transaction_utility


Expand All @@ -39,3 +43,8 @@ async def commit(writer: BrokerMessage, partition: int, wait: bool = True) -> No
status_code=413,
detail="Transaction size exceeded. The resource is too large to be stored. Consider using file fields or split into multiple requests.",
)
except StreamingServerTimeoutError:
raise HTTPException(
status_code=504,
detail="Timeout waiting for the streaming server to respond. Please back off and retry.",
)
14 changes: 14 additions & 0 deletions nucliadb_utils/src/nucliadb_utils/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class MaxTransactionSizeExceededError(Exception):
pass


class StreamingServerTimeoutError(Exception):
pass


class LocalTransactionUtility:
async def commit(
self,
Expand Down Expand Up @@ -204,6 +208,16 @@ async def commit(
res = await self.js.publish(target_subject, writer.SerializeToString(), headers=headers)
except nats.errors.MaxPayloadError as ex:
raise MaxTransactionSizeExceededError() from ex
except nats.errors.TimeoutError as ex:
logger.exception(
"Nats server timeout error on publish",
extra={
"partition": partition,
"kbid": writer.kbid,
"resource": writer.uuid,
},
)
raise StreamingServerTimeoutError() from ex

waiting_for.seq = res.seq

Expand Down

0 comments on commit e942e5e

Please sign in to comment.