Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subscribing to reflector store updates #1426

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ thiserror = "1.0.29"
backoff = "0.4.0"
async-trait = "0.1.64"
hashbrown = "0.14.0"
async-stream = "0.3.5"

[dependencies.k8s-openapi]
version = "0.21.0"
Expand Down
14 changes: 12 additions & 2 deletions kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ pub mod store;

pub use self::object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef};
use crate::watcher;
use futures::{Stream, TryStreamExt};
use async_stream::stream;
use futures::{Stream, StreamExt};
use std::hash::Hash;
pub use store::{store, Store};

Expand Down Expand Up @@ -94,7 +95,16 @@ where
K::DynamicType: Eq + Hash + Clone,
W: Stream<Item = watcher::Result<watcher::Event<K>>>,
{
stream.inspect_ok(move |event| writer.apply_watcher_event(event))
// TODO: why does pin! not work? not sure...
let mut stream = Box::pin(stream);
stream! {
while let Some(event) = stream.next().await {
if let Ok(event) = &event {
writer.apply_watcher_event(event).await;
}
yield event;
}
}
}

#[cfg(test)]
Expand Down
126 changes: 88 additions & 38 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use super::{Lookup, ObjectRef};
use crate::{
utils::delayed_init::{self, DelayedInit},
utils::{
broadcast::Broadcaster,
delayed_init::{self, DelayedInit},
},
watcher,
};
use ahash::AHashMap;
use ahash::{AHashMap, HashSet};
use derivative::Derivative;
use futures::Stream;
use parking_lot::RwLock;
use std::{fmt::Debug, hash::Hash, sync::Arc};
use thiserror::Error;

type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;

/// A writable Store handle
///
Expand All @@ -20,10 +23,10 @@ pub struct Writer<K: 'static + Lookup>
where
K::DynamicType: Eq + Hash,
{
store: Cache<K>,
store: Arc<Inner<K>>,
dyntype: K::DynamicType,
ready_tx: Option<delayed_init::Initializer<()>>,
ready_rx: Arc<DelayedInit<()>>,
touched_objects_broadcaster: Broadcaster<Arc<ObjectRef<K>>>,
}

impl<K: 'static + Lookup + Clone> Writer<K>
Expand All @@ -37,10 +40,13 @@ where
pub fn new(dyntype: K::DynamicType) -> Self {
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
store: Arc::new(Inner {
cache: Default::default(),
ready_rx,
}),
dyntype,
ready_tx: Some(ready_tx),
ready_rx: Arc::new(ready_rx),
touched_objects_broadcaster: Broadcaster::new(1),
}
}

Expand All @@ -52,28 +58,54 @@ where
pub fn as_reader(&self) -> Store<K> {
Store {
store: self.store.clone(),
ready_rx: self.ready_rx.clone(),
}
}

/// Returns a [`Stream`] of objects that have been touched in any way (created/modified/deleted).
///
/// Note that delays in handling objects may affect all subscribers, as well as the backing reflector (or other source).
pub fn subscribe_touched_objects(&mut self) -> impl Stream<Item = Arc<ObjectRef<K>>> {
self.touched_objects_broadcaster.subscribe()
}

/// Applies a single watcher event to the store
pub fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
pub async fn apply_watcher_event(&mut self, event: &watcher::Event<K>) {
match event {
watcher::Event::Applied(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
let key = Arc::new(obj.to_object_ref(self.dyntype.clone()));
let obj = Arc::new(obj.clone());
self.store.write().insert(key, obj);
self.store.cache.write().insert(key.clone(), obj);
self.touched_objects_broadcaster.send(key).await;
}
watcher::Event::Deleted(obj) => {
let key = obj.to_object_ref(self.dyntype.clone());
self.store.write().remove(&key);
let removed_obj = self.store.cache.write().remove_entry(&key);
if let Some((key, _)) = removed_obj {
self.touched_objects_broadcaster.send(key).await;
}
}
watcher::Event::Restarted(new_objs) => {
let new_objs = new_objs
let mut new_objs = new_objs
.iter()
.map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone())))
.map(|obj| {
(
Arc::new(obj.to_object_ref(self.dyntype.clone())),
Arc::new(obj.clone()),
)
})
.collect::<AHashMap<_, _>>();
*self.store.write() = new_objs;
let mutated_obj_refs = {
let mut cache = self.store.cache.write();
std::mem::swap(&mut *cache, &mut new_objs);
let old_objs = new_objs;
old_objs
.into_keys()
.chain(cache.keys().cloned())
.collect::<HashSet<_>>()
};
for key in mutated_obj_refs {
self.touched_objects_broadcaster.send(key).await;
}
}
}

