
Feature: background ingestion in LOCAL mode #321

Merged: 13 commits into truefoundry:main, Sep 6, 2024

Conversation

@kunwar31 (Contributor) commented Sep 5, 2024

Solves #319

backend/indexer/indexer.py: 4 review threads (outdated, resolved)
@kunwar31 (Contributor, Author) commented Sep 6, 2024

@chiragjn The main issue is that there is no "manager" for the process I created, and I have intentionally not added one, as this is only meant for LOCAL ingestions.

  1. I can add another daemon thread to manage any running background process, which can update the status to FAILED if the process dies unexpectedly.
  2. However, that solution will again add further unwanted complexity. I agree that long term we need a proper solution (based on queues + background workers).

@kunwar31 (Contributor, Author) commented Sep 6, 2024

@chiragjn Changed to use ProcessPoolExecutor, initialized in the lifespan context manager of FastAPI.
The number of workers can be set using the PROCESS_POOL_WORKERS environment variable (default is 4 in compose.env and 1 in the Settings class).
Now, before FastAPI shuts down, it will ensure that there are no tasks running in the process pool.
However, unexpected termination of the ingestion process will still not be tracked.
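
For readers following along, a minimal sketch of what wiring a ProcessPoolExecutor into FastAPI's lifespan could look like; the module layout and exact settings handling are assumptions, not the PR's actual code:

```python
import os
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Pool size driven by the PROCESS_POOL_WORKERS env var (default 1).
    workers = int(os.getenv("PROCESS_POOL_WORKERS", "1"))
    app.state.process_pool = ProcessPoolExecutor(max_workers=workers)
    try:
        yield
    finally:
        # Block shutdown until submitted ingestion tasks have finished.
        app.state.process_pool.shutdown(wait=True)


app = FastAPI(lifespan=lifespan)
```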

@chiragjn (Member) commented Sep 6, 2024

> However, unexpected termination of the ingestion process will still not be tracked.

That's alright, we have to start somewhere :)

Comment on lines 375 to 376
from backend.server.app import process_pool
# future of this submission is ignored, ingestion failure due to process termination will not be tracked
chiragjn (Member):

Let's not cross-import. It is fine to pass the pool as an input arg that defaults to None.

@chiragjn (Member) commented Sep 6, 2024

We want to keep all the code from main as-is and use something along the lines of:

loop = asyncio.get_running_loop()
coro = loop.run_in_executor(sync_data_source_to_collection(...), pool, ...)
if pool is None:
    await coro
else:
    asyncio.create_task(coro)

@kunwar31 (Contributor, Author):

@chiragjn run_in_executor is used to run synchronous code, so sync_data_source_to_collection can't be async.
Further, asyncio.create_task(coro) will again block the event loop (it still has to wait for coro to finish at some point).
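
For context, a minimal sketch of how run_in_executor is conventionally called: the executor comes first, and the callable must be synchronous. The sync_ingest function and its argument here are placeholders, not the PR's code:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def sync_ingest(request_dict: dict) -> None:
    # Placeholder for a synchronous, picklable ingestion entry point.
    ...


async def schedule_ingestion(pool: ProcessPoolExecutor | None, request_dict: dict) -> None:
    if pool is None:
        # No pool configured: run inline (this still blocks the coroutine).
        sync_ingest(request_dict)
        return
    loop = asyncio.get_running_loop()
    # Executor first, then the sync callable and its positional args.
    await loop.run_in_executor(pool, sync_ingest, request_dict)
```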

@chiragjn (Member):

> run_in_executor is used to run synchronous code

You are correct, I am not framing it correctly; give me some time and I'll get back on this.

@kunwar31 (Contributor, Author) commented Sep 6, 2024

@chiragjn I spent a few hours thinking about it too. I know you want to run the whole ingestion in the same async event loop, but there are blocking synchronous parts in the sync_data_source_to_collection function, mainly fetching chunks from unstructured_api, which will keep blocking other tasks in the event loop.

The get_chunks method in backend.modules.parsers.unstructured_io.UnstructuredIoParser makes a very long POST call which blocks everything.

If we somehow make this request non-blocking, we would be in better shape, but there still won't be any guarantee that it doesn't block the main event loop.
Let me know your thoughts.

@kunwar31 (Contributor, Author) commented Sep 6, 2024

> Let's not cross-import. It is fine to pass the pool as an input arg that defaults to None.

@chiragjn Added a process_pool module to make this cleaner. At a later stage this can be refactored into a queue-based worker pool; all references to pool.submit will still work.

@chiragjn (Member) commented Sep 6, 2024

> I know you want to run the whole ingestion in the same async event loop

Actually, I don't want to run it in the same loop. I want to run stuff in a separate process; my only hesitation is wrapping all async coroutines to be sync again. I am just trying to see if we can somehow avoid that. Worst case, we have to wrap and just accept it.

> The get_chunks method in backend.modules.parsers.unstructured_io.UnstructuredIoParser makes a very long POST call which blocks everything.
>
> If we somehow make this request non-blocking, we would be in better shape, but there still won't be any guarantee that it doesn't block the main event loop.

Yes, this, plus the whole vector store implementation also needs to be made async.
At least the unstructured API call should be easy to change: instead of requests we just have to use async httpx or aiohttp.

> but there still won't be any guarantee that it doesn't block the main event loop.

That is true; some bad parser can still block, hence eventually we will move to a queue-and-workers system.
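
As a rough illustration of the requests-to-httpx swap suggested above, a non-blocking version of the parser's POST call might look like the sketch below; the endpoint, payload shape, and function name are assumptions, not the actual UnstructuredIoParser code:

```python
import httpx


# Hypothetical async replacement for the blocking requests.post call inside
# get_chunks; a generous timeout mirrors the "very long POST call" above.
async def post_to_unstructured(url: str, filename: str, file_bytes: bytes) -> dict:
    async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
        response = await client.post(
            url,
            files={"files": (filename, file_bytes)},
        )
        response.raise_for_status()
        return response.json()
```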

@kunwar31 (Contributor, Author) commented Sep 6, 2024

> Actually, I don't want to run it in the same loop. I want to run stuff in a separate process; my only hesitation is wrapping all async coroutines to be sync again. I am just trying to see if we can somehow avoid that. Worst case, we have to wrap and just accept it.

@chiragjn I think I've understood what you meant. I have simplified the diff and created a generic AsyncProcessPoolExecutor in the process_pool module which handles all the steps, so there is no need to wrap individual functions. The initial coroutine is still wrapped, though (see the sketch below).

The only way to avoid this wrapping hack is to make all synchronous parts of the async method asynchronous, which will be a much longer task; and if there are CPU-bound sync parts, they would still need to run in the process pool.
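
For readers following along, a minimal sketch of the kind of wrapper being described, where the async entry point is run to completion in a fresh event loop inside the worker process; this is an assumed shape, not the PR's actual implementation:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def _run_async(fn, *args, **kwargs):
    # Module-level helper so it can be pickled and shipped to the worker;
    # it runs the coroutine function to completion in its own event loop.
    return asyncio.run(fn(*args, **kwargs))


class AsyncProcessPoolExecutor(ProcessPoolExecutor):
    """ProcessPoolExecutor that accepts async callables by wrapping them
    in asyncio.run inside the worker process."""

    def submit(self, fn, *args, **kwargs):
        return super().submit(_run_async, fn, *args, **kwargs)
```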

backend/indexer/indexer.py: 1 review thread (outdated, resolved)
@chiragjn (Member) commented Sep 6, 2024

We are almost there; I like this version of the code, just some code-organization changes. Thanks for being really patient and iterating with us!

@kunwar31 (Contributor, Author) commented Sep 6, 2024

> We are almost there; I like this version of the code, just some code-organization changes. Thanks for being really patient and iterating with us!

@chiragjn Refactored:

  1. AsyncProcessPoolExecutor is now in utils.py
  2. api.state contains the process_pool, which is passed to the ingestion function as an optional arg
  3. Had to "hack" around in the router's ingest method to get the app.state, which is not ideal (see the sketch below)
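
The workaround in item 3 presumably relies on FastAPI's standard pattern of reaching application state through the Request object; a minimal sketch, with the route path and handler body being assumptions:

```python
from fastapi import APIRouter, Request

router = APIRouter()


@router.post("/ingest")
async def ingest(request: Request):
    # Reach the pool created in the lifespan handler via the app state;
    # it may be None when no pool is configured (e.g. non-LOCAL mode).
    pool = getattr(request.app.state, "process_pool", None)
    # ... pass `pool` to the ingestion entry point as the optional argument ...
    return {"status": "submitted" if pool is not None else "completed"}
```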

@kunwar31 (Contributor, Author) commented Sep 6, 2024

Tested this by running 4 ingestion jobs across 2 data sources and 2 collections; all 4 run in the background in parallel (workers=4) and the backend stays responsive.

@chiragjn (Member) left a review comment:

🎉
Thanks a lot for this contribution

@chiragjn chiragjn merged commit c58dafc into truefoundry:main Sep 6, 2024
1 check passed