Skip to content

Commit

Permalink
add controller::Config and debounce period to scheduler
Browse files Browse the repository at this point in the history
Add `controller::Config` to allow configuring the behavior of the
controller. Introduce a debounce period for the scheduler to allow for
deduplication of requests. By default, the debounce period is set to 1
second.

Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Aug 2, 2023
1 parent 823f4b8 commit fe94fee
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 59 deletions.
5 changes: 4 additions & 1 deletion examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Controller},
controller::{Action, Config, Controller},
watcher,
},
Client, CustomResource,
Expand Down Expand Up @@ -102,8 +102,11 @@ async fn main() -> Result<()> {
}
});

let mut config = Config::default();
config.debounce(Duration::from_secs(2));
Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.with_config(config)
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
Expand Down
127 changes: 81 additions & 46 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
},
scheduler::{scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
Expand Down Expand Up @@ -234,6 +234,7 @@ impl Display for ReconcileReason {
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
const SCHEDULER_DEBOUNCE_PERIOD: Duration = Duration::from_secs(1);

/// Apply a reconciler to an input stream, with a given retry policy
///
Expand All @@ -252,6 +253,7 @@ pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
debounce: Option<Duration>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
Expand All @@ -276,7 +278,7 @@ where
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
run_at: Instant::now(),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
Expand All @@ -291,39 +293,42 @@ where
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(scheduler(s), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
Runner::new(
scheduler(s, debounce.or(Some(SCHEDULER_DEBOUNCE_PERIOD))),
move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
},
)
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
Expand Down Expand Up @@ -417,6 +422,22 @@ where
}
}

/// Config contains all the options that can be used to configure
/// the behavior of the contorller.
#[derive(Clone, Debug, Default)]
pub struct Config {
/// The debounce time that allows for deduplication of events, preventing
/// unnecessary reconciliations. By default, it is set to 1 second, but users
/// should modify it according to the needs of their controller.
debounce: Option<Duration>,
}

impl Config {
pub fn debounce(&mut self, debounce: Duration) {
self.debounce = Some(debounce);
}
}

/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
Expand Down Expand Up @@ -505,6 +526,7 @@ where
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
config: Config,
}

impl<K> Controller<K>
Expand All @@ -516,11 +538,11 @@ where
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
/// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
/// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
#[must_use]
pub fn new(main_api: Api<K>, wc: Config) -> Self
pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
where
K::DynamicType: Default,
{
Expand All @@ -531,17 +553,17 @@ where
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
///
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
///
/// [`Config`]: crate::watcher::Config
/// [`watcher::Config`]: crate::watcher::Config
/// [`Api`]: kube_client::Api
/// [`dynamic`]: kube_client::core::dynamic
/// [`Config::default`]: crate::watcher::Config::default
pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
Expand All @@ -564,6 +586,7 @@ where
],
dyntype,
reader,
config: Default::default(),
}
}

Expand Down Expand Up @@ -649,9 +672,17 @@ where
],
dyntype,
reader,
config: Default::default(),
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
self.config = config;
self
}

/// Specify the backoff policy for "trigger" watches
///
/// This includes the core watch, as well as auxilary watches introduced by [`Self::owns`] and [`Self::watches`].
Expand Down Expand Up @@ -683,7 +714,7 @@ where
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: Config,
wc: watcher::Config,
) -> Self {
self.owns_with(api, (), wc)
}
Expand All @@ -696,7 +727,7 @@ where
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
wc: Config,
wc: watcher::Config,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
Expand Down Expand Up @@ -847,7 +878,7 @@ where
pub fn watches<Other, I>(
self,
api: Api<Other>,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
Expand All @@ -867,7 +898,7 @@ where
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
Expand Down Expand Up @@ -1214,6 +1245,7 @@ where
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config.debounce,
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
Expand Down Expand Up @@ -1298,15 +1330,18 @@ mod tests {
let applier = applier(
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
// Try to flood the rescheduling buffer buffer by just putting it back in the queue
// almost immediately, but making sure its after the debounce time, so that the
// scheduler actuallys runs the request.
println!("reconciling {:?}", obj.metadata.name);
Ok(Action::requeue(Duration::ZERO))
Ok(Action::requeue(Duration::from_millis(2)))
})
},
|_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Some(Duration::from_millis(1)),
);
pin_mut!(applier);
for i in 0..items {
Expand Down
9 changes: 7 additions & 2 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ mod tests {
let mut count = 0;
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), |_| {
// The debounce period needs to zero because otherwise the scheduler has a default
// debounce period of 1 ms, which will lead to the second request to be discarded.
Runner::new(scheduler(sched_rx, Some(Duration::ZERO)), |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
Expand Down Expand Up @@ -203,7 +205,7 @@ mod tests {
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
let mut runner = Runner::new(scheduler(sched_rx, None), |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
Expand Down Expand Up @@ -242,6 +244,7 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
Expand Down Expand Up @@ -278,6 +281,7 @@ mod tests {
},
])
.chain(stream::pending()),
None,
),
|msg| {
assert!(*is_ready.lock().unwrap());
Expand Down Expand Up @@ -313,6 +317,7 @@ mod tests {
run_at: Instant::now(),
}])
.chain(stream::pending()),
None,
),
|()| {
panic!("run_msg should never be invoked if readiness gate fails");
Expand Down
Loading

0 comments on commit fe94fee

Please sign in to comment.