Skip to content

Commit

Permalink
owner refs cached
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Sep 28, 2023
1 parent deb9af6 commit c4252f6
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ path = "lib/rust/lib.rs"
[dependencies]
anyhow = "1.0.75"
async-recursion = "1.0.5"
cached = { version = "0.45.1", features = ["async"] }
chrono = "0.4.26"
clap = { version = "4.3.21", features = ["derive"] }
futures = "0.3.28"
Expand Down
32 changes: 23 additions & 9 deletions lib/rust/watch/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::sync::{
};

use async_recursion::async_recursion;
use cached::{
Cached,
SizedCache,
};
use futures::stream::{
StreamExt,
TryStreamExt,
Expand All @@ -21,10 +25,6 @@ use kube::{
};
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tokio::time::{
sleep,
Duration,
};
use tracing::*;

use super::PodStream;
Expand All @@ -41,6 +41,8 @@ use crate::time::{
};
use crate::trace::Tracer;

type OwnerCache = SizedCache<String, Vec<metav1::OwnerReference>>;

pub struct PodWatcher {
clock: Box<dyn Clockable + Send>,
pod_stream: PodStream,
Expand Down Expand Up @@ -75,33 +77,45 @@ impl PodWatcher {
}

#[async_recursion(?Send)]
async fn compute_owner_chain(apiset: &mut ApiSet, obj: &impl Resource) -> anyhow::Result<Vec<metav1::OwnerReference>> {
info!("computing owner references for {}", namespaced_name(obj));
async fn compute_owner_chain(
apiset: &mut ApiSet,
obj: &impl Resource,
cache: &mut OwnerCache,
) -> anyhow::Result<Vec<metav1::OwnerReference>> {
let ns_name = namespaced_name(obj);

if let Some(owners) = cache.cache_get(&ns_name) {
info!("found owners for {} in cache", ns_name);
return Ok(owners.clone());
}

info!("computing owner references for {}", ns_name);
let mut owners = Vec::from(obj.owner_references());

for rf in obj.owner_references() {
let gvk = GVK::from_owner_ref(rf)?;
sleep(Duration::from_secs(10)).await;
let api = apiset.api_for(gvk).await?;
let resp = api.list(&list_params_for(&obj.namespace().unwrap(), &rf.name)).await?;
if resp.items.len() != 1 {
bail!("could not find single owner for {}, found {:?}", namespaced_name(obj), resp.items);
}

let owner = &resp.items[0];
owners.extend(compute_owner_chain(apiset, owner).await?);
owners.extend(compute_owner_chain(apiset, owner, cache).await?);
}

cache.cache_set(ns_name, owners.clone());
Ok(owners)
}

fn build_stream_for_pods(mut apiset: ApiSet) -> PodStream {
let pod_api: kube::Api<corev1::Pod> = kube::Api::all(apiset.client().clone());
let mut cache: SizedCache<String, Vec<metav1::OwnerReference>> = SizedCache::with_size(100);
watcher(pod_api, Default::default())
.modify(move |pod| {
block_in_place(|| {
Handle::current().block_on(async {
let owners = compute_owner_chain(&mut apiset, pod).await;
let owners = compute_owner_chain(&mut apiset, pod, &mut cache).await;
pod.metadata.owner_references = owners.ok();
})
});
Expand Down

0 comments on commit c4252f6

Please sign in to comment.