Expand Down Expand Up @@ -105,8 +137,7 @@ pub struct Store<K: 'static + Lookup>
where
K::DynamicType: Hash + Eq,
{
store: Cache<K>,
ready_rx: Arc<DelayedInit<()>>,
store: Arc<Inner<K>>,
}

#[derive(Debug, Error)]
Expand All @@ -125,7 +156,7 @@ where
/// # Errors
/// Returns an error if the [`Writer`] was dropped before any value was written.
pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
self.ready_rx.get().await.map_err(WriterDropped)
self.store.ready_rx.get().await.map_err(WriterDropped)
}

/// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache.
Expand All @@ -139,7 +170,7 @@ where
/// reasonable `error_policy`.
#[must_use]
pub fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
let store = self.store.read();
let store = self.store.cache.read();
store
.get(key)
// Try to erase the namespace and try again, in case the object is cluster-scoped
Expand All @@ -157,7 +188,7 @@ where
/// Return a full snapshot of the current values
#[must_use]
pub fn state(&self) -> Vec<Arc<K>> {
let s = self.store.read();
let s = self.store.cache.read();
s.values().cloned().collect()
}

Expand All @@ -168,6 +199,7 @@ where
P: Fn(&K) -> bool,
{
self.store
.cache
.read()
.iter()
.map(|(_, k)| k)
Expand All @@ -178,13 +210,13 @@ where
/// Return the number of elements in the store
#[must_use]
pub fn len(&self) -> usize {
self.store.read().len()
self.store.cache.read().len()
}

/// Return whether the store is empty
#[must_use]
pub fn is_empty(&self) -> bool {
self.store.read().is_empty()
self.store.cache.read().is_empty()
}
}

Expand All @@ -203,15 +235,25 @@ where
(r, w)
}

/// Shared data between all facets (the [`Writer`] and all [readers](`Store`) of a store)
///
/// This should never be exposed outside of this module.
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"))]
struct Inner<K: Lookup> {
cache: RwLock<AHashMap<Arc<ObjectRef<K>>, Arc<K>>>,
ready_rx: DelayedInit<()>,
}

#[cfg(test)]
mod tests {
use super::{store, Writer};
use crate::{reflector::ObjectRef, watcher};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::api::ObjectMeta;

#[test]
fn should_allow_getting_namespaced_object_by_namespaced_ref() {
#[tokio::test]
async fn should_allow_getting_namespaced_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
Expand All @@ -221,13 +263,15 @@ mod tests {
..ConfigMap::default()
};
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
store_w
.apply_watcher_event(&watcher::Event::Applied(cm.clone()))
.await;
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}

#[test]
fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
#[tokio::test]
async fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
Expand All @@ -239,13 +283,13 @@ mod tests {
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm));
store_w.apply_watcher_event(&watcher::Event::Applied(cm)).await;
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}

#[test]
fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
#[tokio::test]
async fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
Expand All @@ -255,12 +299,14 @@ mod tests {
..ConfigMap::default()
};
let (store, mut writer) = store();
writer.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
writer
.apply_watcher_event(&watcher::Event::Applied(cm.clone()))
.await;
assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
}

#[test]
fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
#[tokio::test]
async fn should_allow_getting_clusterscoped_object_by_namespaced_ref() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
Expand All @@ -273,13 +319,15 @@ mod tests {
let mut nsed_cm = cm.clone();
nsed_cm.metadata.namespace = Some("ns".to_string());
let mut store_w = Writer::default();
store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
store_w
.apply_watcher_event(&watcher::Event::Applied(cm.clone()))
.await;
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm));
}

#[test]
fn find_element_in_store() {
#[tokio::test]
async fn find_element_in_store() {
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("obj".to_string()),
Expand All @@ -292,14 +340,16 @@ mod tests {

let (reader, mut writer) = store::<ConfigMap>();
assert!(reader.is_empty());
writer.apply_watcher_event(&watcher::Event::Applied(cm));
writer.apply_watcher_event(&watcher::Event::Applied(cm)).await;

assert_eq!(reader.len(), 1);
assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none());

target_cm.metadata.name = Some("obj1".to_string());
target_cm.metadata.generation = Some(1234);
writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone()));
writer
.apply_watcher_event(&watcher::Event::Applied(target_cm.clone()))
.await;
assert!(!reader.is_empty());
assert_eq!(reader.len(), 2);
let found = reader.find(|k| k.metadata.generation == Some(1234));
Expand Down
Loading
Loading