Problem encountered when running Qdrant operations in a Celery task in FastAPI #672

Open
WatermelonSumer opened this issue Jun 29, 2024 · 1 comment

@WatermelonSumer

I'm using asynchronous methods to perform Qdrant operations in a Celery task, with prefer_grpc set to True, and the Qdrant asynchronous client blocks. I suspect the problem is related to the event loop.
My code snippet:

import asyncio

from celery import shared_task
from fastapi import UploadFile

# db_required, OSSServices, RAGInterface and logger are project-specific (not shown).


@db_required
async def test(file_id: str):
    # some other code... (defines `file` and `file_res`)
    try:
        file_obj: UploadFile = await OSSServices.read_file(file_res, file.file_name)
        collection_name = file.kb_id

        rag = RAGInterface(collection_name)
        # insert with metadata
        metadata = {
            "kb_id": file.kb_id,
            "file_name": file.file_name,
            "file_id": file.file_id,
            "created_at": file.created_at.strftime("%Y-%m-%d %H:%M:%S"),
        }
        await rag.insert(files=file_obj, kwargs=metadata)
        # vector parsing succeeded
    except Exception as e:
        # logger.exception also records the traceback
        logger.exception(f"[vector_create_task]:{file.file_name}\n{str(e)}")


@shared_task
def vector_create_task(file_id: str):
    """Vector parsing: submit."""

    # Celery tasks are synchronous, so each run creates its own event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    try:
        async_result = loop.run_until_complete(test(file_id=file_id))
        return async_result
    finally:
        loop.close()
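One plausible explanation, and it is only an assumption since RAGInterface is not shown: if the AsyncQdrantClient inside RAGInterface is created once (for example at module import time or during an earlier task) and then reused after vector_create_task has created a new event loop, its gRPC aio channel is likely still tied to the old loop. The stdlib-only sketch below reproduces that hazard with a plain asyncio future standing in for the loop-bound channel. The helper names are hypothetical. Plain asyncio fails loudly with a RuntimeError here, while the report describes a hang, so the symptom inside gRPC may differ.

```python
import asyncio


def create_bound_future() -> "asyncio.Future[bool]":
    # Hypothetical stand-in for a loop-bound resource (such as a gRPC aio channel)
    # created under one event loop, e.g. at import time or in an earlier task.
    old_loop = asyncio.new_event_loop()
    try:
        return old_loop.create_future()
    finally:
        old_loop.close()


async def await_resource(fut: "asyncio.Future[bool]") -> bool:
    return await fut


def run_like_vector_create_task(fut: "asyncio.Future[bool]") -> str:
    # Mirrors the loop handling in vector_create_task: a fresh loop per task.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(await_resource(fut))
        return "completed"
    except RuntimeError as exc:
        # asyncio refuses to await a future that belongs to another loop
        return str(exc)
    finally:
        asyncio.set_event_loop(None)
        loop.close()


print(run_like_vector_create_task(create_bound_future()))
```

The printed message contains "attached to a different loop", which is asyncio's own check; whether qdrant-client's gRPC path surfaces the same condition as an error or as a hang is not confirmed by the thread.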

My RAGInterface wraps the Qdrant API with some business logic. While debugging, I found that every call to AsyncQdrantClient hangs. For example, when I execute async_client.collection_exists(collection_name), it gets stuck in this part of the qdrant-client source code:

  async def collection_exists(self, collection_name: str, **kwargs: Any) -> bool:
      if self._prefer_grpc:
          return (
              await self.grpc_collections.CollectionExists(
                  grpc.CollectionExistsRequest(collection_name=collection_name),
                  timeout=self._timeout,
              )
          ).result.exists
      result: Optional[models.CollectionExistence] = (
          await self.http.collections_api.collection_exists(collection_name=collection_name)
      ).result
      assert result is not None, "Collection exists returned None"
      return result.exists

Code from: qdrant_client.async_qdrant_remote.AsyncQdrantRemote
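With prefer_grpc=True, collection_exists awaits a gRPC stub call, so it depends on the client's channel being usable from the currently running loop. A common way to avoid cross-loop reuse, sketched here under assumptions, is to create the async client inside the coroutine the task runs and close it before the loop is torn down; asyncio.run() then replaces the manual new_event_loop()/set_event_loop()/close() sequence. LoopBoundClient, check_collection, vector_task and reuse_shared_client are hypothetical names, and LoopBoundClient stands in for AsyncQdrantClient(..., prefer_grpc=True) so the example runs without a Qdrant server. In real code the equivalent would be constructing AsyncQdrantClient inside check_collection and calling `await client.close()` in the finally block. Creating the client inside the task may also avoid sharing a gRPC channel created in the parent process before Celery's prefork workers fork, which gRPC supports only in limited ways.

```python
import asyncio
from typing import Optional


class LoopBoundClient:
    """Hypothetical stand-in for AsyncQdrantClient(prefer_grpc=True).

    It records the loop it was created in and refuses to run from any other
    loop, mimicking a loop-bound gRPC aio channel.
    """

    def __init__(self) -> None:
        # Raises RuntimeError if constructed outside a running loop.
        self._loop: Optional[asyncio.AbstractEventLoop] = asyncio.get_running_loop()
        self._collections = {"kb_demo"}

    async def collection_exists(self, collection_name: str) -> bool:
        if asyncio.get_running_loop() is not self._loop:
            raise RuntimeError("client used from a different (or closed) event loop")
        await asyncio.sleep(0)  # placeholder for the network round trip
        return collection_name in self._collections

    async def close(self) -> None:
        self._loop = None


async def check_collection(collection_name: str) -> bool:
    # Create the client inside the coroutine, i.e. inside the loop that uses it.
    client = LoopBoundClient()
    try:
        return await client.collection_exists(collection_name)
    finally:
        await client.close()


def vector_task(collection_name: str) -> bool:
    # Body of a Celery task (the @shared_task decorator is omitted here).
    # asyncio.run() creates a fresh loop and closes it when the coroutine ends.
    return asyncio.run(check_collection(collection_name))


async def _make_client() -> LoopBoundClient:
    return LoopBoundClient()


def reuse_shared_client() -> str:
    # Anti-pattern for contrast: a client created under one loop, reused under another.
    shared = asyncio.run(_make_client())
    try:
        asyncio.run(shared.collection_exists("kb_demo"))
        return "completed"
    except RuntimeError as exc:
        return str(exc)


print(vector_task("kb_demo"), vector_task("missing"))  # True False
print(reuse_shared_client())
```

Each call to vector_task gets its own loop and its own client, so successive tasks never touch a client from an earlier loop; reuse_shared_client shows the failure mode the per-task pattern is meant to avoid.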

I'm very confused by this problem. Can anybody help? Thanks very much!

@joein
Member

joein commented Jul 3, 2024

Hello @WatermelonSumer

I was not able to reproduce the issue. Could you please provide a minimal reproducible example so we can help you?
