Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async support #1714

Merged
merged 27 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
05ebfa5
async support (work in progress)
miguelgrinberg Mar 8, 2024
74eccd7
minor refactoring, fixed linter and docs
miguelgrinberg Mar 11, 2024
8fcf16c
validate _sync directory during lint phase
miguelgrinberg Mar 11, 2024
78a4f72
search unit tests
miguelgrinberg Mar 13, 2024
5f9eb27
async support for Document class
miguelgrinberg Mar 13, 2024
db05fc2
search integration tests
miguelgrinberg Mar 13, 2024
85a1f41
document tests
miguelgrinberg Mar 14, 2024
a0fb061
async support for Index class
miguelgrinberg Mar 14, 2024
a3fa07d
document integration tests
miguelgrinberg Mar 14, 2024
326f965
remove unused mock dependency
miguelgrinberg Mar 19, 2024
90c8eb5
unasync support for update_by_query
miguelgrinberg Mar 19, 2024
071ddea
remove star imports
miguelgrinberg Mar 19, 2024
eca6306
unasync support for mapping.py
miguelgrinberg Mar 19, 2024
73ffe5a
unasync index integration tests
miguelgrinberg Mar 19, 2024
3690105
unasync faceted search
miguelgrinberg Mar 20, 2024
0184a77
review imports for consistency
miguelgrinberg Mar 20, 2024
8add69f
async support for analyzer
miguelgrinberg Mar 20, 2024
4b7e9fb
async examples
miguelgrinberg Mar 21, 2024
38cd862
examples integration tests
miguelgrinberg Mar 22, 2024
7386dd5
restore sync examples unasynced by mistake
miguelgrinberg Mar 22, 2024
8b5fa41
Documentation updates
miguelgrinberg Mar 22, 2024
2468e8a
Review feedback
miguelgrinberg Mar 27, 2024
d06d804
more review feedback
miguelgrinberg Mar 28, 2024
7bdf7dc
another batch of review updates
miguelgrinberg Mar 29, 2024
9c8e63d
documentation reorg
miguelgrinberg Apr 2, 2024
c172281
more documentation improvements
miguelgrinberg Apr 2, 2024
6b56fb3
unasyncing of examples
miguelgrinberg Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion examples/alias_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def migrate(move_data=True, update_alias=True):
)


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

Expand All @@ -143,3 +143,10 @@ def migrate(move_data=True, update_alias=True):

# create new index
migrate()

# close the connection
connections.get_connection().close()


if __name__ == "__main__":
main()
3 changes: 2 additions & 1 deletion examples/async/alias_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
alias.
"""
import asyncio
import os
from datetime import datetime
from fnmatch import fnmatch

Expand Down Expand Up @@ -127,7 +128,7 @@ async def migrate(move_data=True, update_alias=True):

async def main():
# initiate the default connection to elasticsearch
async_connections.create_connection(hosts=["http://localhost:9200"])
async_connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

# create the empty index
await setup()
Expand Down
9 changes: 8 additions & 1 deletion examples/completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Index:
settings = {"number_of_shards": 1, "number_of_replicas": 0}


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

Expand All @@ -97,3 +97,10 @@ class Index:
# print out all the options we got
for option in response.suggest.auto_complete[0].options:
print("%10s: %25s (%d)" % (text, option._source.name, option._score))

# close the connection
connections.get_connection().close()


if __name__ == "__main__":
main()
12 changes: 10 additions & 2 deletions examples/composite_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ def run_search(**kwargs):

response = run_search()
while response.aggregations.comp.buckets:
yield from response.aggregations.comp.buckets
for b in response.aggregations.comp.buckets:
yield b
if "after_key" in response.aggregations.comp:
after = response.aggregations.comp.after_key
else:
after = response.aggregations.comp.buckets[-1].key
response = run_search(after=after)


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

Expand All @@ -57,3 +58,10 @@ def run_search(**kwargs):
"File %s has been modified %d times, first seen at %s."
% (b.key.files, b.doc_count, b.first_seen.value_as_string)
)

# close the connection
connections.get_connection().close()


if __name__ == "__main__":
main()
13 changes: 11 additions & 2 deletions examples/parent_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def get_answers(self):
"""
if "inner_hits" in self.meta and "answer" in self.meta.inner_hits:
return self.meta.inner_hits.answer.hits
return list(self.search_answers())
return [a for a in self.search_answers()]

def save(self, **kwargs):
self.question_answer = "question"
Expand Down Expand Up @@ -208,7 +208,7 @@ def setup():
index_template.save()


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

