cannot use Connection.transaction() in a manually started transaction #978
Comments
I recently started getting that exception, too!
InterfaceError: cannot use Connection.transaction() in a manually started transaction
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
await self._transaction.start()
File "asyncpg/transaction.py", line 101, in start
raise apg_errors.InterfaceError(
AsyncAdapt_asyncpg_dbapi.InterfaceError: <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction
File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "sqlalchemy/pool/base.py", line 325, in connect
return _ConnectionFairy._checkout(self)
File "sqlalchemy/pool/base.py", line 921, in _checkout
result = pool._dialect.do_ping(fairy.dbapi_connection)
File "sqlalchemy/engine/default.py", line 703, in do_ping
cursor.execute(self._dialect_specific_select_one)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction
(Background on this error at: https://sqlalche.me/e/14/rvf5)
File "starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "starlette/routing.py", line 66, in app
response = await func(request)
File "fastapi/routing.py", line 225, in app
solved_result = await solve_dependencies(
File "fastapi/dependencies/utils.py", line 533, in solve_dependencies
solved = await call(**sub_values)
File "core/utils/middlewares/auth.py", line 57, in __call__
user: UserDTO | None = await self._get_user_from_db(decoded_access_token["sub"])
File "core/utils/middlewares/auth.py", line 97, in _get_user_from_db
user: UserDTO | None = await self.users_repo.get_user(UserQueryFilters(uuid=user_uuid))
File "core/data/postgresql/repositories/users.py", line 72, in get_user
user: User = await self.session.scalar(stmt)
File "sqlalchemy/ext/asyncio/session.py", line 240, in scalar
result = await self.execute(
File "sqlalchemy/ext/asyncio/session.py", line 214, in execute
result = await greenlet_spawn(
File "sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "sqlalchemy/orm/session.py", line 1713, in execute
conn = self._connection_for_bind(bind)
File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "sqlalchemy/orm/session.py", line 747, in _connection_for_bind
conn = bind.connect()
File "sqlalchemy/future/engine.py", line 406, in connect
return super(Engine, self).connect()
File "sqlalchemy/engine/base.py", line 3315, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "sqlalchemy/engine/base.py", line 96, in __init__
else engine.raw_connection()
File "sqlalchemy/engine/base.py", line 3394, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "sqlalchemy/engine/base.py", line 3364, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File "sqlalchemy/engine/base.py", line 2198, in _handle_dbapi_exception_noconnection
util.raise_(
File "sqlalchemy/util/compat.py", line 210, in raise_
raise exception
File "sqlalchemy/engine/base.py", line 3361, in _wrap_pool_connect
return fn()
File "sqlalchemy/pool/base.py", line 325, in connect
return _ConnectionFairy._checkout(self)
File "sqlalchemy/pool/base.py", line 921, in _checkout
result = pool._dialect.do_ping(fairy.dbapi_connection)
File "sqlalchemy/engine/default.py", line 703, in do_ping
cursor.execute(self._dialect_specific_select_one)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
In addition, I periodically get two more errors:
InterfaceError: cannot perform operation: another operation is in progress
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 739, in commit
self.await_(self._transaction.commit())
File "sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "asyncpg/transaction.py", line 211, in commit
await self.__commit()
File "asyncpg/transaction.py", line 179, in __commit
await self._connection.execute(query)
File "asyncpg/connection.py", line 317, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 323, in query
self._check_state()
File "asyncpg/protocol/protocol.pyx", line 707, in asyncpg.protocol.protocol.BaseProtocol._check_state
raise apg_exc.InterfaceError(
AsyncAdapt_asyncpg_dbapi.InterfaceError: <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
File "sqlalchemy/engine/base.py", line 1089, in _commit_impl
self.engine.dialect.do_commit(self.connection)
File "sqlalchemy/engine/default.py", line 686, in do_commit
dbapi_connection.commit()
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 741, in commit
self._handle_exception(error)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
(Background on this error at: https://sqlalche.me/e/14/rvf5)
File "starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "fastapi/middleware/asyncexitstack.py", line 15, in __call__
async with AsyncExitStack() as stack:
File "contextlib.py", line 714, in __aexit__
raise exc_details[1]
File "contextlib.py", line 697, in __aexit__
cb_suppress = await cb(*exc_details)
File "contextlib.py", line 206, in __aexit__
await anext(self.gen)
File "core/data/postgresql/engine.py", line 24, in get_db
async with async_session() as session, session.begin():
File "sqlalchemy/ext/asyncio/session.py", line 713, in __aexit__
await greenlet_spawn(
File "sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "sqlalchemy/engine/util.py", line 235, in __exit__
with util.safe_reraise():
File "sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "sqlalchemy/util/compat.py", line 210, in raise_
raise exception
File "sqlalchemy/engine/util.py", line 233, in __exit__
self.commit()
File "sqlalchemy/orm/session.py", line 836, in commit
trans.commit()
File "sqlalchemy/engine/base.py", line 2459, in commit
self._do_commit()
File "sqlalchemy/engine/base.py", line 2649, in _do_commit
self._connection_commit_impl()
File "sqlalchemy/engine/base.py", line 2620, in _connection_commit_impl
self.connection._commit_impl()
File "sqlalchemy/engine/base.py", line 1091, in _commit_impl
self._handle_dbapi_exception(e, None, None, None, None)
File "sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "sqlalchemy/util/compat.py", line 210, in raise_
raise exception
File "sqlalchemy/engine/base.py", line 1089, in _commit_impl
self.engine.dialect.do_commit(self.connection)
File "sqlalchemy/engine/default.py", line 686, in do_commit
dbapi_connection.commit()
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 741, in commit
self._handle_exception(error)
File "sqlalchemy/dialects/postgresql/asyncpg.py", line 682, in _handle_exception
raise translated_error from error
The second:
AttributeError: 'NoneType' object has no attribute 'twophase'
File "starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "starlette/routing.py", line 66, in app
response = await func(request)
File "fastapi/routing.py", line 225, in app
solved_result = await solve_dependencies(
File "fastapi/dependencies/utils.py", line 533, in solve_dependencies
solved = await call(**sub_values)
File "core/utils/middlewares/auth.py", line 57, in __call__
user: UserDTO | None = await self._get_user_from_db(decoded_access_token["sub"])
File "core/utils/middlewares/auth.py", line 97, in _get_user_from_db
user: UserDTO | None = await self.users_repo.get_user(UserQueryFilters(uuid=user_uuid))
File "core/data/postgresql/repositories/users.py", line 65, in get_user
result: User = await self.session.scalar(stmt)
File "sqlalchemy/ext/asyncio/session.py", line 240, in scalar
result = await self.execute(
File "sqlalchemy/ext/asyncio/session.py", line 214, in execute
result = await greenlet_spawn(
File "sqlalchemy/util/_concurrency_py3k.py", line 128, in greenlet_spawn
result = context.switch(value)
File "sqlalchemy/orm/session.py", line 1713, in execute
conn = self._connection_for_bind(bind)
File "sqlalchemy/orm/session.py", line 1552, in _connection_for_bind
return self._transaction._connection_for_bind(
File "sqlalchemy/orm/session.py", line 754, in _connection_for_bind
if self.session.twophase and self._parent is None:
I don't know if there is a connection between them, but they first appeared at the same time.
asyncpg: 0.27.0
Here's how I set up the connection to the database:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from src.app.config import config

engine = create_async_engine(
    config.POSTGRES_DSN,
    future=True,
    echo=True,
    pool_size=config.POSTGRES_MAX_CONNECTIONS,
    max_overflow=config.POSTGRES_CONNECTIONS_OVERFLOW,
    pool_pre_ping=True,
    pool_recycle=300,
    connect_args={"server_settings": {"application_name": f"{config.APP_NAME.lower()}[{config.ENVIRONMENT.lower()}]"}},
)
async_session = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

async def get_db() -> AsyncSession:
    async with async_session() as session, session.begin():
        yield session

Here's my Users repository (I use the repository pattern in my data layer). Note that here I pass AsyncSession using FastAPI's Depends feature:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from .engine import get_db
from .models import User
from .dto_models import UserDTO, UserQueryFilters

class UsersRepo:
    """Repository class for work with user entities in PostgreSQL"""

    def __init__(self, session: AsyncSession):
        self.session: AsyncSession = session

    def convert_to_dto(self, obj: User, schema):
        return schema.from_orm(obj) if obj else None

    async def get_user(
        self,
        filters: UserQueryFilters,
    ) -> UserDTO | None:
        stmt = select(User)
        schema = UserDTO
        user: User = await self.session.scalar(stmt)
        return self.convert_to_dto(user, schema) if user else None

def get_users_repo(db: AsyncSession = Depends(get_db)) -> UsersRepo:
    return UsersRepo(db)

Here's the middleware dealing with authorization (note that here I pass UsersRepo using FastAPI's Depends feature):
...
class Authenticate:
    async def __call__(
        self,
        encoded_access_token: str = Security(oauth2_scheme),
        idp_client: IDPService = Depends(get_idp_client),
        users_repo: UsersRepo = Depends(get_users_repo),
    ):
        self.idp_client = idp_client
        self.users_repo = users_repo
        decoded_access_token: t.Dict[str, t.Any] = self._validate_access_token(encoded_access_token)
        groups, permissions = await self._get_user_roles_permissions(encoded_access_token)
        user: UserDTO | None = await self._get_user_from_db(decoded_access_token["sub"])
        if not user:
            raise AccessDeniedError("User account was not found.")
        return UserIdentity(**user.dict(), groups=groups, permissions=permissions)

    @staticmethod
    def _validate_access_token(access_token: str):
        ...

    async def _get_user_roles_permissions(self, access_token: str):
        ...

    async def _get_user_from_db(self, user_uuid) -> UserDTO | None:
        """Get user from db by uuid"""
        user: UserDTO | None = await self.users_repo.get_user(UserQueryFilters(uuid=user_uuid))
        return user

authenticate = Authenticate()

Here's how I use this middleware in the request handler (note that here I pass authenticate using FastAPI's Depends feature):
...
from middlewares.auth import UserIdentity, authenticate

routes: APIRouter = APIRouter()

@routes.get(
    "/devices",
    response_model=List[GetDevicesResp],
    summary="Get devices",
    description="Gives devices",
)
async def get_devices(
    ids: Set[NonNegativeInt] = Query(None, alias="id", description="devices IDs", example=[1, 12, 23]),
    name: str = Query(None, description="device name", example="Tablet"),
    use_case: GetDevicesUseCase = Depends(get_devices_usecase),
    user: UserIdentity = Depends(authenticate),
) -> List[GetDevicesResp]:
    req_obj: GetDevicesReq = GetDevicesReq(user=user, ids=ids, name=name)
    result: List[GetDevicesResp] = await use_case.execute(req_obj)
    return result

Besides that, I also pass the db session into the use_case instance (note that in the example above, besides authenticate, I also use Depends for get_devices_usecase):
import typing as t
from logging import getLogger, Logger
from fastapi import Depends
from pydantic import NonNegativeInt, NoneStr
from sqlalchemy.ext.asyncio import AsyncSession
from src.app.config import config
from src.data.postgresql.engine import get_db
from src.data.postgresql.repositories.devices import DevicesRepo
from src.data.postgresql.repositories.devices_dto import DevicesFilters, DevicesDTO
from src.use_cases.base import BaseUseCase
from src.utils.helpers.serialization_additions import OurBaseModel
from src.utils.middlewares.auth import UserIdentity

logger: Logger = getLogger(config.APP_NAME)

class GetDevicesReq(OurBaseModel):
    """Object of input data"""
    ids: t.Set[int] | None
    name: NoneStr
    user: UserIdentity

class GetDevicesResp(OurBaseModel):
    """Object of output data"""
    id: NonNegativeInt  # noqa A003
    name: str

class GetDevicesUseCase(BaseUseCase):
    def __init__(self, devices_repo: DevicesRepo):
        self.devices_repo: DevicesRepo = devices_repo

    async def execute(self, req: GetDevicesReq) -> t.List[GetDevicesResp]:
        """
        Implements the business logic of getting devices
        :param req: object with input data
        :return: devices
        """
        filters: DevicesFilters = DevicesFilters(ids=req.ids, name=req.name)
        devices: t.List[DevicesDTO] = await self.devices_repo.get_devices(filters)
        return [GetDevicesResp(**_.dict()) for _ in devices]

async def get_devices_usecase(db: AsyncSession = Depends(get_db)) -> GetDevicesUseCase:
    """Input Boundary for GetDevicesUseCase"""
    return GetDevicesUseCase(devices_repo=DevicesRepo(db))

Here, get_devices_usecase in turn gets the db session, also using FastAPI's Depends with the previously mentioned session generator get_db.
This could be a SQLAlchemy bug. @zzzeek, ideas?
For the first issue: SQLAlchemy adapts asyncpg to behave like the Python DBAPI, which auto-begins by default, so an operation will be using connection.transaction() internally. As for why one would get the error described: only if they are either manipulating the connection directly to start a transaction themselves, or they are using connections / ORM sessions in a thread-unsafe or concurrency-unsafe manner in some way. The second issue is a more fine-grained example of an asyncpg connection being used in a thread- or concurrency-unsafe manner. Overall, we do see a lot of people using FastAPI having various integration issues, though I don't know that there's any intrinsic issue in any of the components used. They should likely be reporting to FastAPI with more complete examples.
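The "another operation is in progress" failure mode can be reproduced without a database. What follows is a toy model, not asyncpg's actual implementation: `ToyConnection` and `BusyError` are hypothetical stand-ins for a single connection that can carry only one operation at a time, and `asyncio.sleep(0)` stands in for network I/O.

```python
import asyncio


class BusyError(RuntimeError):
    """Hypothetical stand-in for the driver's InterfaceError."""


class ToyConnection:
    """Toy connection that, like one database connection, runs one operation at a time."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise BusyError("cannot perform operation: another operation is in progress")
        self._busy = True
        try:
            # Yield to the event loop, as a real network round trip would.
            await asyncio.sleep(0)
            return f"done: {query}"
        finally:
            self._busy = False


async def shared_connection_demo() -> list:
    conn = ToyConnection()
    # Both coroutines use the same connection; the second starts while the
    # first is still suspended inside execute().
    return await asyncio.gather(
        conn.execute("SELECT 1"),
        conn.execute("SELECT 2"),
        return_exceptions=True,
    )


results = asyncio.run(shared_connection_demo())
```

Here the first call completes and the second is rejected because it arrives while the first is still in flight, which is the kind of overlap "concurrency-unsafe" refers to.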
Indeed, I've seen issues like this reported for FastAPI quite a bit, but it's really hard to pinpoint where the issue originates given all of the wrapping and concurrency going on. For example, this code may or may not split a context manager across different threads depending on server load (how busy the thread pool is). If there are context variables going around, things get even trickier. I'm happy to help debug things on the FastAPI/Starlette side, but it would be really helpful to have more reproducible examples (even if the example consists of starting a server and loading it with
oh wow, that is something. async and threadpools at the framework level. Yeah, that kind of thing is kind of like the "giant fusion reaction suspended in a magnetic field because it would instantly vaporize any solid material it touches" level of tricky. So I think FastAPI developers should be the go-to for these issues.
hi @alexted - this issue has nothing to do with asyncpg at this stage, so you likely don't want to be continuing on here. From all indications this is a heavily FastAPI-dependent issue and you should seek help from FastAPI experts.
sure, though SQLAlchemy did fix an issue with this recently as of 1.4.41 (see sqlalchemy/sqlalchemy#8419). That was pretty much the main "connections can leak" path for asyncpg: a request that was interrupted or otherwise failed would send the connection to be garbage collected, and we didn't know how to "terminate" it without an event loop. Now we do, so there should be far fewer paths for "leaking connections". Not sure if these are production loads or not. You would want to get a reproduction case that works with artificial loads, so you can test things and also look around in the code / print things to get a better sense of what's going on. For example, just try using NullPool instead of QueuePool, things like that.
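The NullPool experiment mentioned above might be set up roughly as follows. This is a sketch under assumptions: `make_debug_engine` is a hypothetical helper name, and the DSN is whatever the application already passes to `create_async_engine`. With `NullPool`, each checkout opens a fresh connection and closes it on release, so connection reuse is taken out of the picture while debugging.

```python
def make_debug_engine(dsn: str):
    """Build an async engine with connection pooling disabled (debugging aid)."""
    # Imported here so the sketch can be dropped into a module without
    # changing that module's import-time dependencies.
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.pool import NullPool

    # pool_size and max_overflow are intentionally omitted: they are
    # QueuePool options and are rejected when poolclass is NullPool.
    return create_async_engine(dsn, poolclass=NullPool, echo=True)
```

If the errors disappear with this engine, that points toward pooled connections being returned in a bad state; if they persist, the session is more likely being shared across concurrent work.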
Well, now I understand "where the legs grow from". Thank you for your participation.
Currently my team is in a different phase of development, so I haven't had time to check on the connections lately, but it seems that the next time I encounter this issue I will look into FastAPI communities. Thanks for the help!
Hi, I had a similar issue on a project I'm working on. asyncpg version: 0.25.0, AWS RDS 12.8, sqlalchemy 1.4.44, fastapi 0.65.3. I downgraded sqlalchemy from Note that I also tried downgrading sqlalchemy to Maybe this will be useful to someone.
@bogdanp05 your solution works and helped us a lot, but as you said under load we occasionally get this error. Do you know a way to solve it without having
@unkindypie unfortunately not and I don't have access to the source code where the problem appeared anymore:(
@bogdanp05 sad but thank you for the reply. I suppose it's a FastAPI concurrency issue related to this thread fastapi/fastapi#5707
If you're using asyncio.gather, that might cause this bug with async sessions. https://stackoverflow.com/questions/74313692/fastapi-asyncpg-sqlalchemy-cannot-use-connection-transaction-in-a-manual
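If asyncio.gather is involved, one common way around it is to give each concurrent coroutine its own session instead of sharing one. The sketch below only shows the shape of that pattern using standard-library stand-ins: `FakeSession`, `session_factory`, and `run_concurrently` are hypothetical names, and in a real application the factory would presumably be the project's own async sessionmaker.

```python
import asyncio
import itertools
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for an async session; reports which session ran a query."""

    def __init__(self, ident: int) -> None:
        self.ident = ident

    async def execute(self, query: str) -> tuple:
        await asyncio.sleep(0)  # simulate waiting on I/O
        return (self.ident, query)


_ids = itertools.count(1)


@asynccontextmanager
async def session_factory():
    # Every "async with session_factory()" yields a brand-new session.
    yield FakeSession(next(_ids))


async def run_concurrently(factory, queries):
    async def run_one(query: str):
        # The session is opened and closed inside the task that uses it,
        # so no two tasks ever touch the same session.
        async with factory() as session:
            return await session.execute(query)

    return await asyncio.gather(*(run_one(q) for q in queries))


results = asyncio.run(run_concurrently(session_factory, ["SELECT 1", "SELECT 2"]))
```

The trade-off is that each task then runs in its own transaction, so this only fits work that does not need to be atomic across the gathered calls.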
the issue with a local PostgreSQL install?: -
uvloop?: Yes (asyncio and uvloop used with FastAPI)
I'm seeing the error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot use Connection.transaction() in a manually started transaction
but with some inconsistency. I have a backend server running (built with FastAPI), and one of the endpoints does the following:
It is after the INSERT statement that I'm getting the error. But the funny thing is, about 1 in 5 times I don't get that error and the endpoint finishes successfully. I searched for the error but had little success.
I don't have code that 'explicitly' starts any transactions. Looking at the logs, I do see 'implicit' begin statements (created by asyncpg, I assume?). Any idea how I could fix this error? At first I thought that not committing after the SELECT could be the issue, but now I'm completely lost, since some of the requests completed successfully.
I assume?). Any idea how I could fix this error? First I thought maybe me not committing after the SELECT could be the issue, but now I'm completely lost after some of them completed successfully.The text was updated successfully, but these errors were encountered: