Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the scheduler message when preponing #1260

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"
async-trait = "0.1.64"
hashbrown = "0.14.0"

[dependencies.k8s-openapi]
version = "0.18.0"
Expand Down
59 changes: 57 additions & 2 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Delays and deduplicates [`Stream`] items

use futures::{stream::Fuse, Stream, StreamExt};
use hashbrown::{hash_map::Entry, HashMap};
use pin_project::pin_project;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::HashSet,
hash::Hash,
pin::Pin,
task::{Context, Poll},
Expand All @@ -30,8 +31,13 @@ pub struct Scheduler<T, R> {
///
/// To ensure that the metadata is kept up-to-date, use `schedule_message` and
/// `poll_pop_queue_message` rather than manipulating this directly.
///
/// NOTE: `scheduled` should be considered to hold the "canonical" representation of the message.
/// Always pull the message out of `scheduled` once it has been retrieved from `queue`.
queue: DelayQueue<T>,
/// Metadata for all currently scheduled messages. Used to detect duplicate messages.
///
/// `scheduled` is considered to hold the "canonical" representation of the message.
scheduled: HashMap<T, ScheduledEntry>,
/// Messages that are scheduled to have happened, but have been held using `hold_unless`.
pending: HashSet<T>,
Expand Down Expand Up @@ -67,6 +73,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// TODO: this should add a little delay here to actually debounce
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
// Old entry will run before the new request, so ignore the new request..
Expand Down Expand Up @@ -96,7 +103,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
match self.queue.poll_expired(cx) {
Poll::Ready(Some(msg)) => {
let msg = msg.into_inner();
self.scheduled.remove(&msg).expect(
let (msg, _) = self.scheduled.remove_entry(&msg).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
if can_take_message(&msg) {
Expand Down Expand Up @@ -204,6 +211,7 @@ mod tests {
use crate::utils::KubeRuntimeStreamExt;

use super::{scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
use tokio::time::{advance, pause, sleep, Duration, Instant};
Expand All @@ -216,6 +224,11 @@ mod tests {
}
}

/// Message type that is always considered equal to itself
#[derive(Derivative, Eq, Clone, Debug)]
#[derivative(PartialEq, Hash)]
struct SingletonMessage(#[derivative(PartialEq = "ignore", Hash = "ignore")] u8);

#[tokio::test]
async fn scheduler_should_hold_and_release_items() {
pause();
Expand Down Expand Up @@ -392,4 +405,46 @@ mod tests {
scheduler.next().now_or_never().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
}

#[tokio::test]
async fn scheduler_should_overwrite_message_with_soonest_version() {
pause();

let now = Instant::now();
let scheduler = scheduler(
stream::iter([
ScheduleRequest {
message: SingletonMessage(1),
run_at: now + Duration::from_secs(2),
},
ScheduleRequest {
message: SingletonMessage(2),
run_at: now + Duration::from_secs(1),
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![2]);
}

#[tokio::test]
async fn scheduler_should_not_overwrite_message_with_later_version() {
pause();

let now = Instant::now();
let scheduler = scheduler(
stream::iter([
ScheduleRequest {
message: SingletonMessage(1),
run_at: now + Duration::from_secs(1),
},
ScheduleRequest {
message: SingletonMessage(2),
run_at: now + Duration::from_secs(2),
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
}
}