From af1b5ae610597bf724e3e5cead9d1c620327b63e Mon Sep 17 00:00:00 2001
From: Sanskar Jaiswal
Date: Fri, 4 Aug 2023 23:06:48 +0530
Subject: [PATCH] add `debounced_scheduler()` to configure debounce for scheduler

Signed-off-by: Sanskar Jaiswal
---
 examples/configmapgen_controller.rs   |  5 +-
 kube-runtime/src/controller/mod.rs    | 90 +++++++++++++--------------
 kube-runtime/src/controller/runner.rs | 11 ++--
 kube-runtime/src/lib.rs               |  2 +-
 kube-runtime/src/scheduler.rs         | 40 +++++++-----
 5 files changed, 74 insertions(+), 74 deletions(-)

diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs
index 130108686..20422be74 100644
--- a/examples/configmapgen_controller.rs
+++ b/examples/configmapgen_controller.rs
@@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
 use kube::{
     api::{Api, ObjectMeta, Patch, PatchParams, Resource},
     runtime::{
-        controller::{Action, Config, Controller},
+        controller::{Action, Controller},
         watcher,
     },
     Client, CustomResource,
@@ -102,11 +102,8 @@ async fn main() -> Result<()> {
         }
     });
 
-    let mut config = Config::default();
-    config.debounce(Duration::from_secs(2));
     Controller::new(cmgs, watcher::Config::default())
         .owns(cms, watcher::Config::default())
-        .with_config(config)
         .reconcile_all_on(reload_rx.map(|_| ()))
         .shutdown_on_signal()
         .run(reconcile, error_policy, Arc::new(Data { client }))
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 7bb16ddd6..14dd614d6 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -7,7 +7,7 @@ use crate::{
         store::{Store, Writer},
         ObjectRef,
     },
-    scheduler::{scheduler, ScheduleRequest},
+    scheduler::{debounced_scheduler, ScheduleRequest},
     utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
     watcher::{self, metadata_watcher, watcher, DefaultBackoff},
 };
@@ -234,7 +234,6 @@ impl Display for ReconcileReason {
 }
 
 const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
-const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);
 
 /// Apply a reconciler to an input stream, with a given retry policy
 ///
@@ -253,7 +252,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
     context: Arc<Ctx>,
     store: Store<K>,
     queue: QueueStream,
-    debounce: Option<Duration>,
+    config: Config,
 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
 where
     K: Clone + Resource + 'static,
@@ -293,42 +292,39 @@ where
         )),
         // all the Oks from the select gets passed through the scheduler stream, and are then executed
         move |s| {
-            Runner::new(
-                scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))),
-                move |request| {
-                    let request = request.clone();
-                    match store.get(&request.obj_ref) {
-                        Some(obj) => {
-                            let scheduler_tx = scheduler_tx.clone();
-                            let error_policy_ctx = context.clone();
-                            let error_policy = error_policy.clone();
-                            let reconciler_span = info_span!(
-                                "reconciling object",
-                                "object.ref" = %request.obj_ref,
-                                object.reason = %request.reason
-                            );
-                            reconciler_span
-                                .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
-                                .into_future()
-                                .then(move |res| {
-                                    let error_policy = error_policy;
-                                    RescheduleReconciliation::new(
-                                        res,
-                                        |err| error_policy(obj, err, error_policy_ctx),
-                                        request.obj_ref.clone(),
-                                        scheduler_tx,
-                                    )
-                                    // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
-                                    // to them separately
-                                    .map(|res| Ok((request.obj_ref, res)))
-                                })
-                                .instrument(reconciler_span)
-                                .left_future()
-                        }
-                        None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
-                    }
-                },
-            )
+            Runner::new(debounced_scheduler(s, config.debounce), move |request| {
+                let request = request.clone();
+                match store.get(&request.obj_ref) {
+                    Some(obj) => {
+                        let scheduler_tx = scheduler_tx.clone();
+                        let error_policy_ctx = context.clone();
+                        let error_policy = error_policy.clone();
+                        let reconciler_span = info_span!(
+                            "reconciling object",
+                            "object.ref" = %request.obj_ref,
+                            object.reason = %request.reason
+                        );
+                        reconciler_span
+                            .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
+                            .into_future()
+                            .then(move |res| {
+                                let error_policy = error_policy;
+                                RescheduleReconciliation::new(
+                                    res,
+                                    |err| error_policy(obj, err, error_policy_ctx),
+                                    request.obj_ref.clone(),
+                                    scheduler_tx,
+                                )
+                                // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
+                                // to them separately
+                                .map(|res| Ok((request.obj_ref, res)))
+                            })
+                            .instrument(reconciler_span)
+                            .left_future()
+                    }
+                    None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
+                }
+            })
             .delay_tasks_until(async move {
                 tracing::debug!("applier runner held until store is ready");
                 let res = delay_store.wait_until_ready().await;
@@ -426,15 +422,15 @@ where
 /// the behavior of the contorller.
 #[derive(Clone, Debug, Default)]
 pub struct Config {
-    /// The debounce time that allows for deduplication of events, preventing
-    /// unnecessary reconciliations. By default, it is set to 1 second, but users
-    /// should modify it according to the needs of their controller.
-    debounce: Option<Duration>,
+    debounce: Duration,
 }
 
 impl Config {
+    /// Sets the debounce period for the controller, which allows for deduplication
+    /// of events and prevents unnecessary reconciliations. This is particularly useful
+    /// if the same object is requeued instantly for reconciliation by multiple streams.
     pub fn debounce(&mut self, debounce: Duration) {
-        self.debounce = Some(debounce);
+        self.debounce = debounce;
     }
 }
 
@@ -1245,7 +1241,7 @@ where
             self.reader,
             StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                 .take_until(future::select_all(self.graceful_shutdown_selector)),
-            self.config.debounce,
+            self.config,
         )
         .take_until(futures::future::select_all(self.forceful_shutdown_selector))
     }
@@ -1260,7 +1256,7 @@ mod tests {
         applier,
         reflector::{self, ObjectRef},
         watcher::{self, metadata_watcher, watcher, Event},
-        Controller,
+        Config, Controller,
     };
     use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
     use k8s_openapi::api::core::v1::ConfigMap;
@@ -1327,6 +1323,8 @@
 
         let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
         let (store_rx, mut store_tx) = reflector::store();
+        let mut config = Config::default();
+        config.debounce(Duration::from_millis(1));
         let applier = applier(
             |obj, _| {
                 Box::pin(async move {
@@ -1341,7 +1339,7 @@
             Arc::new(()),
             store_rx,
             queue_rx.map(Result::<_, Infallible>::Ok),
-            Some(Duration::from_millis(1)),
+            config,
         );
         pin_mut!(applier);
         for i in 0..items {
diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 50210bbea..3e33e8df3 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -158,9 +158,9 @@ mod tests {
         let mut count = 0;
         let (mut sched_tx, sched_rx) = mpsc::unbounded();
         let mut runner = Box::pin(
-            // The debounce period needs to zero because otherwise the scheduler has a default
-            // debounce period of 1 ms, which will lead to the second request to be discarded.
-            Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
+            // The debounce period needs to be zero because a debounce period > 0
+            // would cause the second request to be discarded.
+            Runner::new(scheduler(sched_rx), |_| {
                 count += 1;
                 // Panic if this ref is already held, to simulate some unsafe action..
                 let mutex_ref = rc.borrow_mut();
@@ -205,7 +205,7 @@ mod tests {
         // pause();
         let (mut sched_tx, sched_rx) = mpsc::unbounded();
         let (result_tx, result_rx) = oneshot::channel();
-        let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
+        let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
         // Start a background task that starts listening /before/ we enqueue the message
         // We can't just use Stream::poll_next(), since that bypasses the waker system
         Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
@@ -244,7 +244,6 @@ mod tests {
                     run_at: Instant::now(),
                 }])
                 .chain(stream::pending()),
-                None,
             ),
             |msg| {
                 assert!(*is_ready.lock().unwrap());
@@ -281,7 +280,6 @@ mod tests {
                     },
                 ])
                 .chain(stream::pending()),
-                None,
             ),
             |msg| {
                 assert!(*is_ready.lock().unwrap());
@@ -317,7 +315,6 @@ mod tests {
                     run_at: Instant::now(),
                 }])
                 .chain(stream::pending()),
-                None,
             ),
             |()| {
                 panic!("run_msg should never be invoked if readiness gate fails");
diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs
index 2df6d039d..f59d6d607 100644
--- a/kube-runtime/src/lib.rs
+++ b/kube-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod utils;
 pub mod wait;
 pub mod watcher;
 
-pub use controller::{applier, Controller};
+pub use controller::{applier, Config, Controller};
 pub use finalizer::finalizer;
 pub use reflector::reflector;
 pub use scheduler::scheduler;
diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs
index abf81a34a..c6ebdb86b 100644
--- a/kube-runtime/src/scheduler.rs
+++ b/kube-runtime/src/scheduler.rs
@@ -45,7 +45,11 @@ pub struct Scheduler<T, R> {
     /// Incoming queue of scheduling requests.
     #[pin]
     requests: Fuse<R>,
-    /// Debounce time to allow for deduplication of requests.
+    /// Debounce time to allow for deduplication of requests. It is added to the request's
+    /// initial expiration time. If another request with the same message arrives before
+    /// the request expires, it is added to the new request's expiration time instead. This
+    /// allows a request to be emitted only if the scheduler is "uninterrupted" for the
+    /// configured debounce period. Its primary purpose is to deduplicate requests that expire instantly.
     debounce: Duration,
 }
 
@@ -209,18 +213,29 @@ where
 /// is ready for it).
 ///
 /// The [`Scheduler`] terminates as soon as `requests` does.
-pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
+pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
+    Scheduler::new(requests, Duration::ZERO)
+}
+
+/// Stream transformer that delays and deduplicates [`Stream`] items.
+///
+/// The debounce period lets the scheduler deduplicate requests that ask to be
+/// emitted instantly, by making sure we wait for the configured period of time
+/// to receive an uninterrupted request before actually emitting it.
+///
+/// For more info, see [`scheduler()`].
+pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
     requests: S,
-    debounce: Option<Duration>,
+    debounce: Duration,
 ) -> Scheduler<T, S> {
-    Scheduler::new(requests, debounce.unwrap_or(Duration::ZERO))
+    Scheduler::new(requests, debounce)
 }
 
 #[cfg(test)]
 mod tests {
     use crate::utils::KubeRuntimeStreamExt;
 
-    use super::{scheduler, ScheduleRequest};
+    use super::{debounced_scheduler, scheduler, ScheduleRequest};
     use derivative::Derivative;
     use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
     use std::task::Poll;
@@ -248,7 +263,6 @@ mod tests {
                 run_at: Instant::now(),
             }])
             .on_complete(sleep(Duration::from_secs(4))),
-            None,
         ));
         assert!(!scheduler.contains_pending(&1));
         assert!(poll!(scheduler.as_mut().hold_unless(|_| false).next()).is_pending());
@@ -265,7 +279,7 @@ mod tests {
     async fn scheduler_should_not_reschedule_pending_items() {
         pause();
         let (mut tx, rx) = mpsc::unbounded::<ScheduleRequest<u8>>();
-        let mut scheduler = Box::pin(scheduler(rx, None));
+        let mut scheduler = Box::pin(scheduler(rx));
         tx.send(ScheduleRequest {
             message: 1,
             run_at: Instant::now(),
@@ -306,7 +320,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(2))),
-            None,
         ));
         assert_eq!(
             scheduler.as_mut().hold_unless(|x| *x != 1).next().await.unwrap(),
@@ -329,7 +342,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
-            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -357,7 +369,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
-            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -382,7 +393,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
-            None,
         );
         pin_mut!(scheduler);
         assert!(poll!(scheduler.next()).is_pending());
@@ -396,7 +406,7 @@ mod tests {
     async fn scheduler_dedupe_should_allow_rescheduling_emitted_item() {
         pause();
         let (mut schedule_tx, schedule_rx) = mpsc::unbounded();
-        let mut scheduler = scheduler(schedule_rx, None);
+        let mut scheduler = scheduler(schedule_rx);
         schedule_tx
             .send(ScheduleRequest {
                 message: (),
@@ -438,7 +448,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
-            None,
         );
         assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![2]);
     }
@@ -460,7 +469,6 @@ mod tests {
                 },
             ])
             .on_complete(sleep(Duration::from_secs(5))),
-            None,
         );
         assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
     }
@@ -471,7 +479,7 @@ mod tests {
 
         let now = Instant::now();
         let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
-        let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
+        let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));
 
         sched_tx
             .send(ScheduleRequest {
@@ -491,7 +499,7 @@ mod tests {
 
         let now = Instant::now();
         let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
-        let mut scheduler = scheduler(sched_rx, Some(Duration::from_secs(2)));
+        let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));
 
         sched_tx
             .send(ScheduleRequest {
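
Usage sketch (not part of the patch): the snippet below shows how the debounced scheduler added above might be exercised directly, mirroring the new tests rather than documenting a stable public API. It assumes the post-patch `kube_runtime::scheduler` module is reachable as laid out in this diff (`debounced_scheduler`, `ScheduleRequest` with public `message`/`run_at` fields), and the `String` message and 2-second debounce are illustrative choices.

use std::time::Duration;

use futures::{channel::mpsc, pin_mut, SinkExt, StreamExt};
use kube_runtime::scheduler::{debounced_scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::unbounded::<ScheduleRequest<String>>();

    // Requests carrying the same message within the 2s debounce window are
    // deduplicated: the scheduler only emits once it has been "uninterrupted"
    // for the full debounce period.
    let scheduler = debounced_scheduler(rx, Duration::from_secs(2));
    pin_mut!(scheduler);

    tx.send(ScheduleRequest {
        message: "configmap-generator".to_string(),
        run_at: Instant::now(),
    })
    .await
    .unwrap();

    // A duplicate that also asks to run immediately; it pushes the debounce
    // window out instead of producing a second emission.
    tx.send(ScheduleRequest {
        message: "configmap-generator".to_string(),
        run_at: Instant::now(),
    })
    .await
    .unwrap();

    // Roughly two seconds later, exactly one message is emitted.
    let emitted = scheduler.next().await.unwrap();
    println!("emitted: {emitted}");
}

Inside a controller this is wired up for you: `Controller` forwards its `Config` to `applier`, which now builds the runner on `debounced_scheduler(s, config.debounce)` as shown in the hunk for kube-runtime/src/controller/mod.rs.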