From 5aaad7dd26847480fb78b6ab4a8d1492b22fc7f6 Mon Sep 17 00:00:00 2001 From: clux Date: Thu, 6 Aug 2020 17:32:30 +0100 Subject: [PATCH] multi_watcher example - closes #302 --- Makefile | 4 +++- examples/Cargo.toml | 4 ++++ examples/README.md | 4 +++- examples/multi_watcher.rs | 46 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+), 2 deletions(-) create mode 100644 examples/multi_watcher.rs diff --git a/Makefile b/Makefile index afb7e0865..1f767131a 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,9 @@ publish: minikube-create: sudo rm -rf /tmp/juju-mk* /tmp/minikube* - sudo -E minikube start --driver=none --kubernetes-version v1.18.6 --extra-config kubeadm.ignore-preflight-errors=SystemVerification + minikube start --driver=docker \ + --kubernetes-version v1.18.6 \ + --extra-config kubeadm.ignore-preflight-errors=SystemVerification minikube: kubectl config set-context --cluster=minikube --user=minikube --namespace=apps minikube diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b6627f918..e3a1d98e6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -76,6 +76,10 @@ path = "job_api.rs" name = "log_stream" path = "log_stream.rs" +[[example]] +name = "multi_watcher" +path = "multi_watcher.rs" + [[example]] name = "pod_api" path = "pod_api.rs" diff --git a/examples/README.md b/examples/README.md index ff51398e9..0292fd1f2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -29,11 +29,13 @@ These example watch a single resource and does some basic filtering on the watch ```sh # watch all configmap events in a namespace -cargo run --example configmap_watcher +NAMESPACE=dev cargo run --example configmap_watcher # watch unready pods in a namespace NAMESPACE=dev cargo run --example pod_watcher # watch all event events cargo run --example event_watcher +# watch deployments, configmaps, secrets in one namespace +NAMESPACE=dev cargo run --example multi_watcher # watch broken nodes and cross reference with events api cargo run --example node_watcher ``` diff --git a/examples/multi_watcher.rs b/examples/multi_watcher.rs new file mode 100644 index 000000000..868cab60b --- /dev/null +++ b/examples/multi_watcher.rs @@ -0,0 +1,46 @@ +#[macro_use] extern crate log; +use futures::{stream, StreamExt, TryStreamExt}; +use k8s_openapi::api::{ + apps::v1::Deployment, + core::v1::{ConfigMap, Secret}, +}; +use kube::{ + api::{Api, ListParams, Meta}, + Client, +}; +use kube_runtime::{utils::try_flatten_applied, watcher}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + std::env::set_var("RUST_LOG", "info,node_watcher=debug,kube=debug"); + env_logger::init(); + let client = Client::try_default().await?; + let namespace = std::env::var("NAMESPACE").unwrap_or("default".into()); + + let deploys: Api = Api::namespaced(client.clone(), &namespace); + let cms: Api = Api::namespaced(client.clone(), &namespace); + let secret: Api = Api::namespaced(client.clone(), &namespace); + let dep_watcher = watcher(deploys, ListParams::default()); + let cm_watcher = watcher(cms, ListParams::default()); + let sec_watcher = watcher(secret, ListParams::default()); + + // select on applied events from all watchers + enum Watched { + Config(ConfigMap), + Deploy(Deployment), + Secret(Secret), + } + let mut combo_stream = stream::select_all(vec![ + try_flatten_applied(dep_watcher).map_ok(Watched::Deploy).boxed(), + try_flatten_applied(cm_watcher).map_ok(Watched::Config).boxed(), + try_flatten_applied(sec_watcher).map_ok(Watched::Secret).boxed(), + ]); + while let Some(o) = combo_stream.try_next().await? { + match o { + Watched::Config(cm) => info!("Got configmap: {}", Meta::name(&cm)), + Watched::Deploy(d) => info!("Got deployment: {}", Meta::name(&d)), + Watched::Secret(s) => info!("Got secret: {}", Meta::name(&s)), + } + } + Ok(()) +}