diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c854b3a725fcb3..4a8d2d357fb637 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -42,10 +42,7 @@ type AtomicSchedulerId = AtomicU64; #[derive(Debug)] pub struct SchedulerPool, TH: TaskHandler> { schedulers: Mutex>>, - log_messages_bytes_limit: Option, - transaction_status_sender: Option, - replay_vote_sender: Option, - prioritization_fee_cache: Arc, + handler_context: HandlerContext, // weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to // Arc from &Self, because SchedulerPool is used as in the form of Arc // almost always. But, this would cause wasted and noisy Arc::clone()'s at every call sites. @@ -61,6 +58,14 @@ pub struct SchedulerPool, TH: TaskHandler> { _phantom: PhantomData, } +#[derive(Debug)] +pub struct HandlerContext { + log_messages_bytes_limit: Option, + transaction_status_sender: Option, + replay_vote_sender: Option, + prioritization_fee_cache: Arc, +} + pub type DefaultSchedulerPool = SchedulerPool, DefaultTaskHandler>; @@ -73,10 +78,12 @@ impl, TH: TaskHandler> SchedulerPool { ) -> Arc { Arc::new_cyclic(|weak_self| Self { schedulers: Mutex::default(), - log_messages_bytes_limit, - transaction_status_sender, - replay_vote_sender, - prioritization_fee_cache, + handler_context: HandlerContext { + log_messages_bytes_limit, + transaction_status_sender, + replay_vote_sender, + prioritization_fee_cache, + }, weak_self: weak_self.clone(), next_scheduler_id: AtomicSchedulerId::new(PRIMARY_SCHEDULER_ID), _phantom: PhantomData, @@ -136,13 +143,13 @@ impl, TH: TaskHandler> InstalledSchedulerPool for Sche } pub trait TaskHandler: Send + Sync + Debug + Sized + 'static { - fn handle>( + fn handle( result: &mut Result<()>, timings: &mut ExecuteTimings, bank: &Arc, transaction: &SanitizedTransaction, index: usize, - pool: &SchedulerPool, + handler_context: &HandlerContext, ); } @@ -150,13 +157,13 @@ pub trait TaskHandler: Send + Sync + Debug + Sized + 'static { pub struct DefaultTaskHandler; impl TaskHandler for DefaultTaskHandler { - fn handle>( + fn handle( result: &mut Result<()>, timings: &mut ExecuteTimings, bank: &Arc, transaction: &SanitizedTransaction, index: usize, - pool: &SchedulerPool, + handler_context: &HandlerContext, ) { // scheduler must properly prevent conflicting tx executions. thus, task handler isn't // responsible for locking. @@ -169,11 +176,11 @@ impl TaskHandler for DefaultTaskHandler { *result = execute_batch( &batch_with_indexes, bank, - pool.transaction_status_sender.as_ref(), - pool.replay_vote_sender.as_ref(), + handler_context.transaction_status_sender.as_ref(), + handler_context.replay_vote_sender.as_ref(), timings, - pool.log_messages_bytes_limit, - &pool.prioritization_fee_cache, + handler_context.log_messages_bytes_limit, + &handler_context.prioritization_fee_cache, ); } } @@ -248,7 +255,7 @@ impl InstalledScheduler for PooledScheduler { self.context().bank(), transaction, index, - &self.pool, + &self.pool.handler_context, ); } } @@ -578,7 +585,7 @@ mod tests { context.bank(), &transaction_and_index.0, transaction_and_index.1, - &pool, + &pool.handler_context, ); (result, timings) })); @@ -630,10 +637,10 @@ mod tests { PooledScheduler:: { id: pool.new_scheduler_id(), pool: SchedulerPool::new( - pool.log_messages_bytes_limit, - pool.transaction_status_sender.clone(), - pool.replay_vote_sender.clone(), - pool.prioritization_fee_cache.clone(), + pool.handler_context.log_messages_bytes_limit, + pool.handler_context.transaction_status_sender.clone(), + pool.handler_context.replay_vote_sender.clone(), + pool.handler_context.prioritization_fee_cache.clone(), ), context: Some(initial_context), result_with_timings: Mutex::default(),