Expand Down Expand Up @@ -243,3 +243,12 @@ def setup():
)
question.save()
answer = question.add_answer(honza, "Just use `elasticsearch-py`!")

# close the connection
connections.get_connection().close()

return answer


if __name__ == "__main__":
main()
9 changes: 8 additions & 1 deletion examples/percolate.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,15 @@ def setup():
).save(refresh=True)


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

setup()

# close the connection
connections.get_connection().close()


if __name__ == "__main__":
main()
9 changes: 8 additions & 1 deletion examples/search_as_you_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Index:
settings = {"number_of_shards": 1, "number_of_replicas": 0}


if __name__ == "__main__":
def main():
# initiate the default connection to elasticsearch
connections.create_connection(hosts=[os.environ["ELASTICSEARCH_URL"]])

Expand Down Expand Up @@ -92,3 +92,10 @@ class Index:
# print out all the options we got
for h in response:
print("%15s: %25s" % (text, h.name))

# close the connection
connections.get_connection().close()


if __name__ == "__main__":
main()
67 changes: 47 additions & 20 deletions utils/run-unasync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@
import os
import subprocess
import sys
from glob import glob
from pathlib import Path

import unasync


def main(check=False):
# the list of directories that need to be processed with unasync
# each entry has two paths:
# - the source path with the async sources
# - the destination path where the sync sources should be written
source_dirs = [
"elasticsearch_dsl",
"tests",
"tests/test_integration",
"tests/test_integration/test_examples",
(
"elasticsearch_dsl/_async/",
"elasticsearch_dsl/_sync/",
),
("tests/_async/", "tests/_sync/"),
(
"tests/test_integration/_async/",
"tests/test_integration/_sync/",
),
(
"tests/test_integration/test_examples/_async",
"tests/test_integration/test_examples/_sync/",
),
("examples/async/", "examples/"),
]
output_dir = "_sync" if not check else "_sync_check"

# Unasync all the generated async code
additional_replacements = {
Expand Down Expand Up @@ -58,11 +72,11 @@ def main(check=False):
}
rules = [
unasync.Rule(
fromdir=f"{source_dir}/_async/",
todir=f"{source_dir}/{output_dir}/",
fromdir=dir[0],
todir=f"{dir[0]}_sync_check/" if check else dir[1],
additional_replacements=additional_replacements,
)
for source_dir in source_dirs
for dir in source_dirs
]

filepaths = []
Expand All @@ -75,24 +89,37 @@ def main(check=False):
filepaths.append(os.path.join(root, filename))

unasync.unasync_files(filepaths, rules)

if check:
# make sure there are no differences between _sync and _sync_check
for source_dir in source_dirs:
for dir in source_dirs:
output_dir = f"{dir[0]}_sync_check/" if check else dir[1]
subprocess.check_call(["black", "--target-version=py38", output_dir])
subprocess.check_call(["isort", output_dir])
for file in glob("*.py", root_dir=dir[0]):
# remove asyncio from sync files
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a few doubts about (1) doing this for all files rather than only the examples, and (2) using a sed command carefully crafted to work on both Linux and macOS. This seems fragile, but it works, so let's do it.

Copy link
Collaborator Author

@miguelgrinberg miguelgrinberg Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, 100% agree on the fragility. But I don't think doing this for all files is a problem, since this transformation is the correct thing to do in the general case. I could actually make the regex in the sed command a bit smarter and capture the function name that is passed to asyncio.run() instead of having it hardcoded. I think this type of transformation would make sense to add to unasync (although not with sed!).

subprocess.check_call(
["black", "--target-version=py38", f"{source_dir}/_sync_check/"]
["sed", "-i.bak", "/^import asyncio$/d", f"{output_dir}{file}"]
)
subprocess.check_call(["isort", f"{source_dir}/_sync_check/"])
subprocess.check_call(
[
"diff",
"-x",
"__pycache__",
f"{source_dir}/_sync",
f"{source_dir}/_sync_check",
"sed",
"-i.bak",
"s/asyncio\\.run(main())/main()/",
f"{output_dir}{file}",
]
)
subprocess.check_call(["rm", "-rf", f"{source_dir}/_sync_check"])
subprocess.check_call(["rm", f"{output_dir}{file}.bak"])

if check:
# make sure there are no differences between _sync and _sync_check
subprocess.check_call(
[
"diff",
f"{dir[1]}{file}",
f"{output_dir}{file}",
]
)

if check:
subprocess.check_call(["rm", "-rf", output_dir])


if __name__ == "__main__":
Expand Down
Loading