Skip to content

Commit

Permalink
Introduce HandlerContext to simplify TaskHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Dec 12, 2023
1 parent f29cabb commit 584bd35
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,7 @@ type AtomicSchedulerId = AtomicU64;
#[derive(Debug)]
pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
schedulers: Mutex<Vec<Box<S>>>,
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
handler_context: HandlerContext,
// weak_self could be elided by changing InstalledScheduler::take_scheduler()'s receiver to
// Arc<Self> from &Self, because SchedulerPool is used as in the form of Arc<SchedulerPool>
// almost always. But, this would cause wasted and noisy Arc::clone()'s at every call sites.
Expand All @@ -61,6 +58,14 @@ pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
_phantom: PhantomData<TH>,
}

#[derive(Debug)]
pub struct HandlerContext {
log_messages_bytes_limit: Option<usize>,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: Option<ReplayVoteSender>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
}

pub type DefaultSchedulerPool =
SchedulerPool<PooledScheduler<DefaultTaskHandler>, DefaultTaskHandler>;

Expand All @@ -73,10 +78,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> SchedulerPool<S, TH> {
) -> Arc<Self> {
Arc::new_cyclic(|weak_self| Self {
schedulers: Mutex::default(),
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
handler_context: HandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
},
weak_self: weak_self.clone(),
next_scheduler_id: AtomicSchedulerId::new(PRIMARY_SCHEDULER_ID),
_phantom: PhantomData,
Expand Down Expand Up @@ -136,27 +143,27 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> InstalledSchedulerPool for Sche
}

pub trait TaskHandler: Send + Sync + Debug + Sized + 'static {
fn handle<S: SpawnableScheduler<Self>>(
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
pool: &SchedulerPool<S, Self>,
handler_context: &HandlerContext,
);
}

#[derive(Debug)]
pub struct DefaultTaskHandler;

impl TaskHandler for DefaultTaskHandler {
fn handle<S: SpawnableScheduler<Self>>(
fn handle(
result: &mut Result<()>,
timings: &mut ExecuteTimings,
bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
index: usize,
pool: &SchedulerPool<S, Self>,
handler_context: &HandlerContext,
) {
// scheduler must properly prevent conflicting tx executions. thus, task handler isn't
// responsible for locking.
Expand All @@ -169,11 +176,11 @@ impl TaskHandler for DefaultTaskHandler {
*result = execute_batch(
&batch_with_indexes,
bank,
pool.transaction_status_sender.as_ref(),
pool.replay_vote_sender.as_ref(),
handler_context.transaction_status_sender.as_ref(),
handler_context.replay_vote_sender.as_ref(),
timings,
pool.log_messages_bytes_limit,
&pool.prioritization_fee_cache,
handler_context.log_messages_bytes_limit,
&handler_context.prioritization_fee_cache,
);
}
}
Expand Down Expand Up @@ -248,7 +255,7 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
self.context().bank(),
transaction,
index,
&self.pool,
&self.pool.handler_context,
);
}
}
Expand Down Expand Up @@ -578,7 +585,7 @@ mod tests {
context.bank(),
&transaction_and_index.0,
transaction_and_index.1,
&pool,
&pool.handler_context,
);
(result, timings)
}));
Expand Down Expand Up @@ -630,10 +637,10 @@ mod tests {
PooledScheduler::<DefaultTaskHandler> {
id: pool.new_scheduler_id(),
pool: SchedulerPool::new(
pool.log_messages_bytes_limit,
pool.transaction_status_sender.clone(),
pool.replay_vote_sender.clone(),
pool.prioritization_fee_cache.clone(),
pool.handler_context.log_messages_bytes_limit,
pool.handler_context.transaction_status_sender.clone(),
pool.handler_context.replay_vote_sender.clone(),
pool.handler_context.prioritization_fee_cache.clone(),
),
context: Some(initial_context),
result_with_timings: Mutex::default(),
Expand Down

0 comments on commit 584bd35

Please sign in to comment.