You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I use uvicorn to start my Fastapi program, there is a long task in it, I use aiomultiprocess to speed up my httpx requests, but it always stops after the program runs for a while, I can't find the problem, nest Tried some configurations but nothing helped
Details
import httpx
from aiomultiprocess import Pool
from elasticsearch import AsyncElasticsearch
from fastapi import APIRouter, status, Depends, BackgroundTasks
from models import Article
api = APIRouter()
async def requests(data):
async with httpx.AsyncClient() as sess:
resp = await sess.post(settings.URL, data=data)
return resp.text
async def create_datas(data: list):
list_article = [Article(**item) for item in data]
await Article.bulk_create(list_article)
This function will be called multiple times in the service
async def run_tasks(start_id, es=None):
data = await get_datas(start_id, es) # get data from es
if data:
data_list = []
async with Pool(processes=6,
maxtasksperchild=800,
childconcurrency=6
) as pool:
async for result in pool.map(requests, data):
data_list.append(result)
await create_datas([i for i in data_list if i])
I use uvicorn to start my Fastapi program, there is a long task in it, I use aiomultiprocess to speed up my httpx requests, but it always stops after the program runs for a while, I can't find the problem, nest Tried some configurations but nothing helped
Details
import httpx
from aiomultiprocess import Pool
from elasticsearch import AsyncElasticsearch
from fastapi import APIRouter, status, Depends, BackgroundTasks
from models import Article
api = APIRouter()
async def requests(data):
async with httpx.AsyncClient() as sess:
resp = await sess.post(settings.URL, data=data)
return resp.text
async def create_datas(data: list):
list_article = [Article(**item) for item in data]
await Article.bulk_create(list_article)
This function will be called multiple times in the service
async def run_tasks(start_id, es=None):
data = await get_datas(start_id, es) # get data from es
if data:
data_list = []
async with Pool(processes=6,
maxtasksperchild=800,
childconcurrency=6
) as pool:
async for result in pool.map(requests, data):
data_list.append(result)
await create_datas([i for i in data_list if i])
async def task_process(start, es):
while start < 140000000:
start_time = time.time()
await run_tasks(start, es)
all_time = time.time() - start_time
LOG.info(f'id: {start} - {start + 800} The time spent: {all_time}')
start += 800
@api.get('/tasks/')
async def trans2(background_tasks: BackgroundTasks, start: int, es: AsyncElasticsearch = Depends(get_es)):
if start:
background_tasks.add_task(task_process, start, es)
return my_response(data='', message='starting', code=200, sta=status.HTTP_200_OK)
How can I configure it so that it can run stably?
The text was updated successfully, but these errors were encountered: