Skip to content

Commit

Permalink
Fix Controller: pending messages are stuck in the scheduled map (#…
Browse files Browse the repository at this point in the history
…1324)

Fix pop_queue_message_into_pending

Signed-off-by: Corentin Regal <[email protected]>
Signed-off-by: Eirik A <[email protected]>
Co-authored-by: Eirik A <[email protected]>
  • Loading branch information
co42 and clux authored Oct 27, 2023
1 parent 9c81f1f commit e78d488
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
12 changes: 12 additions & 0 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,18 @@ mod tests {
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());

// Send the third message again and check it's ran
sched_tx
.send(ScheduleRequest {
message: 3,
run_at: Instant::now(),
})
.await
.unwrap();
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());
assert_eq!(*count.lock().unwrap(), 4);

let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), 1, |_| {
Expand Down
3 changes: 3 additions & 0 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(msg)) = self.queue.poll_expired(cx) {
let msg = msg.into_inner();
self.scheduled.remove_entry(&msg).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
self.pending.insert(msg);
}
}
Expand Down

0 comments on commit e78d488

Please sign in to comment.