From f08883535b604cd0d1b34cf02267fbfa346e8de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Sun, 9 Jul 2023 01:25:48 +0200 Subject: [PATCH 1/2] Update the scheduler message when preponing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In practice this should update the reconciliation reason, fixing #1114 Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/Cargo.toml | 1 + kube-runtime/src/scheduler.rs | 64 +++++++++++++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 8814219a8..8f2c2b026 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -43,6 +43,7 @@ serde_json = "1.0.68" thiserror = "1.0.29" backoff = "0.4.0" async-trait = "0.1.64" +hashbrown = "0.14.0" [dependencies.k8s-openapi] version = "0.18.0" diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index d82f78490..d161a76b1 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -1,9 +1,10 @@ //! Delays and deduplicates [`Stream`] items use futures::{stream::Fuse, Stream, StreamExt}; +use hashbrown::{hash_map::Entry, HashMap}; use pin_project::pin_project; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::HashSet, hash::Hash, pin::Pin, task::{Context, Poll}, @@ -30,8 +31,13 @@ pub struct Scheduler { /// /// To ensure that the metadata is kept up-to-date, use `schedule_message` and /// `poll_pop_queue_message` rather than manipulating this directly. + /// + /// NOTE: `scheduled` should be considered to hold the "canonical" representation of the message. + /// Always pull the message out of `scheduled` once it has been retrieved from `queue`. queue: DelayQueue, /// Metadata for all currently scheduled messages. Used to detect duplicate messages. + /// + /// `scheduled` is considered to hold the "canonical" representation of the message. scheduled: HashMap, /// Messages that are scheduled to have happened, but have been held using `hold_unless`. pending: HashSet, @@ -67,6 +73,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { // TODO: this should add a little delay here to actually debounce self.queue.reset_at(&entry.queue_key, request.run_at); entry.run_at = request.run_at; + old_entry.replace_key(); } Entry::Occupied(_old_entry) => { // Old entry will run before the new request, so ignore the new request.. @@ -96,7 +103,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { match self.queue.poll_expired(cx) { Poll::Ready(Some(msg)) => { let msg = msg.into_inner(); - self.scheduled.remove(&msg).expect( + let (msg, _) = self.scheduled.remove_entry(&msg).expect( "Expired message was popped from the Scheduler queue, but was not in the metadata map", ); if can_take_message(&msg) { @@ -204,6 +211,7 @@ mod tests { use crate::utils::KubeRuntimeStreamExt; use super::{scheduler, ScheduleRequest}; + use derivative::Derivative; use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt}; use std::task::Poll; use tokio::time::{advance, pause, sleep, Duration, Instant}; @@ -392,4 +400,56 @@ mod tests { scheduler.next().now_or_never().unwrap().unwrap(); assert!(poll!(scheduler.next()).is_pending()); } + + #[tokio::test] + async fn scheduler_should_overwrite_message_with_soonest_version() { + // Message type that is always considered equal to itself + #[derive(Derivative, Eq, Clone, Debug)] + #[derivative(PartialEq, Hash)] + struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8); + + pause(); + + let now = Instant::now(); + let scheduler = scheduler( + stream::iter([ + ScheduleRequest { + message: SingletonMessage(1), + run_at: now + Duration::from_secs(2), + }, + ScheduleRequest { + message: SingletonMessage(2), + run_at: now + Duration::from_secs(1), + }, + ]) + .on_complete(sleep(Duration::from_secs(5))), + ); + assert_eq!(scheduler.map(|msg| msg.0).collect::>().await, vec![2]); + } + + #[tokio::test] + async fn scheduler_should_not_overwrite_message_with_later_version() { + // Message type that is always considered equal to itself + #[derive(Derivative, Eq, Clone, Debug)] + #[derivative(PartialEq, Hash)] + struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8); + + pause(); + + let now = Instant::now(); + let scheduler = scheduler( + stream::iter([ + ScheduleRequest { + message: SingletonMessage(1), + run_at: now + Duration::from_secs(1), + }, + ScheduleRequest { + message: SingletonMessage(2), + run_at: now + Duration::from_secs(2), + }, + ]) + .on_complete(sleep(Duration::from_secs(5))), + ); + assert_eq!(scheduler.map(|msg| msg.0).collect::>().await, vec![1]); + } } From c60f8cce1bdf0f3a9ad68971eeeeb727af01762f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 20 Jul 2023 19:25:18 +0200 Subject: [PATCH 2/2] Move `SingletonMessage` out of the individual tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/scheduler.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index d161a76b1..7ff1cd2e1 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -224,6 +224,11 @@ mod tests { } } + /// Message type that is always considered equal to itself + #[derive(Derivative, Eq, Clone, Debug)] + #[derivative(PartialEq, Hash)] + struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8); + #[tokio::test] async fn scheduler_should_hold_and_release_items() { pause(); @@ -403,11 +408,6 @@ mod tests { #[tokio::test] async fn scheduler_should_overwrite_message_with_soonest_version() { - // Message type that is always considered equal to itself - #[derive(Derivative, Eq, Clone, Debug)] - #[derivative(PartialEq, Hash)] - struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8); - pause(); let now = Instant::now(); @@ -429,11 +429,6 @@ mod tests { #[tokio::test] async fn scheduler_should_not_overwrite_message_with_later_version() { - // Message type that is always considered equal to itself - #[derive(Derivative, Eq, Clone, Debug)] - #[derivative(PartialEq, Hash)] - struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8); - pause(); let now = Instant::now();