-
-
Notifications
You must be signed in to change notification settings - Fork 341
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for async generator finalization #1564
Conversation
First impression: there is more code here than I was expecting :-) Why do we need a dedicated reaper task with its own entry queue, etc.? Is there some reason the finalizer hook can't do async def aclose_forcefully_with_error_log(obj):
try:
await aclose_forcefully(obj)
except:
log_unraiseable()
token.run_sync_soon(spawn_system_task, aclose_forcefully_with_error_log, obj) ? Do we need to reimplement the fallback finalizer? Won't the VM run that automatically if we don't call I'm not sure we need the |
More than I was expecting too, but I think there's a good reason for most of it in this case.
The entry queue is the runner's entry queue, not an additional one. The only reason I used the entry queue rather than the TrioToken was because trio.run() has some logic for not creating the TrioToken until it's needed, and I figured there was some reason for that that I was missing. If the asyncgen manager used a TrioToken, the TrioToken would always exist. The reaper is its own task mostly because it needs to finalize the async generators that are still alive when the main task exits. (If we just let these dangle, then eventually they'll be GC'ed and will try to call our finalizer after the run has finished.) I guess that could be done directly at the place in
Nope -- objects are only finalized once, per PEP 442.
Trio itself maybe doesn't strictly need it (although it's important if you're doing guest mode on top of asyncio -- otherwise asyncgens created in asyncio will be finalized in Trio). But trio-asyncio will need it, and tricycle will need it, and having each library copy/paste most of
If you have a better way to track "this asyncgen is mine and that one is not", I'd love to hear it!
It would be really nice if CPython had gone with "firstiter hook returns the finalizer hook to use" -- there's already a slot for it in the object! -- rather than "asyncgen remembers which global finalizer was installed when it was first iterated". Then we could remove a lot of this complexity. Unfortunately, with the implementation they actually picked, I think it's necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The entry queue is the runner's entry queue, not an additional one. The only reason I used the entry queue rather than the TrioToken was because trio.run() has some logic for not creating the TrioToken until it's needed, and I figured there was some reason for that that I was missing. If the asyncgen manager used a TrioToken, the TrioToken would always exist.
Eh, I don't think there was any particularly good reason for that. But yeah, I misread. If you're just passing around the entry queue object instead of passing around the TrioToken
, then that's fine, they're basically the same thing anyway.
The reaper is its own task mostly because it needs to finalize the async generators that are still alive when the main task exits. (If we just let these dangle, then eventually they'll be GC'ed and will try to call our finalizer after the run has finished.) I guess that could be done directly at the place in task_exited() where we cancel the system_nursery, but this way seemed better-encapsulated. Also, really we want to start the shutdown process when all other system tasks have exited (except the run_sync_soon task and the asyncgen reaper), not just when the main task has exited, so that system tasks can safely use async generators in the same way that regular tasks can (rather than having a brief window after the main task has exited where newly iterated asyncgens won't be cleaned up). The current code will do that once we implement #1521.
So we do need to figure out when exactly to run the final cleanup hooks, and yeah, a system task is one of the more plausible places for that. But even if we use a system task for it, then I think it would be much simpler to make that task look like:
async def reaper(self):
try:
await sleep_forever()
finally:
for agen in self.pending_agens:
spawn_system_task(safe_aclose, agen)
...and use the system nursery for finalize calls in general. That would let us get rid of all the machinery around pending_finalize_calls
, inflight_starts
, etc., wouldn't it?
Trio itself maybe doesn't strictly need it (although it's important if you're doing guest mode on top of asyncio -- otherwise asyncgens created in asyncio will be finalized in Trio). But trio-asyncio will need it, and tricycle will need it, and having each library copy/paste most of conditional_asyncgen_hooks and claim its own locals key seems worse than doing it centrally.
Mayyyybe... but as currently implemented, it has some problems:
-
it assumes that the context managers will be opened in strict nested order, but that's not guaranteed. If two invocations get interleaved, then you'll get a big mess.
-
trio-asyncio needs to store more per-generator state, because there can be multiple loops, and it needs to track which one each async generator belongs to.
I don't think you even need a context manager. Like, trio-asyncio can install the hooks unconditionally when the first loop is opened, and let trio worry about cleaning them up later:
# Called from inside trio_asyncio.open_loop
def maybe_install_trio_asyncio_hooks():
if installed_asyncgen_hooks_runvar.get():
return
old_firstiter, old_finalizer = sys.get_asyncgen_hooks()
def firstiter(agen):
if in_trio_asyncio:
loop = get_running_loop()
agen.__frame__.__locals__["@trio_asyncio_loop"] = loop
loop._agens.add(agen)
else:
old_firstiter(agen)
def finalizer(agen):
if "@trio_asyncio_loop" in agen.__frame__.__locals__:
loop = agen.__frame__.__locals__
loop._do_finalizer(agen)
else:
old_finalizer(agen)
sys.set_asyncgen_hooks(firstiter, finalizer)
installed_asyncgen_hooks_runvar.set(True)
Can you really make that meaningfully shorter and clearer by using some utility from trio? There's only like, 3 lines of boilerplate or something; everything else has some special knowledge of trio-asyncio.
You can't use a WeakSet of asyncgens-that-are-mine because weakrefs are broken before finalizers are called.
Ughhh god dammit. Fine. Now I have another reason to be annoyed at this feature of weakrefs.
You can't use a set of ids because the finalizer hook is only called for asyncgens that haven't been fully exhausted, leaving the opportunity for the set to grow without bound.
I guess there is some ridiculous design where you install a weakref callback on every asyncgen, and when the callback is invoked you add that asyncgen's id to a set of "recently deceased asyncgens that belong to us", and you regularly purge that set, e.g. on every scheduler tick (since you know that in practice the finalizer hook is going to be called immediately after the weakref callback). But that's worse.
I bet we could sell Yury on a change where firstiter is allowed to return the finalizer hook, or if it returns None then it uses the global one. But that wouldn't help before 3.9 at the earliest. Might be worth it anyway; it'd be a pretty simple change, and at least it'd put a time limit on this frame hacking stuff.
Speaking of CPython remembering the finalizer hook when the async gen is created: doesn't that mean we don't need the fallback finalizer logic, since our finalizer hook will know how to handle everything that it gets given?
trio/_core/_asyncgen.py
Outdated
f"will not be able to manage its lifetime.", | ||
RuntimeWarning, | ||
stacklevel=2, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could reasonably raise an exception in this case too... it looks like firstiter
hooks are allowed to do that, and the exception propagates out of agen.__anext__
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, they are -- I went with a warning because it would technically be a backcompat break to make this an error, but it almost certainly wouldn't matter in practice, and I do prefer the error on aesthetic grounds. (Also, one can catch the exception and call __anext__
again, since at that point it's not the first iteration anymore and doesn't call the hook.)
That would be starting more system tasks after the system nursery has begun shutting down and cancelling the existing ones, which we both said in #1521 shouldn't be allowed for service tasks in general. Now that I think about it, though, we could remove pending_finalize_calls by letting start_close use run_sync_soon to schedule another call to itself if it's invoked before the nursery is open, since that's only a one-tick window. And similarly we could have the reaper task use run_sync_soon to schedule its own wakeup, which would remove the need for inflight_starts.
That's a very good point -- OK, no context manager :-)
We could support this by having our interface be that the firstiter hook returns its corresponding finalizer hook. trio-asyncio could return Or we could use a TreeVar for the hooks-to-use and not have multiple layers of hook delegation going on at all. Trio would install thread-level asyncgen hooks that delegate to the hook in the TreeVar, or call the asyncgen_manager hook if the TreeVar has no value here, or call the previously installed thread-level hook if the TreeVar is inaccessible since we're not in a task. In either case, we would only need to store one datum in the locals dict -- "which finalizer should we use
This seems worthwhile, especially if we still can make 3.9 even though we missed the first beta by a couple weeks. The change is small and likely to be backwards-compatible in all practical cases, although it's possible to write code whose behavior would change (if you're returning something from the firstiter hook even though that return value is currently ignored). What would the process be here? Submit a bug against Python first, or just a PR directly?
If we get firstiter-returns-finalizer upstreamed, then yes, we could remove this bit too: install the thread-level finalizer as None and have firstiter return None if we're not in a task. With current interpreter capabilities, we still need the fallback object, because the asyncgen object has remembered our thread-level finalizer unconditionally by the time our firstiter hook realizes "this isn't one of mine". |
OK yeah if we settle on making our asyncgen hooks extensible then this seems like a much better way to do that.
I think open an issue on BPO. Maybe we should also see about changing the behavior so that if the firstiter hook raises, then the async gen is not marked as having started iterating? (Oh wait, here's another thing to check: can a firstiter hook call
Well, maybe we were wrong, if the first time we try to use them we run into problems with it :-) In general though I'm having trouble keeping track of the combination of asyncgen tricky details + multiple layers of event loop composition + future service task semantics all at once. Minimal useful thing: for plain
That last step is the really tricky one. In theory, we'd better clean up the agens after like... every other piece of user code has finished. If some task is still iterating an agen when we try to Of course, during the cleanup those agens might do arbitrary things, like create new agens or try to spawn new system tasks. There's also some tricky sequencing between shutting down I guess a useful trick is that we could convert our So I'm thinking of a sequence like:
(This will also fix some other bugs, e.g. right now if a user system task uses Then there are a bunch of questions about how to implement this and how to extend it to cover more complex scenarios (guest mode, trio-asyncio, etc.). But first, does that sound like the right semantics to be aiming for? |
This does work, although it's awkward because firstiter is synchronous (you have to say An async generator can still be iterated after aclose(), though; it just raises StopAsyncIteration immediately. This seems confusing enough that we shouldn't bother -- if a user is actively trying to subvert our firstiter hook by catching the exception and continuing to iterate the generator, I think they deserve what they get.
Signal/GC context isn't async-colored, and firstiter-ing an async generator is. Function color is an illusion, sure, but do you think we need to worry about people saying
That seems dangerous to me -- imagine if an async generator had yielded inside I think continuing to run the run_sync_soon task until after asyncgens are finalized is reasonably safe. If we disable spawn_system_task before starting asyncgen finalization, then all newly submitted user code will have to be synchronous, which tends to reduce the chance of it using an async generator.
The sequence you've outlined sounds good to me, except for the order of shutting down the run_sync_soon task as mentioned above. It's worth noting that we can get this order pretty easily given #1521. We would set up the system nursery as:
Shutdown sequence: after the main task exits...
Basically, by implementing #1521 and starting our system tasks in the right order, we get the right shutdown order without having to do much at all by way of careful shutdown orchestration. And can still simplify this PR #1564 a good deal relative to what I originally uploaded -- for example, the asyncgen system task no longer does anything while user code is running, so there's no need to manage startup/shutdown quite so carefully. |
Compared to everything else we're doing here, that level of awkwardness barely registers :-) It sounds like we may not need to shut down asyncgen production though, so that's nice.
Oh yeah, good point. (Also I guess
A-ha! Fantastic example, thank you.
No doubt, but I think for right now I'd rather try to get something landed without blocking on other reworks :-). It's fewer things to keep in our heads at once, and we can always refactor later. So how about something like this: async def teardown_asyncgen(self, asyncgen):
with CancelScope() as cscope:
cscope.cancel()
# If another asyncgen is waiting on it, put it back in the queue for later
if asyncgen.ag_running:
self.asyncgens.add(asyncgen)
else:
try:
await asyncgen.aclose()
except BaseException as exc:
asyncgen_logger.exception(exc)
async def init(self):
# This is also where we put the autojump clock, somehow
async with open_nursery() as self.real_system_nursery:
real_system_nursery.start_soon(entry_queue_task)
# This is where spawn_system_task puts things, and it's auto-cancelled when main exits.
# When it's closed, spawn_system_task stops working.
async with open_nursery() as self.user_system_nursery:
self.main_task = spawn(self.user_system_nursery, ...)
# At this point we know all user tasks are gone.
self.asyncgens = set(self.asyncgens)
# No more asyncgen finalizers will be called. But there might be some working their
# way through the run_sync_soon queue. So flush it.
task = current_task()
self.entry_queue.run_sync_soon(reschedule, task)
await wait_task_rescheduled(lambda _: Abort.FAILED)
# All pending finalizers have run now. Add any trailing asyncgens to our force-cleanup set.
self.asyncgens.update(self.trailing_finalizer_asyncgens)
self.trailing_finalizer_asyncgens = None
# Loop needed in case some asyncgen's cleanup path firstiters a new asyncgen, or manages
# to block our closing another asyncgen.
# I guess ideally we'd do some dance involving the GC referrers graph to try to close
# asyncgens from the top down...
while self.asyncgens:
batch = self.asyncgens
self.asyncgens = set()
async with open_nursery() as kill_them_all:
for asyncgen in batch:
kill_them_all.start_soon(self.teardown_asyncgen, asyncgen)
# There are no more asyncgens, woohoo.
self.real_system_nursery.cancel()
def finalizer(self, asyncgen):
def finalizer_cb():
try:
self.user_system_nursery.start_soon(self.teardown_asyncgen, asyncgen)
except RuntimeError: # nursery is closed, final shutdown is starting
self.trailing_finalizer_asyncgens.add(asyncgen)
self.entry_queue.run_sync_soon(finalizer_cb) |
That sketch seems reasonable to me. (Nice touch with the loop during final teardown -- my previous attempts just denied asyncgen firstiter'ing after that point.) I do still want to at least try to get #1521 in first, because I think doing the teardown dance on top of it will be cleaner. I have a mostly-complete implementation which just needs #1579 to be workable; if it winds up raising design questions that aren't quickly resolvable then I'll come back and implement the interim non-service-task-based approach to asyncgen teardown here. |
My issue with blocking on #1521 is that I don't think we're confident of all the details of that API yet, and I'd rather focus on that on its own so it gets the attention it deserves, without trying to juggle this at the same time. |
https://bugs.python.org/issue40916
Fair enough - I'll rework this to not depend on that. I do think we at least need to resolve #1579 first though. |
After #1588, there will be only one task that would go in the "real" system nursery: the run_sync_soon task. Should we just... unroll that into some things that |
Maybe? But let's something landed first here before worrying about refactoring |
3b7f2f8
to
26f1aea
Compare
Have updated this according to the discussion upthread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick pass with some comments. I'm sleepy so probably not 100% comprehensive but I figured you'd rather hear what I had now rather than waiting :-)
Reminder, mostly for myself: I think we can defer solving trio-asyncio's problems for a followup PR, but we do need to think about how guest mode will work before merging this.
await wait_task_rescheduled(lambda _: Abort.FAILED) | ||
|
||
# Now it's safe to proceed with shutting down system tasks | ||
self.system_nursery.cancel_scope.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, so this is an interesting change compared to the sketch in my comment. In my sketch, I did the self.asyncgens = set(self.asyncgens)
and run_sync_soon
flush after shutting down the system nursery, and you're doing it before.
I guess the tradeoff is: in my sketch, the finalizer's run_sync_soon
payload has to be prepared to run after the system nursery is shut down. (I handled that by having it detect this case, and instead place the object into self.asyncgens
.) So that's a bit more complex. In this version, it's a bit simpler, but it means that during the time period where system tasks are shutting down, async generator garbage collection is disabled. That makes me a bit nervous, because arbitrary user code is within its rights to create unbounded amounts of garbage and expect the GC to clean it all up.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I figured we could trust system tasks to not run for an unbounded amount of time after they're cancelled, but you're right that that's not a very robust assumption to make. I'll change it to use your original approach. I think it will be quite difficult to test that reliably though, since we only have a one-tick window in between the system nursery becoming closed and the init task continuing on. I'll see what I can do...
except RuntimeError: | ||
# We're quite late in the shutdown process and | ||
# the system nursery is already closed. | ||
_core.current_task().parent_nursery.start_soon(kill_everything, exc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a gross hack and I don't like it. (And I don't like the subtle coupling with the finalize_remaining_asyncgens
code, where it has to use a shield because of this.)
But maybe there's no better solution for right now, and we should merge this as is, and then address #1607 in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is very much a gross hack, but I think it's the best solution for right now. I'll leave a comment.
Given the issues with interdependencies between asyncgen shutdowns, is there any value in parallelizing the final asyncgen collection at all? Even on 3.8+ with accurate ag_running, we only do the right thing if the user's asend() (or whatever) starts before our aclose() does. If our aclose() starts first, then their asend() will raise RuntimeError. (They only get a nice clean StopAsyncIteration if our aclose() finishes before their asend() starts.) Setup:
Imagine the final round of asyncgen finalization starts here:
Everything is running in a cancelled scope, so there's not much parallelism lost by serializing things, and it lets our behavior not be version-dependent. Thoughts? |
Codecov Report
@@ Coverage Diff @@
## master #1564 +/- ##
========================================
Coverage 99.60% 99.61%
========================================
Files 113 115 +2
Lines 14103 14445 +342
Branches 1090 1106 +16
========================================
+ Hits 14047 14389 +342
Misses 41 41
Partials 15 15
|
I think this is now ready. |
I didn't look at the code but the docs are excellent, as usual, thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks solid overall! I agree that parallelizing async generator shutdown is not that important.
As part of the ongoing struggle against _run.py
collapsing into a black hole, would it make sense to move the bulk of the hook and asyncgens
-set management code into a separate file?
docs/source/reference-core.rst
Outdated
guarantee is that it *won't* happen in the task that was using the | ||
generator. That task will continue on with whatever else it's doing, | ||
and the async generator cleanup will happen "sometime later, | ||
somewhere else". |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add something here about why this matters: that it means timeouts are gone, contextvars are gone, nurseries might be closed, etc., and your cleanup code may not be expecting that.
Maybe we should also mention somewhere that most of this applies to regular generators too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a sentence mentioning possible problems. I couldn't figure out how to also mention synchronous generators without getting too far afield, but feel free to take a shot yourself!
Do we have a test that async generator objects are finalized promptly when the object becomes collectible, without waiting for the final cleanup logic? Because this weird bug seems to suggest that currently there's a bug in the interpreter that prevents that: https://bugs.python.org/issue41229 |
...On closer examination, it looks like we do have such tests, so... I'm not sure what's going on with that upstream bug report. But as I mentioned in the bug log, I can reproduce the issue with this PR, so there's something happening... |
Oh, never mind, that upstream bug report was spurious. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good. Sorry for the late review. A few minor comments, and there are some minor merge conflicts, but feel free to merge after addressing those.
batch = self.alive | ||
self.alive = set() | ||
for agen in batch: | ||
await self._finalize_one(agen, name_asyncgen(agen)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, does the batching here do anything? Would it work just as well to write:
while self.alive:
agen = self.alive.pop()
await self._finalize_one(agen, name_asyncgen(agen))
? I'm not too worried either way but I'm wondering if I'm missing something...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The batching finalizes the currently-active generators before any new ones that get firstiter'ed during that process. I can't think of anywhere that the difference would matter but the batch-based ordering seems more intuitive to me.
This follows the basic outline previously proposed in #265, without most of the extensions I suggested. Feedback welcome.
Closes #265.