Async support #1672

Closed
sla-te opened this issue Sep 7, 2023 · 7 comments

@sla-te

sla-te commented Sep 7, 2023

Thank you for upgrading the lib to support v8.x.

Maybe this is a good time to ask for official async support, as described in https://elasticsearch-py.readthedocs.io/en/v8.9.0/async.html. It is clearly a feature that a lot of users will rely on, and it already has quite a history of requests.

I found #1435, which was closed in favor of #1480, which is apparently already 2 years old.

That said, we have been using the following "hack" to achieve async support in the past (using v7.4.1):

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch_dsl import Search

config = {...: ...}
indexname = ...
es = AsyncElasticsearch(**config)


async def do_es_async_work():
    s = Search(index=indexname).filter(...).filter(...)
    r = await s.execute().to_dict()
    return r

print(asyncio.run(do_es_async_work()))

When I try it with the newly released version (v8.9.0), it now sadly throws:

RuntimeWarning: coroutine 'AsyncElasticsearch.search' was never awaited

Looking at the diffs between v7.4.1 and v8.9.0, I was able to identify the root of the problem at

self._response = self._response_class(

It seems that wrapping the response of the call to es.search in an object is what broke this hack: when AsyncElasticsearch is used as the connection, body only exists after es.search has been awaited. The problem with the new structure is that I do not see a way to build a new "hack" that passes the async call straight through to AsyncElasticsearch without resorting to something like loop.run_in_executor. That would add considerable overhead, as every call would be handed off to a thread, which largely defeats the purpose of using async in this scenario, if I understand this correctly.
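To make the failure mode concrete, here is a minimal sketch that needs no Elasticsearch installation. FakeApiResponse, FakeAsyncClient, execute_v7_style and execute_v8_style are hypothetical stand-ins invented for illustration; they only mimic the shape of the behaviour described above and are not the library's actual code.

```python
import asyncio


class FakeApiResponse:
    """Hypothetical stand-in for the client's response wrapper."""

    def __init__(self, body):
        self.body = body


class FakeAsyncClient:
    """Hypothetical stand-in for an async client: search() returns a coroutine."""

    async def search(self, **kwargs):
        return FakeApiResponse({"hits": {"total": 0}})


def execute_v7_style(client):
    # The result is passed through untouched, so a coroutine survives the call.
    return client.search(index="demo")


def execute_v8_style(client):
    # The result is unwrapped immediately; a coroutine has no .body attribute.
    return client.search(index="demo").body


async def main():
    client = FakeAsyncClient()
    # v7-style: the caller can still await whatever comes back.
    response = await execute_v7_style(client)
    new_error = None
    try:
        # v8-style: fails before there is anything left to await.
        execute_v8_style(client)
    except AttributeError as exc:
        new_error = str(exc)
    return response.body, new_error


old_result, new_error = asyncio.run(main())
print(old_result)
print(new_error)
```

The discarded coroutine in the second path is also what would trigger a "was never awaited" RuntimeWarning when it is garbage collected.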

@pquentin
Member

pquentin commented Sep 7, 2023

Thank you for trying the new version and thank you for the heads up about async. I'm not yet ready to work on full async support, but what do you think about this totally unsupported hack?

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch_dsl import Search, connections


config = {...: ...}
indexname = ...
es = AsyncElasticsearch(**config)


class AsyncSearch(Search):
    async def execute_async(self, ignore_cache=False):
        # Reuse the cached response unless the caller asks for a fresh one.
        if ignore_cache or not hasattr(self, "_response"):
            es = connections.get_connection(self._using)

            # Await the async client before reading .body from the result.
            async_response = await es.search(
                index=self._index, body=self.to_dict(), **self._params
            )
            self._response = self._response_class(self, async_response.body)
        return self._response


async def do_es_async_work():
    s = AsyncSearch(using=es, index=indexname)
    r = (await s.execute_async()).to_dict()
    return r


print(asyncio.run(do_es_async_work()))

@sla-te
Author

sla-te commented Sep 7, 2023

Thank you for the quick reply. Yes, that works of course, but it would also require refactoring our entire code base (across multiple repositories), which sometimes calls execute asynchronously and sometimes synchronously (and yes, I know that's hacky :) ).

@pquentin
Member

pquentin commented Sep 8, 2023

Interesting. Can you please show me how you are using this today, with async and sync examples?

@pquentin
Member

pquentin commented Sep 8, 2023

I have been thinking about this quite a lot because it could help with the design of the async support. Using body when adding support for Elasticsearch 8 was not an obvious decision, and I'm considering returning the full ObjectApiResponse object, which behaves like a dict, in the next version.

That said, another totally unsupported hack with the current version is:

import asyncio

from elasticsearch import AsyncElasticsearch, Elasticsearch
from elasticsearch_dsl import Search, connections


config = {...: ...}
indexname = ...
async_es = AsyncElasticsearch(**config)
es = Elasticsearch(**config)


class AsyncSearch(Search):
    async def execute(self, ignore_cache=False):
        if ignore_cache or not hasattr(self, "_response"):
            es = connections.get_connection(self._using)

            if isinstance(es, AsyncElasticsearch):
                response = await es.search(
                    index=self._index, body=self.to_dict(), **self._params
                )
            else:
                response = es.search(
                    index=self._index, body=self.to_dict(), **self._params
                )
            self._response = self._response_class(self, response.body)
        return self._response


async def do_work(using):
    s = AsyncSearch(using=using, index=indexname)
    r = (await s.execute()).to_dict()
    print(r["hits"]["total"])


# async
asyncio.run(do_work(using=async_es))

# sync
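def run_secretly_sync_async_fn(async_fn, *args, **kwargs):
    coro = async_fn(*args, **kwargs)
    try:
        # Drive the coroutine one step; if it never suspends, it finishes here.
        coro.send(None)
    except StopIteration as exc:
        # The coroutine's return value travels on the StopIteration exception.
        return exc.value
    else:
        raise RuntimeError("you lied, this async function is not secretly synchronous")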


run_secretly_sync_async_fn(do_work, using=es)

It uses two tricks:

  • one to await or not depending on the Elasticsearch client instance,
  • one to run async functions like do_work or execute in a sync context, taking advantage of the fact that they are "secretly synchronous" (source)
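The second trick can be tried in isolation, without Elasticsearch. The sketch below is only an illustration under stated assumptions: secretly_sync, truly_async and run_without_loop are hypothetical names, and asyncio.sleep(0) stands in for any await that really suspends.

```python
import asyncio


async def secretly_sync(x):
    # Declared async, but never actually suspends.
    return x * 2


async def truly_async(x):
    # Yields control once, so it cannot finish in a single step.
    await asyncio.sleep(0)
    return x * 2


def run_without_loop(async_fn, *args):
    coro = async_fn(*args)
    try:
        # Step the coroutine once by hand, with no event loop involved.
        coro.send(None)
    except StopIteration as exc:
        # It ran to completion; the return value is carried by the exception.
        return exc.value
    # It suspended instead of finishing, so clean up and report it.
    coro.close()
    raise RuntimeError("coroutine suspended; it needs a real event loop")


sync_result = run_without_loop(secretly_sync, 21)

try:
    run_without_loop(truly_async, 21)
    suspended = False
except RuntimeError:
    suspended = True

# The same coroutine function runs fine under a real event loop.
loop_result = asyncio.run(truly_async(21))
print(sync_result, suspended, loop_result)
```

This is why the trick only holds as long as every code path behind the sync client stays free of real awaits.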

@sla-te
Author

sla-te commented Sep 12, 2023

Hey, sorry for the late reply, here is what I think:

The workaround for the current version is cool and would actually work (temporarily), nice tricks!

When adding async support, I would not follow our rather hacky approach of making a function/method awaitable by passing through the connection object. Instead, I would define a separate module that implements new classes with async support, analogous to how python-playwright handles such a scenario structure-wise (https://github.com/microsoft/playwright-python/tree/main/playwright). Yes, that will require users (including us) to refactor their code, but attempting to hack sync and async together would simply create too much potential for bugs.

Apart from the above, I would separate out all logic that is independent of async/sync and bundle it in a top-level module, to be used by both the async and the sync module. I would think of a structure like this:

[image: proposed module structure]

Going with urllib3's idea of using composition and passing in a kind of base class/backend that defines the core logic is surely a valid approach, but if you ask me, it makes the entire thing more (and unnecessarily) complex internally.
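A minimal sketch of the layout proposed in this comment, with the I/O-free logic in a shared base and thin sync and async front ends. All class names here (SearchBase, Search, AsyncSearch, FakeClient, FakeAsyncClient) are hypothetical and chosen for illustration only; this is not how the library is or was implemented, and the fake clients merely echo their input.

```python
import asyncio


class SearchBase:
    """Shared logic with no I/O: building the query body."""

    def __init__(self, client, index):
        self._client = client
        self._index = index
        self._filters = []

    def filter(self, **term):
        # Record one term filter and return self to allow chaining.
        self._filters.append({"term": term})
        return self

    def to_dict(self):
        return {"query": {"bool": {"filter": list(self._filters)}}}


class Search(SearchBase):
    """Sync front end: a plain call to the client."""

    def execute(self):
        return self._client.search(index=self._index, body=self.to_dict())


class AsyncSearch(SearchBase):
    """Async front end: same request, but awaited."""

    async def execute(self):
        return await self._client.search(index=self._index, body=self.to_dict())


class FakeClient:
    # Hypothetical sync client that echoes the request it received.
    def search(self, index, body):
        return {"index": index, "body": body}


class FakeAsyncClient:
    # Hypothetical async client that echoes the request it received.
    async def search(self, index, body):
        return {"index": index, "body": body}


sync_result = Search(FakeClient(), "demo").filter(status="open").execute()
async_result = asyncio.run(
    AsyncSearch(FakeAsyncClient(), "demo").filter(status="open").execute()
)
print(sync_result == async_result)
```

The cost of this layout is that callers must pick a class up front, which is the refactoring effort mentioned above.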

@miguelgrinberg
Collaborator

Also #1355 #1624

@pquentin pquentin mentioned this issue Apr 3, 2024
16 tasks
@miguelgrinberg
Collaborator

Done in #1714, will be part of the 8.13 release.
