Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-29842: Make Executor.map less eager so it handles large/unbounded… #18566

Open
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

graingert
Copy link
Contributor

@graingert graingert commented Feb 20, 2020

… input iterables appropriately

bugs.python.org/issue29842

bugs.python.org/issue29842

recreate of #707 with conflicts fixed and versionchanged updated to py3.9

/cc @MojoVampire

https://bugs.python.org/issue29842

Copy link

@rdarder rdarder left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi there! sorry for intruding. I was looking for this feature and decided to try this patch out. Works great! Just found a small issue so I sent you a comment.
Thank you!

except StopIteration:
argsiter = None
else:
fs.append(self.submit(fn, *args))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the executor has been shut down, this will raise:
cannot schedule new futures after shutdown
But, also the base executor holds no state, so at this level it'll be pretty hard to tell if the executor has been shut down or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdarder
this could catch that RuntimeError and just return, or the executor could keep a weakset of result_iterators and close them on shutdown

.. versionchanged:: 3.5
Added the *chunksize* argument.

.. versionchanged:: 3.9
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.. versionchanged:: 3.9
.. versionchanged:: 3.11

@Tronic
Copy link

Tronic commented Oct 29, 2021

While waiting for this to be included in Python, one can easily copy&paste the map function out of this version and call it with the executor as the first argument. Hoping that you get this merged soon!

@kumaraditya303 kumaraditya303 self-requested a review July 18, 2022 15:30
@kumaraditya303 kumaraditya303 added type-feature A feature request or enhancement 3.12 bugs and security fixes labels Jul 18, 2022
@kumaraditya303
Copy link
Contributor

kumaraditya303 commented Jul 23, 2022

@graingert Tests are failing, can you fix them and rebase to main? I'll then review and run through the buildbots.

else:
yield fs.pop().result(end_time - time.monotonic())
Copy link
Contributor Author

@graingert graingert Jul 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't cancel the currently waited on fut correctly: #95166

import contextlib
import functools
import concurrent.futures
import threading
import sys


def fn(num, stop_event):
    if num == 1:
        stop_event.wait()
        return "done 1"

    if num == 2:
        return "done 2"


def main():
    stop_event = threading.Event()
    log = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:

        def print_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        fut = pool.submit(print_n_wait, ident="first")
        try:
            with contextlib.closing(pool.map(print_n_wait, ["second", "third"], timeout=1)) as gen:
                try:
                    next(gen)
                except concurrent.futures.TimeoutError:
                    print("timed out")
                else:
                    raise RuntimeError("timeout expected")
        finally:
            stop_event.set()

    assert log == ["ident='first' started", "ident='first' stopped"], f"{log=} is wrong"

if __name__ == "__main__":
    sys.exit(main())

@kumaraditya303
Copy link
Contributor

@graingert You can run make patchcheck to fix the docs CI.

@Jason-Y-Z
Copy link
Contributor

Hey all, I took the inspiration from this PR and continued the work in #114975

@ebonnal
Copy link

ebonnal commented Oct 17, 2024

Hi, fyi here is a follow up PR: #125663 🙏🏻

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
3.12 bugs and security fixes awaiting review performance Performance or resource usage stdlib Python modules in the Lib dir type-feature A feature request or enhancement
Projects
None yet
Development

Successfully merging this pull request may close these issues.

10 participants