From 8022ddf01f578592b638e56baf9b0c600530fb22 Mon Sep 17 00:00:00 2001 From: clux Date: Sat, 7 Oct 2023 21:37:05 +0100 Subject: [PATCH 1/6] Better docs for `Controller::reconcile_on` The unstable method currently suggests that this method can be used to help share a store with the reconciler. This is actually nothing specific to `reconcile_on`, and you can do the same with the streams interface with `watches_stream`. We made the `reconcile_on` right before `watches_stream` became a thing so this makes sense. Have reworded the example to highlight that this has a better use-case with actually getting arbitrary third-party info, and then mapping that to kubernetes objects. First example that came to mind was using an IntervalStream with tokio and just cycle through a bunch of objects, but there may be a better example that does not pull in the extra dev dep. Signed-off-by: clux --- kube-runtime/Cargo.toml | 1 + kube-runtime/src/controller/mod.rs | 30 +++++++++++++----------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 1404d4855..2d4c2e308 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -56,6 +56,7 @@ tokio = { version = "1.14.0", features = ["full", "test-util"] } rand = "0.8.0" schemars = "0.8.6" tracing-subscriber = "0.3.17" +tokio-stream = "0.1.14" [dev-dependencies.k8s-openapi] version = "0.20.0" diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 095f9cf25..2f4b5cc2d 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1087,41 +1087,37 @@ where /// Trigger the reconciliation process for a managed object `ObjectRef` whenever `trigger` emits a value /// - /// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects. - /// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources + /// This can be used to inject reconciliations for specific objects from an external resource. /// /// # Example: /// /// ```no_run /// # async { /// # use futures::{StreamExt, TryStreamExt}; - /// # use k8s_openapi::api::core::v1::{ConfigMap, Pod}; + /// # use k8s_openapi::api::core::v1::{ConfigMap}; /// # use kube::api::Api; /// # use kube::runtime::controller::Action; /// # use kube::runtime::reflector::{ObjectRef, Store}; /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt}; /// # use kube::runtime::watcher::Config; /// # use kube::{Client, Error, ResourceExt}; + /// # use tokio_stream::wrappers::IntervalStream; /// # use std::future; + /// # use std::time::Duration; /// # use std::sync::Arc; /// # /// # let client: Client = todo!(); - /// # async fn reconcile(_: Arc, _: Arc>) -> Result { Ok(Action::await_change()) } - /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc>) -> Action { Action::await_change() } + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } /// # - /// // Store can be used in the reconciler instead of querying Kube - /// let (pod_store, writer) = reflector::store(); - /// let pod_stream = watcher(Api::::all(client.clone()), Config::default()) - /// .default_backoff() - /// .reflect(writer) - /// .applied_objects() - /// // Map to the relevant `ObjectRef` to reconcile - /// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap())); + /// let ns = "external-configs".to_string(); + /// let mut next_object = [ObjectRef::new("managed-cm1").within(&ns)].into_iter().cycle(); + /// let interval = tokio::time::interval(Duration::from_secs(60)); // hit the object every minute + /// let external_stream = IntervalStream::new(interval).map(|_| Ok(next_object.next().unwrap())); /// - /// Controller::new(Api::::all(client), Config::default()) - /// .reconcile_on(pod_stream) - /// // The store can be re-used between controllers and even inspected from the reconciler through [Context] - /// .run(reconcile, error_policy, Arc::new(pod_store)) + /// Controller::new(Api::::namespaced(client, &ns), Config::default()) + /// .reconcile_on(external_stream) + /// .run(reconcile, error_policy, Arc::new(())) /// .for_each(|_| future::ready(())) /// .await; /// # }; From e86923aed93e397f91374e88de82380d37566d0c Mon Sep 17 00:00:00 2001 From: clux Date: Sat, 7 Oct 2023 22:00:06 +0100 Subject: [PATCH 2/6] do the same as on kube.rs Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 2f4b5cc2d..de082ceaa 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1111,7 +1111,8 @@ where /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } /// # /// let ns = "external-configs".to_string(); - /// let mut next_object = [ObjectRef::new("managed-cm1").within(&ns)].into_iter().cycle(); + /// let externals = [ObjectRef::new("managed-cm1").within(&ns)]; + /// let mut next_object = externals.into_iter().cycle(); /// let interval = tokio::time::interval(Duration::from_secs(60)); // hit the object every minute /// let external_stream = IntervalStream::new(interval).map(|_| Ok(next_object.next().unwrap())); /// From 0c569110242e06ed6e8b585392207c787647eb1f Mon Sep 17 00:00:00 2001 From: Eirik A Date: Mon, 9 Oct 2023 18:51:43 +0100 Subject: [PATCH 3/6] Update kube-runtime/src/controller/mod.rs Co-authored-by: David Herberth Signed-off-by: Eirik A --- kube-runtime/src/controller/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index de082ceaa..2837e6169 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1087,7 +1087,7 @@ where /// Trigger the reconciliation process for a managed object `ObjectRef` whenever `trigger` emits a value /// - /// This can be used to inject reconciliations for specific objects from an external resource. + /// This can be used to inject reconciliations for specific objects from an external resource. /// /// # Example: /// From 22b95c5925c84abe59ada46494d8a0a3247f5e21 Mon Sep 17 00:00:00 2001 From: clux Date: Mon, 9 Oct 2023 19:09:28 +0100 Subject: [PATCH 4/6] use david's suggestion Signed-off-by: clux --- kube-runtime/Cargo.toml | 1 - kube-runtime/src/controller/mod.rs | 18 +++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 2d4c2e308..1404d4855 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -56,7 +56,6 @@ tokio = { version = "1.14.0", features = ["full", "test-util"] } rand = "0.8.0" schemars = "0.8.6" tracing-subscriber = "0.3.17" -tokio-stream = "0.1.14" [dev-dependencies.k8s-openapi] version = "0.20.0" diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 2837e6169..7a3f4845e 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1093,7 +1093,7 @@ where /// /// ```no_run /// # async { - /// # use futures::{StreamExt, TryStreamExt}; + /// # use futures::{StreamExt, Stream, stream, TryStreamExt}; /// # use k8s_openapi::api::core::v1::{ConfigMap}; /// # use kube::api::Api; /// # use kube::runtime::controller::Action; @@ -1101,20 +1101,20 @@ where /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt}; /// # use kube::runtime::watcher::Config; /// # use kube::{Client, Error, ResourceExt}; - /// # use tokio_stream::wrappers::IntervalStream; /// # use std::future; - /// # use std::time::Duration; /// # use std::sync::Arc; /// # /// # let client: Client = todo!(); /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } - /// # - /// let ns = "external-configs".to_string(); - /// let externals = [ObjectRef::new("managed-cm1").within(&ns)]; - /// let mut next_object = externals.into_iter().cycle(); - /// let interval = tokio::time::interval(Duration::from_secs(60)); // hit the object every minute - /// let external_stream = IntervalStream::new(interval).map(|_| Ok(next_object.next().unwrap())); + /// # fn watch_external_objects() -> impl Stream + Send + 'static { stream::iter(vec![]) } + /// # let ns = "controller-ns".to_string(); + /// struct ExternalObject { + /// name: String, + /// } + /// let external_stream = watch_external_objects().map(|ext| { + /// Ok(ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)) + /// }); /// /// Controller::new(Api::::namespaced(client, &ns), Config::default()) /// .reconcile_on(external_stream) From f9d39426bfbc52bcb0a4d02882dfb3e6eabcbdf2 Mon Sep 17 00:00:00 2001 From: clux Date: Mon, 9 Oct 2023 19:11:45 +0100 Subject: [PATCH 5/6] no need for send + static Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 7a3f4845e..69399d89c 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1107,7 +1107,7 @@ where /// # let client: Client = todo!(); /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } - /// # fn watch_external_objects() -> impl Stream + Send + 'static { stream::iter(vec![]) } + /// # fn watch_external_objects() -> impl Stream { stream::iter(vec![]) } /// # let ns = "controller-ns".to_string(); /// struct ExternalObject { /// name: String, From 1a0912bd394e1d65f74ff8e0d99f6c14ae458710 Mon Sep 17 00:00:00 2001 From: clux Date: Mon, 9 Oct 2023 22:00:42 +0100 Subject: [PATCH 6/6] stop pretending to handle errors in reconcile_on Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 69399d89c..cd85f241f 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1113,7 +1113,7 @@ where /// name: String, /// } /// let external_stream = watch_external_objects().map(|ext| { - /// Ok(ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)) + /// ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns) /// }); /// /// Controller::new(Api::::namespaced(client, &ns), Config::default()) @@ -1125,15 +1125,14 @@ where /// ``` #[cfg(feature = "unstable-runtime-reconcile-on")] #[must_use] - pub fn reconcile_on( - mut self, - trigger: impl Stream, watcher::Error>> + Send + 'static, - ) -> Self { + pub fn reconcile_on(mut self, trigger: impl Stream> + Send + 'static) -> Self { self.trigger_selector.push( trigger - .map_ok(move |obj| ReconcileRequest { - obj_ref: obj, - reason: ReconcileReason::Unknown, + .map(move |obj| { + Ok(ReconcileRequest { + obj_ref: obj, + reason: ReconcileReason::Unknown, + }) }) .boxed(), );