[Question] - Eventloop busy with CPU bound operations #3454

Closed
a-sharma11 opened this issue Dec 18, 2018 · 11 comments

@a-sharma11

a-sharma11 commented Dec 18, 2018

Scenario: I have an aiohttp service with two endpoints:

1. /health, which an external container-orchestration service calls every 3 seconds (with a 3-second timeout) to check that the service is responding. After 3 consecutive health-check failures, it considers the service to be in a bad state and restarts it.
2. /transform, an API that calls many external endpoints to fetch data and then runs some pandas operations.

For /transform, the calls to external endpoints are fine, but the actual pandas work can be CPU-bound and keeps the event loop busy. When that happens, the health endpoint starts to fail and the orchestration service restarts the container.
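A minimal, stdlib-only reproduction of the problem, assuming a busy-loop is a fair stand-in for the pandas work: cpu_bound, health_ticker and transform are hypothetical names, with health_ticker playing the role of the /health handler. While the CPU-bound call runs directly inside a coroutine, no other task on the loop gets a turn, so the gap between health ticks grows to at least the length of that call.

```python
import asyncio
import time


def cpu_bound(seconds):
    """Hypothetical stand-in for the pandas work: spin the CPU for `seconds`."""
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass


async def health_ticker(ticks, stop):
    """Stand-in for /health: record a timestamp every 50 ms while the loop lets it run."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def transform():
    # CPU-bound call made directly inside a coroutine: nothing else on the loop can run.
    cpu_bound(0.5)


async def main():
    ticks, stop = [], asyncio.Event()
    ticker = asyncio.create_task(health_ticker(ticks, stop))
    await asyncio.sleep(0.1)  # let a few health ticks through first
    await transform()         # blocks the whole loop for ~0.5 s
    await asyncio.sleep(0.1)  # ticks resume once the loop is free again
    stop.set()
    await ticker
    # Largest gap between consecutive ticks, i.e. how long /health was starved.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


worst_gap = asyncio.run(main())
print(f"longest gap between health ticks: {worst_gap:.2f} s")
```

With a 3-second health-check timeout, the same effect from pandas work lasting several seconds is enough to trigger the restarts described above.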

Sample code:

```python
from aiohttp import web


async def transformStock():
    stocks = await getStockData()
    securities = await getSecurityData()
    # …create data frames and run join operations
    return stockDF


async def appendCountryInfo(stockDF):
    countries = await GetCountryData()
    countryDF = stockDF.merge(countries…)
    return countryDF

# …and more transformation functions like above


# the /transform endpoint calls them
async def transform(request: web.Request):
    stocksDF = await transformStock()
    countryDF = await appendCountryInfo(stocksDF)
    # ..and more such transformation calls
    return web.Response(text=finalDF.to_json(), content_type="application/json")


app.router.add_post('/transform', transform)
```

I would like the main event loop that aiohttp uses not to be hijacked by the long-running CPU operations. From the asyncio docs, it seems the strategy for CPU-bound operations is to run them in a ProcessPoolExecutor, as shown in the link below, but the problem is that all these functions are async (coroutine) functions.

https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

Is it possible to use two event loops, one for aiohttp and another for running these transformation functions? Or is there another way to achieve the same thing? A working example would be great!

Thanks!

@aio-libs-bot

GitMate.io thinks the contributor most likely able to help you is @asvetlov.

Possibly related issues are #2992 (question), #172 (Question about ServerHttpProtocol.closing()), #3066 (Question: What's timeout?), #1325 ([QUESTION] WebSocketResponse timeout parameter), and #2482 ([Question] primarydomain for aiohttp project).

@asvetlov
Member

I guess a standard thread pool executor should be fine for pandas. Check it out.

You don't need two loops, even for a process executor: just create an executor instance and pass it to run_in_executor.
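A minimal sketch of that advice, stdlib only: one executor instance is created up front and passed to loop.run_in_executor, and the loop keeps servicing a stand-in health task while the CPU-bound call runs. cpu_bound and health_ticker are hypothetical stand-ins for the pandas work and the /health handler; a ThreadPoolExecutor is used here, following the thread-pool suggestion above.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def cpu_bound(seconds):
    """Hypothetical stand-in for the pandas work: spin the CPU for `seconds`."""
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass
    return seconds


async def health_ticker(ticks, stop):
    """Stand-in for /health: record a timestamp every 50 ms while the loop lets it run."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def transform(executor):
    loop = asyncio.get_running_loop()
    # Hand the blocking call to the executor; this coroutine suspends and the loop stays free.
    return await loop.run_in_executor(executor, cpu_bound, 0.5)


async def main(executor):
    ticks, stop = [], asyncio.Event()
    ticker = asyncio.create_task(health_ticker(ticks, stop))
    result = await transform(executor)
    stop.set()
    await ticker
    return result, max(b - a for a, b in zip(ticks, ticks[1:]))


# One executor instance, created once and passed into run_in_executor; no second loop.
with ThreadPoolExecutor(max_workers=2) as pool:
    result, worst_gap = asyncio.run(main(pool))
print(f"result={result}, longest gap between health ticks: {worst_gap:.2f} s")
```

Pure-Python CPU work in a thread still competes for the GIL, so it can slow the loop down, but it should not stall it the way an inline call does. Swapping in concurrent.futures.ProcessPoolExecutor avoids the GIL entirely; the trade-off is that the function and its arguments must be picklable (a module-level function, not a lambda or closure) and the startup code should sit under an if __name__ == "__main__": guard.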

@a-sharma11
Author

Thanks @asvetlov
I am not sure how to use an async function with a ThreadPoolExecutor:

```python
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(pool, await transform())
```

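This doesn't work: run_in_executor expects a plain synchronous callable, so I can't hand it an async function (and the inner await just runs transform() on the event loop before run_in_executor is even called). I could create a sync function as the entry point for the ThreadPoolExecutor, but I'm not sure how to call an async function from a sync function.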

@asvetlov
Member

```python
import asyncio

from aiohttp import web


async def transformStock():
    stocks = await getStockData()
    securities = await getSecurityData()
    # …create data frames and run join operations
    return stockDF


async def appendCountryInfo(stockDF):
    countries = await GetCountryData()
    loop = asyncio.get_event_loop()
    # DataFrame.merge is CPU-bound: run it in the default executor (a thread pool)
    countryDF = await loop.run_in_executor(None, stockDF.merge, countries, …)
    return countryDF

# …and more transformation functions like above


# the /transform endpoint calls them
async def transform(request: web.Request):
    stocksDF = await transformStock()
    countryDF = await appendCountryInfo(stocksDF)
    # ..and more such transformation calls
    return web.Response(text=finalDF.to_json(), content_type="application/json")


app.router.add_post('/transform', transform)
```
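One detail when applying this to real merges: run_in_executor forwards positional arguments only, and the asyncio docs recommend functools.partial for keyword arguments. A merge that needs on= or how= could therefore be bound first, e.g. functools.partial(stockDF.merge, countries, on=..., how="left"). The sketch below shows the pattern with a hypothetical merge_rows function standing in for DataFrame.merge so it runs without pandas; the "ticker" column is made up for illustration.

```python
import asyncio
import functools


def merge_rows(left, right, *, on, how="inner"):
    """Hypothetical stand-in for DataFrame.merge: join two lists of dicts on a key."""
    index = {row[on]: row for row in right}
    merged = []
    for row in left:
        match = index.get(row[on])
        if match is not None:
            merged.append({**row, **match})
        elif how == "left":
            merged.append(dict(row))  # left join keeps unmatched rows
    return merged


async def append_country_info(stocks, countries):
    loop = asyncio.get_running_loop()
    # run_in_executor only passes positional args, so bind the keywords with partial.
    job = functools.partial(merge_rows, stocks, countries, on="ticker", how="left")
    return await loop.run_in_executor(None, job)


stocks = [{"ticker": "AAPL", "price": 1.0}, {"ticker": "XYZ", "price": 2.0}]
countries = [{"ticker": "AAPL", "country": "US"}]
result = asyncio.run(append_country_info(stocks, countries))
print(result)
```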

@a-sharma11
Author

a-sharma11 commented Dec 18, 2018

This would require a lot of refactoring. I wonder if there is a better way to do it at a high level. I tried something like this:

```python
# create a high level async function that calls all the lower level async functions - so no code changes required downstream
async def transform_async():
    stocksDF = await transformStock()
    countryDF = await appendCountryInfo(stocksDF)
    # ..and more such transformation calls
    return finalDF.to_json()


# create a sync function to call transform_async and wait for it to complete. This will be called from ThreadPoolExecutor
def transform_sync():
    loop = asyncio.new_event_loop()
    loop.run_until_complete(transform_async())
    return future.result()


# The request handler now just calls run_in_executor with transform_sync
async def transform(self, request: web.Request):
    result = await loop.run_in_executor(pool, transform_sync)
    return web.Response(result)
```

This is obviously not working. But if there is a way to do it at a high level, it would make my life much easier.

@asvetlov
Member

You have to run any blocking code in an executor.
Nested loops are not allowed.
Running async code from synchronous code is technically possible, but the API is very low level and tricky; I suggest avoiding it.
That's how asyncio works, the problem is not specific to aiohttp itself.
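For reference only, a minimal sketch of the "technically possible" route, assuming the sync code runs in a worker thread and is handed the main loop explicitly: asyncio.run_coroutine_threadsafe schedules a coroutine on that loop from another thread and returns a concurrent.futures.Future the worker can block on. get_country_data and transform_sync are hypothetical names. The caveats behind the advice above apply: calling this from the loop's own thread deadlocks, and the extra thread hops make the flow harder to follow than calling run_in_executor only on the CPU-bound parts.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def get_country_data():
    # Hypothetical stand-in for an async fetch such as GetCountryData().
    await asyncio.sleep(0.01)
    return {"AAPL": "US", "SAP": "DE"}


def transform_sync(loop):
    """Runs in a worker thread; `loop` is the main event loop, which is still running."""
    # Submit the coroutine to the main loop from this thread, then wait for it here.
    # Blocking is fine in a worker thread; doing this on the loop's own thread would deadlock.
    future = asyncio.run_coroutine_threadsafe(get_country_data(), loop)
    countries = future.result(timeout=5)
    # ...CPU-bound work on `countries` would run here, off the event loop.
    return sorted(countries.items())


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        return await loop.run_in_executor(pool, transform_sync, loop)


result = asyncio.run(main())
print(result)
```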

@a-sharma11
Author

Agreed, the issue is not aiohttp-specific. Thanks @asvetlov for the help!

@asvetlov
Member

Welcome!

@samuelcolvin
Member

The simplest solution is to call run_in_executor only for the (synchronous) CPU-intensive code. In your first example:

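```python
async def transformStock():
    loop = asyncio.get_event_loop()
    stocks = await getStockData()
    securities = await getSecurityData()
    # create_dataframe_etc is the synchronous, CPU-heavy part; it runs in the default executor
    stockDF = await loop.run_in_executor(None, create_dataframe_etc)
    return stockDF
```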

As it happens, I implemented almost exactly this earlier this morning; see here (note: while it is tested and working, I haven't probed its performance at all).
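A runnable sketch of the same split, with the stand-ins filled in: get_stock_data, get_security_data and build_stock_table are hypothetical, asyncio.sleep stands in for the HTTP calls, and a list comprehension stands in for the pandas join. The I/O stays on the event loop, and only the pure synchronous function goes to the executor. One optional deviation from the snippet above: the two fetches are awaited together with asyncio.gather.

```python
import asyncio


async def get_stock_data():
    # Hypothetical stand-in for the real async fetch.
    await asyncio.sleep(0.01)
    return [("AAPL", 190.0), ("SAP", 120.0)]


async def get_security_data():
    # Hypothetical stand-in for the real async fetch.
    await asyncio.sleep(0.01)
    return {"AAPL": "Apple Inc.", "SAP": "SAP SE"}


def build_stock_table(stocks, securities):
    """Pure, synchronous CPU work: the part that would build and join the data frames."""
    return [(ticker, securities[ticker], price) for ticker, price in stocks]


async def transform_stock():
    loop = asyncio.get_running_loop()
    # I/O stays on the event loop; both fetches run concurrently.
    stocks, securities = await asyncio.gather(get_stock_data(), get_security_data())
    # Only the CPU-bound step is handed to the default executor.
    return await loop.run_in_executor(None, build_stock_table, stocks, securities)


table = asyncio.run(transform_stock())
print(table)
```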

@a-sharma11
Author

Yup, it looks like I will have to refactor the code to add run_in_executor for the CPU-intensive code blocks. Thanks @samuelcolvin for the example. The problem for me is that this is spread all over: we have hundreds of these transformation functions, each of which both fetches data and runs CPU-intensive work.
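One way to cut the per-function boilerplate during such a refactor, sketched under assumptions: a small decorator (offload is a hypothetical name, not an aiohttp or asyncio API) that turns a synchronous CPU-bound function into a coroutine function running in the loop's default executor, so call sites just await it. It only helps once a function's CPU work has been split out into a pure sync function; functions that interleave awaits and heavy computation still need splitting by hand.

```python
import asyncio
import functools
import threading


def offload(func):
    """Hypothetical helper: expose a sync CPU-bound function as a coroutine
    function whose body runs in the running loop's default executor."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapper


@offload
def join_countries(tickers, countries):
    # Purely synchronous work; also reports which thread executed it.
    rows = [(t, countries.get(t)) for t in tickers]
    return threading.current_thread().name, rows


async def append_country_info(tickers):
    countries = {"AAPL": "US"}  # would come from an awaited fetch in real code
    # Call sites simply await the decorated function.
    return await join_countries(tickers, countries)


thread_name, rows = asyncio.run(append_country_info(["AAPL", "XYZ"]))
print(thread_name, rows)
```

This suits the default thread pool. With a ProcessPoolExecutor, the original function behind the decorator can no longer be pickled by reference, because its module-level name now points to the wrapper.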

@lock

lock bot commented Dec 18, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those excerpts in that new issue.

lock bot added the outdated label Dec 18, 2019
lock bot locked as resolved and limited conversation to collaborators Dec 18, 2019