Apfitzge/feature/scheduler interface no tests #3
base: apfitzge/feature/scheduler_interface_no_tests_base
Conversation
}
}

pub fn tick(&mut self) -> Result<(), SchedulerError> {
here
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: &Arc<RwLock<BankForks>>,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
mut scheduler_handle: SchedulerHandle,
forward_executor: ForwardExecutor,
consume_executor: ConsumeExecutor,
) {
let recorder = poh_recorder.read().unwrap().recorder();
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut banking_stage_stats = BankingStageStats::new(id);
let mut tracer_packet_stats = TracerPacketStats::new(id);
let qos_service = QosService::new(id);

let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
let mut last_metrics_update = Instant::now();

loop {
let my_pubkey = cluster_info.id();
if !unprocessed_transaction_storage.is_empty()
|| last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD
{
let (_, process_buffered_packets_time) = measure!(
Self::process_buffered_packets(
&my_pubkey,
&socket,
poh_recorder,
cluster_info,
&mut unprocessed_transaction_storage,
&transaction_status_sender,
&gossip_vote_sender,
&banking_stage_stats,
&recorder,
data_budget,
&qos_service,
&mut slot_metrics_tracker,
log_messages_bytes_limit,
&connection_cache,
&mut tracer_packet_stats,
bank_forks,
),
"process_buffered_packets",
);
slot_metrics_tracker
.increment_process_buffered_packets_us(process_buffered_packets_time.as_us());
last_metrics_update = Instant::now();
// Do scheduled work (processing packets)
if let Err(err) = scheduler_handle.do_scheduled_work(
&consume_executor,
&forward_executor,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
) {
warn!("Banking stage scheduler error: {:?}", err);
break;
}

tracer_packet_stats.report(1000);

// Gossip thread will almost always not wait because the transaction storage will most likely not be empty
let recv_timeout = if !unprocessed_transaction_storage.is_empty() {
// If there are buffered packets, run the equivalent of try_recv to try reading more
// packets. This prevents starving BankingStage::consume_buffered_packets due to
// buffered_packet_batches containing transactions that exceed the cost model for
// the current bank.
Duration::from_millis(0)
} else {
// Default wait time
Duration::from_millis(100)
};

let (res, receive_and_buffer_packets_time) = measure!(
Self::receive_and_buffer_packets(
packet_deserializer,
recv_start,
recv_timeout,
id,
&mut unprocessed_transaction_storage,
&mut banking_stage_stats,
&mut tracer_packet_stats,
&mut slot_metrics_tracker,
),
"receive_and_buffer_packets",
);
slot_metrics_tracker
.increment_receive_and_buffer_packets_us(receive_and_buffer_packets_time.as_us());

match res {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
// Do any necessary updates - check if scheduler is still valid
if let Err(err) = scheduler_handle.tick() {
warn!("Banking stage scheduler error: {:?}", err);
break;
}
banking_stage_stats.report(1000);

tracer_packet_stats.report(1000);
}
}
here
pub mod commit_executor;
pub mod consume_executor;
pub mod decision_maker;
pub mod external_scheduler;
pub mod forward_executor;
pub mod multi_iterator_scheduler;
pub mod packet_receiver;
pub mod record_executor;
pub mod scheduler_error;
pub mod scheduler_handle;
pub mod thread_aware_account_locks;
pub mod thread_local_scheduler;
Naming bikeshed: it seems both `*er` and `*_executor` are used? Is there some criterion for using one over the other? I prefer `*er` to `*_executor` for its shortness. So, `committer` and `consumer`, etc...
Probably not as careful about it as I should have been with naming, but I attempted to keep `*er`s as components which make logical decisions or pass new data to another component (decision making, packet receiving, scheduling), whereas `*_executor`s were intended to process data in some way and return some result to the calling component.
I'm also fine with, and think it's probably less jarring/confusing, just renaming these all to `*er` as well.
I sensed that the overall naming is well thought-out and helpful for navigating this 4k LOC patch. :) I'm just nitpicking here. ;)
pub(crate) enum SchedulerHandle {
    ThreadLocalScheduler(ThreadLocalScheduler),
    ExternalScheduler(ExternalSchedulerHandle),
}
I know you spent some time on good interface design, but I think we can shift this scheduler abstraction further up into a higher layer, and that's cleaner, I think.
In short, fiddle with Tpu like this: struct Tpu { ..., banking_stage: Box<dyn PluggableBankingStage> }, with trait PluggableBankingStage { fn join(...) }, then impl the trait for both ThreadLocalBankingStage and MultiIteratorBankingStage.
Finally, the tpu.rs code will look like this:
...
let banking_stage = if use_central_scheduler {
    Box::new(MultiIteratorBankingStage::new(...))
} else {
    Box::new(ThreadLocalBankingStage::new(...))
};
...
Then remove SchedulerHandle and this common loop (https://github.com/ryoqun/solana/pull/3/files#r1055131096), restore ThreadLocalBankingStage's original code in thread_local_banking_stage.rs, and put any tailored, newly-written main loop/thread setup code for MultiIteratorBankingStage in multi_iterator_banking_stage.rs.
(An additional bonus of this restoration would be that the existing metrics can mostly remain as-is too, which is desirable for quick merging of these massive cleanup PRs.)
Also, all the other leaf sub-components (DecisionMaker, Committer, etc.) are factored out as-is and wired up from thread_local_banking_stage.rs and multi_iterator_banking_stage.rs. I really like this break-up, btw.
After all, the two threading modes are too different to factor out into a common interface. One of the .tick() impls is empty (https://github.com/ryoqun/solana/pull/3/files#r1055130056), and I couldn't understand the main loop code at first look, to be honest, because it's generalized too much to factor out the two different beasts. :)
I know these comments aren't a pleasure to see after all the hard work, but hopefully this will make the code easier to follow in the long term.
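For illustration, here is a minimal sketch of the trait-object arrangement described above; apart from the three type names mentioned in the comment, the signatures and fields are assumptions, not code from this PR:

```rust
// Sketch only: names other than PluggableBankingStage, ThreadLocalBankingStage,
// and MultiIteratorBankingStage (and all signatures/fields) are assumptions.
use std::thread::JoinHandle;

pub trait PluggableBankingStage {
    /// Join all threads owned by this banking stage implementation.
    fn join(self: Box<Self>) -> std::thread::Result<()>;
}

pub struct ThreadLocalBankingStage {
    bank_thread_hdls: Vec<JoinHandle<()>>,
}

impl PluggableBankingStage for ThreadLocalBankingStage {
    fn join(self: Box<Self>) -> std::thread::Result<()> {
        // Join each banking thread spawned by this stage.
        for hdl in self.bank_thread_hdls {
            hdl.join()?;
        }
        Ok(())
    }
}

// MultiIteratorBankingStage would implement the same trait with its own
// scheduler thread plus worker threads.

pub struct Tpu {
    // ...other stages elided...
    banking_stage: Box<dyn PluggableBankingStage>,
}
```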
Oh, also: the hated dynamic-dispatch cost is only incurred when calling ::join(), which should be negligible considering it's just on the process-termination code path.
Splitting them entirely rather than trying to fit a common interface might be a better idea, and it certainly makes the initial refactoring simpler since all existing code can remain nearly as-is.
I'm not sure I see the reason for using a `Box<dyn BankingStageTrait>` owned by `tpu`, since even the thread-local banking stage will still be spawning threads separate from the main one - a vec of `JoinHandle`s still covers this. We could still separate existing code and new code into different modules, with the main BankingStage file just responsible for spawning those. This eliminates needing to fit the main-loop code into a common interface, which is probably better.
It would probably also make it clearer that even when using the central scheduler, it is only for non-vote txs, and the vote txs still use the thread-local codepath.
Another minor detail: the new BankingStage code-path should be named without multi-iterator in mind; it's generic enough to just accept work from any central scheduler that sends the same data over the channels!
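For comparison, a minimal sketch of the `Vec<JoinHandle>` arrangement suggested here; the flag, thread names, and spawn helpers are hypothetical, not code from either branch:

```rust
// Sketch only: names below are illustrative assumptions, not PR code.
use std::thread::{Builder, JoinHandle};

pub struct BankingStage {
    bank_thread_hdls: Vec<JoinHandle<()>>,
}

impl BankingStage {
    pub fn new(use_central_scheduler: bool /* , channels, caches, ... */) -> Self {
        // The main BankingStage file only decides which module's threads to spawn;
        // each module keeps its own main-loop code.
        let bank_thread_hdls = if use_central_scheduler {
            Self::spawn_central_scheduler_threads()
        } else {
            Self::spawn_thread_local_threads()
        };
        Self { bank_thread_hdls }
    }

    pub fn join(self) -> std::thread::Result<()> {
        for hdl in self.bank_thread_hdls {
            hdl.join()?;
        }
        Ok(())
    }

    fn spawn_central_scheduler_threads() -> Vec<JoinHandle<()>> {
        vec![Builder::new()
            .name("solBankingSched".to_string())
            .spawn(|| { /* new central-scheduler main loop */ })
            .unwrap()]
    }

    fn spawn_thread_local_threads() -> Vec<JoinHandle<()>> {
        vec![Builder::new()
            .name("solBanknStgTx".to_string())
            .spawn(|| { /* existing thread-local main loop */ })
            .unwrap()]
    }
}
```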
> I know these comments aren't a pleasure to see after all the hard work, but hopefully this will make the code easier to follow in the long term.

Not at all, I just want to make this as good as possible. This is exactly why I reached out more broadly.
> Also, all the other leaf sub-components (DecisionMaker, Committer, etc.) are factored out as-is and wired up from thread_local_banking_stage.rs and multi_iterator_banking_stage.rs. I really like this break-up, btw.

Yeah, I think that, regardless of the scheduling-related changes, breaking up BankingStage is a good idea. And it will hopefully make future bankless-leader changes simpler as well.
> I'm not sure I see the reason for using a Box owned by tpu, since even the thread-local banking stage will still be spawning a thread separate from the main - a vec of JoinHandle still covers this. We could still separate existing code and new code into different modules, and the main BankingStage file is just responsible for spawning those.

Yeah, this code arrangement should work as well. Firstly, I'm NOT firmly leaning towards `Box<dyn BankingStageTrait>`. I just picked the `dyn` design because it's the golden pattern for implementing plugins.
Also, I wanted to encapsulate `SchedulerStage` away from `tpu.rs`. That's why my initial reaction was `dyn`. I think it's a bit of an abstraction leak and not actually one of the pipelined processing stages. Namely, it's actively buffering/reordering tasks, so I would name it `Scheduler` and recognize it as an internal component of `BankingStage`.
While a vec of `JoinHandle`s would still cover this, I think the design choice boils down to how we express our concepts philosophically. That said, I recognize the existing code and the new one are conceptually different things, so it's awkward for me to use a shared field of the same type just because we can. Instead, I would give them their own types. ;)
> It would probably also make it clearer that even when using the central scheduler, it is only for non-vote txs, and the vote txs still use the thread-local codepath.

FYI, my scheduler won't be like this; vote txs don't use the thread-local codepath. I'm opinionated here, but I think this different treatment of vote/non-vote txs is just sub-optimal.

> Another minor detail: the new BankingStage code-path should be named without multi-iterator in mind; it's generic enough to just accept work from any central scheduler that sends the same data over the channels!

Yeah, seems so. However, note that my scheduler might not fit well under `SchedulerKind`, needing minor or major changes outside it. ;)
> Not at all, I just want to make this as good as possible. This is exactly why I reached out more broadly.
>
> ...
>
> Yeah, I think that, regardless of the scheduling-related changes, breaking up BankingStage is a good idea. And it will hopefully make future bankless-leader changes simpler as well.

💯
> FYI, my scheduler won't be like this; vote txs don't use the thread-local codepath. I'm opinionated here, but I think this different treatment of vote/non-vote txs is just sub-optimal.

We can definitely move voting to be handled by the scheduler if we think that's best. I was trying to only change the code-path where we actually see scheduling issues, i.e. the non-vote threads, while leaving the rest as-is. I think pulling voting inside central scheduling can be done in a later PR if we see that it is a good or necessary change.

> this different treatment of vote/non-vote txs is just sub-optimal.

It's not optimal in terms of code-cleanliness, because we could fit them under the arm of the same scheduler, but I actually think that voting is distinct from other transactions and might make sense to treat differently. In particular, Toly, Tao, and I had a short conversation a few days ago where we mentioned the possibility of handling the execution of votes differently from regular transactions - i.e. a separate VoteBankingStage. Nothing concrete yet, but I think pulling votes closer to regular txs will need wider discussion/consideration on the future of voting. Happy to hear your further thoughts on this!

> FYI, my scheduler won't be like this; vote txs don't use the thread-local codepath.

I'd need to look at yours again, but what are the messages your scheduler sends to the executing threads, and what do the executing threads send back? We can definitely modify the current `ScheduledTransactions` or `ProcessedTransactions` messages in scheduler_stage.rs to fit your case as well (I'm still iterating on these structs for my own scheduler ideas).
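For context, a rough sketch of what such channel messages could look like; `ScheduledTransactions` and `ProcessedTransactions` are the names used above, but every field and the `ProcessedStatus` enum shown here are assumptions rather than the actual definitions in scheduler_stage.rs:

```rust
// Sketch only: field layout is assumed, not taken from scheduler_stage.rs.
use std::sync::Arc;
use solana_runtime::bank::Bank;
use solana_sdk::transaction::SanitizedTransaction;

/// Work a central scheduler sends to an executing thread.
pub struct ScheduledTransactions {
    /// Bank the batch should be executed against.
    pub bank: Arc<Bank>,
    /// Transactions already sanitized (and possibly lock-checked) by the scheduler.
    pub transactions: Vec<SanitizedTransaction>,
}

/// Results an executing thread sends back to the scheduler.
pub struct ProcessedTransactions {
    pub transactions: Vec<SanitizedTransaction>,
    /// Per-transaction outcome reported back for rescheduling or dropping.
    pub statuses: Vec<ProcessedStatus>,
}

pub enum ProcessedStatus {
    Committed,
    Retryable,
    Dropped,
}
```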
(Sorry for the late reply, but let me respond to your good comments for our deeper understanding.)

> FYI, my scheduler won't be like this; vote txs don't use the thread-local codepath. I'm opinionated here, but I think this different treatment of vote/non-vote txs is just sub-optimal.

> We can definitely move voting to be handled by the scheduler if we think that's best. I was trying to only change the code-path where we actually see scheduling issues, i.e. the non-vote threads, while leaving the rest as-is. I think pulling voting inside central scheduling can be done in a later PR if we see that it is a good or necessary change.

Yep, I understand your intention to keep the change as small as possible. As you said, I also agree that later PRs can address this.

> I actually think that voting is distinct from other transactions and might make sense to treat differently. In particular, Toly, Tao, and I had a short conversation a few days ago where we mentioned the possibility of handling the execution of votes differently from regular transactions - i.e. a separate VoteBankingStage. Nothing concrete yet, but I think pulling votes closer to regular txs will need wider discussion/consideration on the future of voting. Happy to hear your further thoughts on this!

In short, my main concern is that this further increases replayability risk. As you just said, voting is mostly light both for scheduling and executing, so we're better off treating it uniformly with non-vote txs, I think. That's the engineering point of view.

Also, here's a somewhat theoretical economic point of view: IMO, the distinctiveness of votes is vital for cluster functioning, but I think it should be priced properly (i.e. with ComputeBudget), especially at times of peak load. Say a whole cluster is crunching on txs at a steady burn rate of 1 SOL/block while saturating the blocks (too abstract, but assume such a cluster state exists; don't count vote txs yet). That's value creation for the cluster, and validators should be rewarded stake-weighted. And the claim must be made in the form of a vote tx by validators to earn those rewards. Now assume that all the validators' votes need to consume 50% of the block space. Then we have to cut the rate in half to include barely-paying vote txs, if votes are treated specially. That means cluster-wide value creation is reduced as well.

Instead, we need some mechanism to shake off too-small validators, should that situation occur. So make vote txs compete with non-votes in the battle for block space; then the revenue stream is back to 1 SOL. That means surviving (strong) validator operators are still viable. Otherwise, there's an unbounded incentive to run a validator for anyone who can attain stake. Put differently, validators are competing for non-vote tx fees external to their vote txs, so there's no point in tightening up that revenue source for a long tail of low-stake validators (if any). Taken to the extreme, there would be no block space left for non-votes, only for votes; then they're competing for nothing (just a zero-sum game between validators).

All in all, priced vote txs should help the cluster find some equilibrium with a right-sized set of validators and some utility appeal to the external world. Obviously, I've skipped all the realities here. :)
> FYI, my scheduler won't be like this; vote txs don't use the thread-local codepath.
> ...... it's generic enough to just accept work from any central scheduler that sends the same data over the channels!
> Yeah, seems so. However, note that my scheduler might not fit well under SchedulerKind, needing minor or major changes outside it. ;)

> I'd need to look at yours again, but what are the messages your scheduler sends to the executing threads, and what do the executing threads send back? We can definitely modify the current ScheduledTransactions or ProcessedTransactions messages in scheduler_stage.rs to fit your case as well (I'm still iterating on these structs for my own scheduler ideas).

As you know, I actually hooked up my scheduler into the banking stage. After all the struggles, I now have some concrete ideas to answer with, regarding integrating my scheduler into banking in general, not only the messages between the scheduler thread <=> executing threads in particular (WARN: cluttered bullet points):

- I want to bypass QoS in ConsumeExecutor (maybe a flag in ScheduledTransactions should suffice; see the sketch after this comment). The recent_blockhash filtering logic isn't needed either. (I noticed that excluding any non-essential tasks from the critical section is important for perf, considering my scheduler is very latency-sensitive.)
- There is no need to lock in ConsumeExecutor via prepare_sanitized_batch_with_results (so I want to bypass that as well).
- My scheduler thread would like to receive SanitizedTransactions (and some pre-populated data structures) to offload some parallelizable jobs (deserializing, sanity checks, preloading, etc.) to pre-process threads.
- I need two channels to send ScheduledTransactions from the scheduler, to represent high priority (contended txs) and low priority (not contended). (It should be easy to piggyback on the existing per-executing-thread multiple-channel plumbing.)
- The forwarding code makes the code harder to follow. I think scheduling and forwarding are different concerns to be separated; they merely happen not to overlap in terms of wall time, so there's no need to mix them and model the code after that fact.
- I'd like to share as much code as possible with the replay code.
- There's a lot of metrics code which weighs on my single-tx-only batches, since the cost can't be amortized unlike full-of-tx batches.
- It needs to take tpu_vote_receiver and gossip_vote_receiver in ::new(), indicating XXXBankingStage::new() is the proper interface boundary.

After actually trying it, I can say that nothing is a deal breaker. However, I'm now thinking I can slim down the banking stage more than I originally thought; sorry about that wrong initial estimate. So I'm wondering whether it's the best idea to continue maintaining/overhauling the existing banking code.

So, I'm afraid I'm a bit reluctant to fit my scheduler into this banking stage reorganization. I think it's better off just creating UnifiedSchedulerBankingStage or something for now... so that my BankingStage can take the full blame for any poor perf. ;) Also note that it's not fully determined whether my scheduler makes sense in banking; I'd like to be challenged in the future after I create a non-draft real code/doc PR for it...

As a positive side, you can optimize this re-org as much as you'd like to be favorable to your scheduler dream. :)
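To make the bypass-related bullets above concrete, a hedged sketch of per-batch flags that ScheduledTransactions could carry; the `ExecutionFlags` type and its fields are hypothetical, not code from either branch:

```rust
// Sketch only: illustrates the "flag in ScheduledTransactions" idea above;
// ExecutionFlags and its fields are hypothetical.
#[derive(Default, Clone, Copy)]
pub struct ExecutionFlags {
    /// Skip QoS cost-model checks; the central scheduler already accounted for cost.
    pub bypass_qos: bool,
    /// Skip recent_blockhash/status-cache filtering performed upstream.
    pub bypass_blockhash_check: bool,
    /// Skip account locking via prepare_sanitized_batch_with_results;
    /// the scheduler already guarantees non-conflicting batches.
    pub bypass_account_locks: bool,
}

// An executing thread would consult the flags before each optional step,
// e.g. only run QoS when the scheduler has not already done the accounting.
fn should_run_qos(flags: &ExecutionFlags) -> bool {
    !flags.bypass_qos
}
```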