From 6e5c847adb09f1eb83d60cdfbdacf0d058a2424c Mon Sep 17 00:00:00 2001 From: clux Date: Fri, 14 Jul 2023 18:53:28 +0100 Subject: [PATCH 1/4] Add WatchStreamExt::reflect to allow chaining on a reflector Signed-off-by: clux --- examples/crd_reflector.rs | 12 ++-- examples/node_reflector.rs | 9 ++- examples/pod_reflector.rs | 22 +++--- kube-runtime/src/utils/mod.rs | 2 + kube-runtime/src/utils/reflect.rs | 105 ++++++++++++++++++++++++++++ kube-runtime/src/utils/watch_ext.rs | 17 ++++- 6 files changed, 149 insertions(+), 18 deletions(-) create mode 100644 kube-runtime/src/utils/reflect.rs diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index efb0e54ea..f01fb9165 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -36,10 +36,15 @@ async fn main() -> anyhow::Result<()> { let (reader, writer) = reflector::store::(); let foos: Api = Api::default_namespaced(client); - let wc = watcher::Config::default().timeout(20); // low timeout in this example - let rf = reflector(writer, watcher(foos, wc)); + let wc = watcher::Config::default().any_semantic(); + let mut stream = watcher(foos, wc) + .default_backoff() + .reflect(writer) + .applied_objects() + .boxed(); tokio::spawn(async move { + reader.wait_until_ready().await.unwrap(); loop { // Periodically read our state // while this runs you can kubectl apply -f crd-baz.yaml or crd-qux.yaml and see it works @@ -48,8 +53,7 @@ async fn main() -> anyhow::Result<()> { info!("Current crds: {:?}", crds); } }); - let mut rfa = rf.applied_objects().boxed(); - while let Some(event) = rfa.try_next().await? { + while let Some(event) = stream.try_next().await? { info!("saw {}", event.name_any()); } Ok(()) diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 12d6afb12..32339bdf3 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -18,12 +18,15 @@ async fn main() -> anyhow::Result<()> { .timeout(10); // short watch timeout in this example let (reader, writer) = reflector::store(); - let rf = reflector(writer, watcher(nodes, wc)) + let stream = watcher(nodes, wc) + .default_backoff() + .reflect(writer) .applied_objects() .predicate_filter(predicates::labels.combine(predicates::annotations)); // NB: requires an unstable feature // Periodically read our state in the background tokio::spawn(async move { + reader.wait_until_ready().await.unwrap(); loop { let nodes = reader.state().iter().map(|r| r.name_any()).collect::>(); info!("Current {} nodes: {:?}", nodes.len(), nodes); @@ -32,8 +35,8 @@ async fn main() -> anyhow::Result<()> { }); // Log applied events with changes from the reflector - pin_mut!(rf); - while let Some(node) = rf.try_next().await? { + pin_mut!(stream); + while let Some(node) = stream.try_next().await? { info!("saw node {} with new labels/annots", node.name_any()); } diff --git a/examples/pod_reflector.rs b/examples/pod_reflector.rs index cbcda3d46..ab41f791f 100644 --- a/examples/pod_reflector.rs +++ b/examples/pod_reflector.rs @@ -18,6 +18,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { // Show state every 5 seconds of watching loop { + reader.wait_until_ready().await.unwrap(); tokio::time::sleep(std::time::Duration::from_secs(5)).await; info!("Current pod count: {}", reader.state().len()); // full information with debug logs @@ -28,19 +29,20 @@ async fn main() -> anyhow::Result<()> { } }); - let stream = watcher(api, watcher::Config::default()).modify(|pod| { - // memory optimization for our store - we don't care about managed fields/annotations/status - pod.managed_fields_mut().clear(); - pod.annotations_mut().clear(); - pod.status = None; - }); - - let rf = reflector(writer, stream) + let stream = watcher(api, watcher::Config::default().any_semantic()) + .default_backoff() + .modify(|pod| { + // memory optimization for our store - we don't care about managed fields/annotations/status + pod.managed_fields_mut().clear(); + pod.annotations_mut().clear(); + pod.status = None; + }) + .reflect(writer) .applied_objects() .predicate_filter(predicates::resource_version); // NB: requires an unstable feature - futures::pin_mut!(rf); + futures::pin_mut!(stream); - while let Some(pod) = rf.try_next().await? { + while let Some(pod) = stream.try_next().await? { info!("saw {}", pod.name_any()); } Ok(()) diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 53e50af9f..0ba7c4ece 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod delayed_init; mod event_flatten; mod event_modify; #[cfg(feature = "unstable-runtime-predicates")] mod predicate; +mod reflect; mod stream_backoff; #[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe; mod watch_ext; @@ -14,6 +15,7 @@ pub use event_flatten::EventFlatten; pub use event_modify::EventModify; #[cfg(feature = "unstable-runtime-predicates")] pub use predicate::{predicates, Predicate, PredicateFilter}; +pub use reflect::Reflect; pub use stream_backoff::StreamBackoff; #[cfg(feature = "unstable-runtime-subscribe")] pub use stream_subscribe::StreamSubscribe; diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs new file mode 100644 index 000000000..8bfd5a97c --- /dev/null +++ b/kube-runtime/src/utils/reflect.rs @@ -0,0 +1,105 @@ +use core::{ + pin::Pin, + task::{Context, Poll}, +}; + +use futures::{Stream, TryStream}; +use pin_project::pin_project; + +use crate::{ + reflector::store::Writer, + watcher::{Error, Event}, +}; +use kube_client::Resource; + +#[pin_project] +/// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method. +pub struct Reflect +where + K: Resource + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + #[pin] + stream: St, + writer: Writer, +} + +impl Reflect +where + St: TryStream>, + K: Resource + Clone, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + pub(super) fn new(stream: St, writer: Writer) -> Reflect { + Self { stream, writer } + } +} + +impl Stream for Reflect +where + K: Resource + Clone, + K::DynamicType: Eq + std::hash::Hash + Clone, + St: Stream, Error>>, +{ + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + //stream.inspect_ok(move |event| writer.apply_watcher_event(event)) + me.stream.as_mut().poll_next(cx).map_ok(move |event| { + me.writer.apply_watcher_event(&event); + event + }) + } +} + +#[cfg(test)] +pub(crate) mod test { + use std::{task::Poll, vec}; + + use super::{Error, Event, Reflect}; + use crate::reflector; + use futures::{pin_mut, poll, stream, StreamExt}; + use k8s_openapi::api::core::v1::Pod; + + fn testpod(name: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some(name.to_string()); + pod + } + + #[tokio::test] + async fn reflect_passes_events_through() { + let foo = testpod("foo"); + let bar = testpod("bar"); + let st = stream::iter([ + Ok(Event::Applied(foo.clone())), + Err(Error::TooManyObjects), + Ok(Event::Restarted(vec![foo, bar])), + ]); + let (reader, writer) = reflector::store(); + + let reflect = Reflect::new(st, writer); + pin_mut!(reflect); + assert_eq!(reader.len(), 0); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Applied(_)))) + )); + assert_eq!(reader.len(), 1); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + assert_eq!(reader.len(), 1); + + let restarted = poll!(reflect.next()); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_)))))); + assert_eq!(reader.len(), 2); + + assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); + assert_eq!(reader.len(), 2); + } +} diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index e8cda00e0..b75e8fef2 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -6,7 +6,9 @@ use crate::{ utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff}, watcher, }; -#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; +use kube_client::Resource; + +use crate::{reflector::store::Writer, utils::Reflect}; use crate::watcher::DefaultBackoff; use backoff::backoff::Backoff; @@ -190,6 +192,19 @@ pub trait WatchStreamExt: Stream { { StreamSubscribe::new(self) } + + /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`] + /// + /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`]. + /// This populates a [`Store`] as the stream is polled. + fn reflect(self, writer: Writer) -> Reflect + where + Self: Stream, watcher::Error>> + Sized, + K: Resource + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, + { + Reflect::new(self, writer) + } } impl WatchStreamExt for St where St: Stream {} From b3733de3aa73283f6f0461f292855d0f23d228c8 Mon Sep 17 00:00:00 2001 From: Eirik A Date: Sun, 16 Jul 2023 22:14:22 +0100 Subject: [PATCH 2/4] Update kube-runtime/src/utils/watch_ext.rs Co-authored-by: David Herberth Signed-off-by: Eirik A --- kube-runtime/src/utils/watch_ext.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index b75e8fef2..401835786 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -199,7 +199,7 @@ pub trait WatchStreamExt: Stream { /// This populates a [`Store`] as the stream is polled. fn reflect(self, writer: Writer) -> Reflect where - Self: Stream, watcher::Error>> + Sized, + Self: Stream>> + Sized, K: Resource + Clone + 'static, K::DynamicType: Eq + std::hash::Hash + Clone, { From 437ab66e54df4846161a58b261876dd8acf914b2 Mon Sep 17 00:00:00 2001 From: clux Date: Sun, 16 Jul 2023 22:14:30 +0100 Subject: [PATCH 3/4] leftover line + doc/macro inversion Signed-off-by: clux --- kube-runtime/src/utils/reflect.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index 8bfd5a97c..43fa65c2a 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -12,8 +12,8 @@ use crate::{ }; use kube_client::Resource; +/// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method #[pin_project] -/// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method. pub struct Reflect where K: Resource + Clone + 'static, @@ -45,7 +45,6 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); - //stream.inspect_ok(move |event| writer.apply_watcher_event(event)) me.stream.as_mut().poll_next(cx).map_ok(move |event| { me.writer.apply_watcher_event(&event); event From 562b6c1fa7f433a5f3cddb801518172c49e56d9e Mon Sep 17 00:00:00 2001 From: clux Date: Sun, 16 Jul 2023 22:39:10 +0100 Subject: [PATCH 4/4] better doc examples Signed-off-by: clux --- kube-runtime/src/controller/mod.rs | 19 ++++++------- kube-runtime/src/utils/watch_ext.rs | 42 +++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index c52762cdd..80ed72fb9 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -436,7 +436,7 @@ where /// General setup: /// ```no_run /// use kube::{Api, Client, CustomResource}; -/// use kube::runtime::{controller::{Controller, Action}, watcher, reflector}; +/// use kube::runtime::{controller::{Controller, Action}, watcher}; /// # use serde::{Deserialize, Serialize}; /// # use tokio::time::Duration; /// use futures::StreamExt; @@ -589,7 +589,9 @@ where /// # async fn doc(client: kube::Client) { /// let api: Api = Api::default_namespaced(client); /// let (reader, writer) = reflector::store(); - /// let deploys = reflector(writer, watcher(api, watcher::Config::default())) + /// let deploys = watcher(api, watcher::Config::default()) + /// .default_backoff() + /// .reflect(writer) /// .applied_objects() /// .predicate_filter(predicates::generation); /// @@ -1053,13 +1055,12 @@ where /// # /// // Store can be used in the reconciler instead of querying Kube /// let (pod_store, writer) = reflector::store(); - /// let pod_stream = reflector( - /// writer, - /// watcher(Api::::all(client.clone()), Config::default()), - /// ) - /// .applied_objects() - /// // Map to the relevant `ObjectRef` to reconcile - /// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap())); + /// let pod_stream = watcher(Api::::all(client.clone()), Config::default()) + /// .default_backoff() + /// .reflect(writer) + /// .applied_objects() + /// // Map to the relevant `ObjectRef` to reconcile + /// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap())); /// /// Controller::new(Api::::all(client), Config::default()) /// .reconcile_on(pod_stream) diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 401835786..6f9994586 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -197,6 +197,48 @@ pub trait WatchStreamExt: Stream { /// /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`]. /// This populates a [`Store`] as the stream is polled. + /// + /// ## Usage + /// ```no_run + /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use std::time::Duration; + /// # use tracing::{info, warn}; + /// use kube::{Api, Client, ResourceExt}; + /// use kube_runtime::{watcher, WatchStreamExt, reflector}; + /// use k8s_openapi::api::apps::v1::Deployment; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client: kube::Client = todo!(); + /// + /// let deploys: Api = Api::default_namespaced(client); + /// let (reader, writer) = reflector::store::(); + /// + /// tokio::spawn(async move { + /// // start polling the store once the reader is ready + /// reader.wait_until_ready().await.unwrap(); + /// loop { + /// let names = reader.state().iter().map(|d| d.name_any()).collect::>(); + /// info!("Current {} deploys: {:?}", names.len(), names); + /// tokio::time::sleep(Duration::from_secs(10)).await; + /// } + /// }); + /// + /// // configure the watcher stream and populate the store while polling + /// watcher(deploys, watcher::Config::default()) + /// .reflect(writer) + /// .applied_objects() + /// .for_each(|res| async move { + /// match res { + /// Ok(o) => info!("saw {}", o.name_any()), + /// Err(e) => warn!("watcher error: {}", e), + /// } + /// }) + /// .await; + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// [`Store`]: crate::reflector::Store fn reflect(self, writer: Writer) -> Reflect where Self: Stream>> + Sized,