allow configuring controller's concurrency #1277

Merged: 4 commits, Sep 5, 2023
6 changes: 5 additions & 1 deletion examples/configmapgen_controller.rs
@@ -7,7 +7,7 @@ use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Controller},
controller::{Action, Config, Controller},
watcher,
},
Client, CustomResource,
@@ -102,8 +102,12 @@ async fn main() -> Result<()> {
}
});

// limit the controller to running a maximum of two concurrent reconciliations
let config = Config::default().concurrency(2);

Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.with_config(config)
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
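For context, here is a minimal, self-contained sketch of how an operator author might wire the new knob outside of this example. `Ctx` and `ReconcileError` are placeholder names and the reconciler body is elided; only the `Config::concurrency` / `with_config` wiring follows the API shown in the diff above.

```rust
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    runtime::{
        controller::{Action, Config, Controller},
        watcher,
    },
    Api, Client,
};

// Placeholder context and error types for the sketch.
struct Ctx;

#[derive(Debug)]
struct ReconcileError;
impl std::fmt::Display for ReconcileError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("reconcile failed")
    }
}
impl std::error::Error for ReconcileError {}

async fn reconcile(_cm: Arc<ConfigMap>, _ctx: Arc<Ctx>) -> Result<Action, ReconcileError> {
    // ... real work against the cluster would happen here ...
    Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(_cm: Arc<ConfigMap>, _err: &ReconcileError, _ctx: Arc<Ctx>) -> Action {
    Action::requeue(Duration::from_secs(5))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let cms = Api::<ConfigMap>::all(client);

    // At most two reconciliations in flight at any time; the default of 0
    // keeps the previous unbounded behaviour.
    let config = Config::default().concurrency(2);

    Controller::new(cms, watcher::Config::default())
        .with_config(config)
        .shutdown_on_signal()
        .run(reconcile, error_policy, Arc::new(Ctx))
        .for_each(|res| async move {
            if let Err(e) = res {
                eprintln!("reconcile failed: {e:?}");
            }
        })
        .await;
    Ok(())
}
```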
4 changes: 4 additions & 0 deletions kube-runtime/src/controller/future_hash_map.rs
@@ -40,6 +40,10 @@
pub fn contains_key(&self, key: &K) -> bool {
self.futures.contains_key(key)
}

pub fn len(&self) -> usize {
self.futures.len()
}
}

impl<K, F> Stream for FutureHashMap<K, F>
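The runner uses this new `len()` accessor to compare the number of in-flight reconciliations against the configured cap (see the mod.rs and runner.rs changes below). As a stand-alone restatement of that check, not actual kube-runtime code, with `in_flight` standing in for `slots.len()`:

```rust
/// Mirrors the check in the runner's poll loop: a limit of 0 disables the cap,
/// otherwise new work is held back once the in-flight count reaches the limit.
fn at_capacity(in_flight: usize, max_concurrent_executions: u16) -> bool {
    max_concurrent_executions > 0 && in_flight >= max_concurrent_executions as usize
}

#[cfg(test)]
mod tests {
    use super::at_capacity;

    #[test]
    fn zero_means_unbounded() {
        assert!(!at_capacity(10_000, 0)); // 0 keeps the old unbounded behaviour
        assert!(!at_capacity(1, 2)); // below the limit: keep accepting work
        assert!(at_capacity(2, 2)); // at the limit: hold the scheduler
    }
}
```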
83 changes: 51 additions & 32 deletions kube-runtime/src/controller/mod.rs
@@ -293,39 +293,43 @@
)),
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(debounced_scheduler(s, config.debounce), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
Runner::new(
debounced_scheduler(s, config.debounce),
config.concurrency,
move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
let scheduler_tx = scheduler_tx.clone();
let error_policy_ctx = context.clone();
let error_policy = error_policy.clone();
let reconciler_span = info_span!(
"reconciling object",
"object.ref" = %request.obj_ref,
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
// Reconciler errors are OK from the applier's PoV, we need to apply the error policy
// to them separately
.map(|res| Ok((request.obj_ref, res)))
})
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
},
)
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
@@ -423,6 +427,7 @@
#[derive(Clone, Debug, Default)]
pub struct Config {
debounce: Duration,
concurrency: u16,
}

impl Config {
@@ -442,6 +447,20 @@
self.debounce = debounce;
self
}

/// The number of concurrent reconciliations that are allowed to run at a given moment.
///
/// This can be adjusted to the controller's needs to increase
/// performance and/or make performance predictable. By default, it is 0, meaning
/// the controller runs with unbounded concurrency.
///
/// Note that even with bounded concurrency, a controller never schedules concurrent
/// reconciliations of the same object.
#[must_use]
pub fn concurrency(mut self, concurrency: u16) -> Self {
self.concurrency = concurrency;
self
}
}

/// Controller for a Resource `K`
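Since `concurrency` is just another builder-style setter on `Config`, it composes with the existing `debounce` setting. A small sketch of a combined configuration (the values here are arbitrary):

```rust
use std::time::Duration;

use kube::runtime::controller::Config;

fn controller_config() -> Config {
    Config::default()
        .debounce(Duration::from_secs(1)) // coalesce rapid successive events per object
        .concurrency(4) // allow at most four reconciliations in flight
}
```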
148 changes: 141 additions & 7 deletions kube-runtime/src/controller/runner.rs
@@ -32,21 +32,26 @@
ready_to_execute_after: future::Fuse<Ready>,
is_ready_to_execute: bool,
stopped: bool,
max_concurrent_executions: u16,
}

impl<T, R, F, MkF> Runner<T, R, F, MkF>
where
F: Future + Unpin,
MkF: FnMut(&T) -> F,
{
pub fn new(scheduler: Scheduler<T, R>, run_msg: MkF) -> Self {
/// Creates a new [`Runner`]. `max_concurrent_executions` can be used to
/// limit the number of items that are run concurrently. It can be set to 0 to
/// allow for unbounded concurrency.
pub fn new(scheduler: Scheduler<T, R>, max_concurrent_executions: u16, run_msg: MkF) -> Self {
Self {
scheduler,
run_msg,
slots: FutureHashMap::default(),
ready_to_execute_after: future::ready(Ok(())).fuse(),
is_ready_to_execute: false,
stopped: false,
max_concurrent_executions,
}
}

@@ -67,10 +72,12 @@
ready_to_execute_after: ready_to_execute_after.fuse(),
is_ready_to_execute: false,
stopped: false,
max_concurrent_executions: self.max_concurrent_executions,
}
}
}

#[allow(clippy::match_wildcard_for_single_variants)]
impl<T, R, F, MkF, Ready, ReadyErr> Stream for Runner<T, R, F, MkF, Ready>
where
T: Eq + Hash + Clone + Unpin,
@@ -102,12 +109,26 @@
Poll::Pending => {}
}
loop {
// If we are at our limit or not ready to start executing, then there's
// no point in trying to get something from the scheduler, so just put
// all expired messages emitted from the queue into pending.
if (*this.max_concurrent_executions > 0
&& slots.len() >= *this.max_concurrent_executions as usize)
|| !*this.is_ready_to_execute
{
match scheduler.as_mut().hold().poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => break Poll::Pending,
// The above future never returns Poll::Ready(Some(_)).
_ => unreachable!(),
};
};

// Try to take a new message that isn't already being processed;
// leave the already-processing ones in the queue, so that we can take them once
// we're free again.
let next_msg_poll = scheduler
.as_mut()
.hold_unless(|msg| *this.is_ready_to_execute && !slots.contains_key(msg))
.hold_unless(|msg| !slots.contains_key(msg))
.poll_next_unpin(cx);
match next_msg_poll {
Poll::Ready(Some(msg)) => {
@@ -142,13 +163,21 @@
};
use futures::{
channel::{mpsc, oneshot},
future, poll, stream, SinkExt, StreamExt, TryStreamExt,
future::{self},
poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
};
use std::{
cell::RefCell,
collections::HashMap,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
};
use std::{cell::RefCell, collections::HashMap, sync::Mutex, time::Duration};
use tokio::{
runtime::Handle,
task::yield_now,
time::{pause, sleep, timeout, Instant},
time::{advance, pause, sleep, timeout, Instant},
};

#[tokio::test]
@@ -160,7 +189,7 @@
let mut runner = Box::pin(
// The debounce period needs to be zero because a debounce period > 0
// will cause the second request to be discarded.
Runner::new(scheduler(sched_rx), |_| {
Runner::new(scheduler(sched_rx), 0, |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
@@ -205,7 +234,7 @@
// pause();
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let (result_tx, result_rx) = oneshot::channel();
let mut runner = Runner::new(scheduler(sched_rx), |msg: &u8| futures::future::ready(*msg));
let mut runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| futures::future::ready(*msg));
// Start a background task that starts listening /before/ we enqueue the message
// We can't just use Stream::poll_next(), since that bypasses the waker system
Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
@@ -245,6 +274,7 @@
}])
.chain(stream::pending()),
),
0,
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
@@ -281,6 +311,7 @@
])
.chain(stream::pending()),
),
0,
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
@@ -316,6 +347,7 @@
}])
.chain(stream::pending()),
),
0,
|()| {
panic!("run_msg should never be invoked if readiness gate fails");
// It's "useless", but it helps to direct rustc to the correct types
@@ -332,4 +364,106 @@
Error::Readiness(delayed_init::InitDropped)
));
}

// A Future that becomes Ready after the specified duration has elapsed since its initialization.
struct DurationalFuture {
start: Instant,
ready_after: Duration,
}

impl DurationalFuture {
fn new(expires_in: Duration) -> Self {
let start = Instant::now();
DurationalFuture {
start,
ready_after: expires_in,
}
}
}

impl Future for DurationalFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let now = Instant::now();
if now.duration_since(self.start) > self.ready_after {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

#[tokio::test]
async fn runner_should_respect_max_concurrent_executions() {
pause();

let count = Arc::new(Mutex::new(0));
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), 2, |_| {
let mut num = count.lock().unwrap();
*num += 1;
DurationalFuture::new(Duration::from_secs(2))
})
.for_each(|_| async {}),
);

sched_tx
.send(ScheduleRequest {
message: 1,
run_at: Instant::now(),
})
.await
.unwrap();
assert!(poll!(runner.as_mut()).is_pending());
sched_tx
.send(ScheduleRequest {
message: 2,
run_at: Instant::now(),
})
.await
.unwrap();
assert!(poll!(runner.as_mut()).is_pending());
sched_tx
.send(ScheduleRequest {
message: 3,
run_at: Instant::now(),
})
.await
.unwrap();
assert!(poll!(runner.as_mut()).is_pending());
// Assert that we only ran two out of the three requests
assert_eq!(*count.lock().unwrap(), 2);

advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());
// Assert that we run the third request when we have the capacity to
assert_eq!(*count.lock().unwrap(), 3);
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());

let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), 1, |_| {
DurationalFuture::new(Duration::from_secs(2))
})
.for_each(|_| async {}),
);

sched_tx
.send(ScheduleRequest {
message: 1,
run_at: Instant::now(),
})
.await
.unwrap();
assert!(poll!(runner.as_mut()).is_pending());

// Drop the sender to test that we stop the runner when the requests
// stream finishes.
drop(sched_tx);
assert_eq!(poll!(runner.as_mut()), Poll::Pending);
}
}
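The test above leans on tokio's paused clock (this needs tokio's `test-util` feature): `DurationalFuture` only completes once enough virtual time has passed, and `advance` moves that clock forward deterministically. A distilled, stand-alone version of the same pattern, not taken verbatim from the PR:

```rust
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use futures::poll;
use tokio::time::{advance, pause, Instant};

// Becomes ready once `ready_after` has elapsed on tokio's (pausable) clock.
struct DurationalFuture {
    start: Instant,
    ready_after: Duration,
}

impl Future for DurationalFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.start.elapsed() > self.ready_after {
            Poll::Ready(())
        } else {
            // Ask to be polled again; busy-polling is acceptable in a test.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::test]
async fn paused_clock_controls_readiness() {
    pause(); // freeze tokio's clock
    let mut fut = DurationalFuture {
        start: Instant::now(),
        ready_after: Duration::from_secs(2),
    };
    assert!(poll!(&mut fut).is_pending());
    advance(Duration::from_secs(3)).await; // jump the virtual clock past the deadline
    assert!(poll!(&mut fut).is_ready());
}
```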