Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug: publishing on RabbitBroker hangs forever when connection lost #1579

Closed
sfran96 opened this issue Jul 10, 2024 · 3 comments · Fixed by mosquito/aio-pika#637
Closed

Bug: publishing on RabbitBroker hangs forever when connection lost #1579

sfran96 opened this issue Jul 10, 2024 · 3 comments · Fixed by mosquito/aio-pika#637
Labels
bug Something isn't working RabbitMQ Issues related to `faststream.rabbit` module and RabbitMQ broker features

Comments

@sfran96
Copy link
Contributor

sfran96 commented Jul 10, 2024

Describe the bug
If an application uses a RabbitBroker broker to publish messages, the application establishes the connection correctly but after the fact the connection is lost (e.g., broker goes offline) the publishing logic hangs for as long as the broker is offline and then attempts to publish all stuck messages at once.

How to reproduce
Follow the instructions in the code snippet.

# example.py
"""
How to reproduce the issue:
1. Start the FastAPI app (`uvicorn example:fastapi_app --host 0.0.0.0 --port 8181`)
2. Stop the broker
3. Send a POST request to http://localhost:8181/run-job (`curl -X 'POST' 'http://localhost:8181/run-job'`) and watch it hang
"""
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker(
    f"amqp://guest:guest@localhost:5672",
    max_consumers=1,
)


@asynccontextmanager
async def lifespan(app):
    await broker.start()
    yield
    await broker.close()


fastapi_app = FastAPI(
    lifespan=lifespan
)
faststream_app = FastStream(
    broker=broker
)


async def run_job(message):
    print(f"Message processed: {message}")


broker.subscriber("run_job")(run_job)


async def run_job_http_endpoint():
    await broker.publish(queue="run_job", message={"job": "run"})
    return {"message": "Job processed successfully"}


fastapi_app.post("/run-job")(run_job_http_endpoint)

Expected behavior
When publishing, if the connection is in an invalid state, the operation fails fast raising some sort of exception.

Observed behavior
When publishing, if the connections is lost after its established correctly, the operation hangs forever.

Environment

$ faststream -v
Running FastStream 0.5.11 with CPython 3.10.12 on Darwin

Additional context
This looks like its linked to an open issue with aio-pika that could be the source of this one, because a check to the connection's state prior to publishing mitigates the issue, i.e., if broker._connection.connected.is_set() is False.

@sfran96 sfran96 added the bug Something isn't working label Jul 10, 2024
@Lancetnik Lancetnik added the RabbitMQ Issues related to `faststream.rabbit` module and RabbitMQ broker features label Jul 10, 2024
@Lancetnik
Copy link
Member

@sfran96 thx for such detail report! Indeed, seems like the problem on aio-pika side: I reported Issue and pinged Mosquito already. Also I am planning to help him with a fix, when we discuss, how the problem should be fixed. Wait a bit for it, please

@Lancetnik
Copy link
Member

Should be closed with tomorrow aio-pika release

@gaby
Copy link

gaby commented Aug 13, 2024

AioPika fix was released today v9.4.3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working RabbitMQ Issues related to `faststream.rabbit` module and RabbitMQ broker features
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants