How to watch all resources based on labelSelector? #302

Closed
PeterGrace opened this issue Aug 6, 2020 · 6 comments
Labels
api Api abstraction related wontfix This will not be worked on

Comments

@PeterGrace

I am new to Rust, but trying to learn the language better by writing something that is useful to me. I'm attempting to write a daemon that will watch all objects matching a labelSelector and notify me if it notices that a matched object has changed. The idea is that I can send a webhook when a Pod or a Deployment spec is updated, alerting someone to the fact that it changed.

I was starting with the configmap_watcher example, but then realized that the watcher was (obviously) only scoped to ConfigMaps. I saw a reference in #194 mentioning that a user could implement an Api::all() connection with the generic Resource, which I think will do exactly what I am looking for. When I try to do:

let foo: Api<k8s_openapi::Resource> = Api::all(client);

I am warned that the Resource type does not have a size. The compiler helpfully suggests that I use the dyn keyword but that doesn't seem to be the right choice.

Thanks for any hints you could provide, or alternative approaches you might suggest. My other thought was I could potentially open multiple Api::all() watchers for the various types I want to look at (Pod, Deployment, ReplicaSet, etc) but I wasn't sure if that was necessarily very efficient either.
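
On the unsized-type error above: k8s_openapi::Resource is a trait rather than a concrete type, so it cannot fill the type parameter of Api directly, and a trait that carries associated constants cannot be used as a dyn trait object either. A minimal std-only sketch of that distinction follows. The Resource, Api, Deployment, and ConfigMap definitions in it are simplified, hypothetical stand-ins, not the real kube or k8s_openapi items.

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for a resource trait with an associated constant.
trait Resource {
    const KIND: &'static str;
}

// Hypothetical stand-ins for concrete Kubernetes object types.
struct Deployment;
struct ConfigMap;

impl Resource for Deployment {
    const KIND: &'static str = "Deployment";
}

impl Resource for ConfigMap {
    const KIND: &'static str = "ConfigMap";
}

// Hypothetical handle that is generic over ONE concrete resource type.
struct Api<K: Resource> {
    _kind: PhantomData<K>,
}

impl<K: Resource> Api<K> {
    fn all() -> Self {
        Api { _kind: PhantomData }
    }

    // Resolved at compile time from the concrete type K.
    fn kind(&self) -> &'static str {
        K::KIND
    }
}

fn main() {
    // Each handle is tied to a single concrete type.
    let deploys: Api<Deployment> = Api::all();
    let cms: Api<ConfigMap> = Api::all();
    println!("{} / {}", deploys.kind(), cms.kind());

    // This does not compile: `dyn Resource` is unsized, and a trait with an
    // associated const cannot be turned into a trait object at all.
    // let any: Api<dyn Resource> = Api::all();
}
```

Under these assumptions, the type parameter has to name one concrete type per handle, which is why one handle per kind is needed.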

@clux clux added the api Api abstraction related label Aug 6, 2020
@clux
Member

clux commented Aug 6, 2020

Oh, yeah, you are right. We don't support the kubectl get all case at the moment because it's not a single Resource type.

It also does not appear to be found on a single resource url either; if you use kubectl get all -v=9 it literally just does a list on all resources sequentially.

@clux
Member

clux commented Aug 6, 2020

If I were you I would put a watcher on Api<Deployment> (which covers the replicaset and pod in terms of ownership) plus whatever top level objects you need. It's going to be more efficient.

For the get all feature: I don't see a clean way to support a fake all-endpoint without doing a series of calls to the supported api-resources endpoints, and then looping over those results dynamically. It is possible we could implement that, but it'd be an annoying request that would have an annoying response type (e.g. Vec<(Resource, serde_json::Value)>) which would have to be manually deserialized.

EDIT: marking as wontfix as I don't think there's anything reasonable we can do with all resources atm.

@clux clux added the wontfix This will not be worked on label Aug 6, 2020
@PeterGrace
Author

PeterGrace commented Aug 6, 2020

So in a scenario where I am making numerous watchers, i.e. Api<Deployment>, Api<Secret>, Api<PersistentVolumeClaim> etc, would I have to create individual watchers for this? Would it be possible to pass an array of Api connections to watch them in tandem? I'm thinking of the event loop we have in the example:

    let mut w = watcher(cms, lp).boxed();
    while let Some(event) = w.try_next().await? {
        info!("Got: {:?}", event);
    }

If I were to create n watchers, I'd probably have to coalesce them into an iterable and do a 'is there an event available' foreach rather than the await for the next event, I guess?

@clux
Member

clux commented Aug 6, 2020

Yeah, you would need individual watchers. I'm not sure what the cleanest way to run all of them is. You can:

  • use a loop around tokio::select! to await the next element from a set of streams (pretty deeply nested logic)
  • use stream::select_all from the futures crate to combine the streams into one bigger stream (then use that in a try_next(), I think)

The second one is probably harder to get to compile, but I expect it will look nicer. You might need a unified output type for all the streams, so you might have to pass each of them through a map that turns the Items into an enum (one that covers all your types).

We actually do really need a good user example for the latter. If you manage to get that to work; I'd appreciate you posting it here :-)
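
The unified output type mentioned above does not depend on async streams, so it can be sketched with the standard library alone. The sketch below swaps in std threads and an mpsc channel for the watcher streams: each source wraps its items in one enum and sends them into a shared channel. The Deployment and ConfigMap structs, the merged_events function, and the object names are hypothetical stand-ins for illustration, not kube types or APIs.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins for the watched Kubernetes object types.
struct Deployment {
    name: String,
}

struct ConfigMap {
    name: String,
}

// One enum covers every item type, so all sources share one output type.
enum Watched {
    Deploy(Deployment),
    Config(ConfigMap),
}

// Spawn one producer per source; each wraps its items in `Watched`
// and sends them into the same channel.
fn merged_events() -> mpsc::Receiver<Watched> {
    let (tx, rx) = mpsc::channel();

    let deploy_tx = tx.clone();
    thread::spawn(move || {
        for name in vec!["web", "api"] {
            let item = Deployment { name: name.to_string() };
            let _ = deploy_tx.send(Watched::Deploy(item));
        }
    });

    // The original sender moves into the second producer, so the channel
    // closes once both producers have finished.
    thread::spawn(move || {
        for name in vec!["settings"] {
            let item = ConfigMap { name: name.to_string() };
            let _ = tx.send(Watched::Config(item));
        }
    });

    rx
}

fn main() {
    // A single loop handles every kind of event; arrival order across
    // the two producers is not deterministic.
    for event in merged_events() {
        match event {
            Watched::Deploy(d) => println!("Got deployment: {}", d.name),
            Watched::Config(cm) => println!("Got configmap: {}", cm.name),
        }
    }
}
```

The consumer ends up as one loop with one match, which is the same shape the stream-based approach aims for.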

@clux
Member

clux commented Aug 6, 2020

Try something like this:

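// Imports are an assumption based on the kube / kube-runtime releases
// current in Aug 2020; paths may differ in other versions.
use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::api::{apps::v1::Deployment, core::v1::ConfigMap};
use kube::{
    api::{Api, ListParams, Meta},
    Client,
};
use kube_runtime::{utils::try_flatten_applied, watcher};
use log::info;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    std::env::set_var("RUST_LOG", "info,node_watcher=debug,kube=debug");
    env_logger::init();
    let client = Client::try_default().await?;

    let deploys: Api<Deployment> = Api::all(client.clone());
    let cms: Api<ConfigMap> = Api::all(client.clone());

    // Wrap each stream's items in the shared enum so both streams have
    // the same item type, then box them so they have the same stream type.
    let deploy_stream = try_flatten_applied(watcher(deploys, ListParams::default()))
        .map_ok(Watched::Deploy)
        .boxed();
    let configmap_stream = try_flatten_applied(watcher(cms, ListParams::default()))
        .map_ok(Watched::Config)
        .boxed();

    // Merge both streams and handle whichever yields next.
    let mut combo_stream = stream::select_all(vec![deploy_stream, configmap_stream]);
    while let Some(o) = combo_stream.try_next().await? {
        match o {
            Watched::Config(cm) => info!("Got configmap: {}", Meta::name(&cm)),
            Watched::Deploy(d) => info!("Got deployment: {}", Meta::name(&d)),
        }
    }
    Ok(())
}

// Unified item type covering every watched resource.
enum Watched {
    Config(ConfigMap),
    Deploy(Deployment),
}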

@clux clux closed this as completed in 5aaad7d Aug 6, 2020
@PeterGrace
Author

Thanks @clux, this works quite well!
