-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CX_CLEANUP] Integrate new Task
, in the code and remove the old
#2493
Conversation
Task
, in the code and remove the old
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Not really requesting changes, but having some questions I'd like to ask before approving the PR.
Generally looks great!! Love the simplification, and the dependency trait logic. I only had a couple of small nits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, though some nit comments from Justin aren't resolved yet.
crates/task-impls/src/vid.rs
Outdated
event: Self::Event, | ||
task: &mut Task<Self>, | ||
) -> Option<HotShotTaskCompleted> { | ||
// TODO: Don't clone the sender |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I'm resolving this conversation since the TODO is still here.
Closes #2428
This PR:
The main goal of this PR is to replace the
ChannelStream
withasync_broadcast
channels. To do that it means we have to replace everything incrates/task
because all of the task infrastructure was heavily tied toChannelStream
. The previous PR: #2500 is for the implementation of the Task replacement.There were 3 areas that needed refactoring, Initialization and startup, the task_impls crate, and the testing architecture. I'll describe the changes below:
Sender
,Receiver
pair all we need to do is create the tasks states for each task and give them a clone of the event stream senders and receivers. Further since eachTaskState
we create implementsfilter_event
andhandle_event
we don't need to construct those function objects during task creation. Finally theTaskRegistry
is simplified such that it just collects theJoinHandles
of each task so we only have to do:task_registry.register(task.run()).await
which spawns the task and registers theJoinHandle
. Register is async not because it's waiting for the task to complete, but because it internally is locking the collection of tasks to insert the new one, the task isasync_spawn
ed.TaskState
for each...TaskState
types (e.g.ConsensusTaskState
). This is pretty simple, just calling handle_event function we already had implemented, and not returning the state anymore. Now we have an&mut self
so we just modify the our state during handling. The other thing is to replace theevent_stream.pulish
with the newevent_broadcast
. This should is functionally equivalent from the tasks perspective.task_impls
the actual test tasks were not all able to be converted toTestTaskState
. For the more complex one we implement that trait, but for completion task and transaction task it's simpler to just create simple loop and spawn that. Finally for unit tests we are able finally not have to duplicate the input on the output. We simply create the channels such that the test sends into the task and the task we are testing sends back to the test. This means the task won't receive its own events but I think that's ok since we should be controlling the input events ourselves.Some other things to note:
Shutdown
event should cause the tasks to exit, may need to do something a bit more complex but right now we don't perform any extra steps on shutdown anyway.Sender
by default waits for there to be an active receiver. This is fine for the main event stream since every task will have an active receiver (allasync_broadcast::Receriver
s are active when created). However for the output events we have a choice. If we keep an activeReceiver
around then it will queue up messages until it overflows (or blocks until the somebody callsrecv
on it, we can choose this mode). The next person to receive from it (really clone from it will get an overflow error first then the oldest message in the queue. Or we can keep andInactiveReceiver
and the sender will get an error if there is no active receiver. I choose this approach because it means if the application clones the receiver (callsget_event_stream
) then it won't have to deal with an error and a bunch of arbitrarily old events. It's a bit in the weeds but seeingInactiveReceiver
might be otherwise confusing.Receiver
implsStream
so they can continue to using the event_stream fromget_event_stream
like a stream, but it won't have an ID anymore. That's fine since they don't use that or could create their own ids if they really wanted to when they get the stream.I was hoping to be able to break this up but it's kind of all or nothing since
ChannelStream
was the fabric that tied everything together and now it's replaced.This PR does not:
The internal logic of the tests and consensus should not be changed logically, it should only be how we run tasks and pass messages between them.
Key places to review:
All the asks in
crates/task_impl.rs
andcrates/testing/source
.All the places we create and spawn tasks:
SystemContex::run_tasks
,crates/hotshot/src/tasks/mods.rs
,crates/testing/src/test_runner.rs
, and the new event stream interface incrates/hotshot/src/types/handle.rs
DO NOT REVIEW
/crates/task
HERE. REVIEW THEM IN: #2500