Skip to content

Commit

Permalink
revert reflector using informers
Browse files Browse the repository at this point in the history
this will work in kubernetes >= 1.16 - see #219
for now, don't break the it.
  • Loading branch information
clux committed Apr 7, 2020
1 parent 8cccbda commit 4d42621
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 32 deletions.
3 changes: 3 additions & 0 deletions kube/src/runtime/informer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ use std::{sync::Arc, time::Duration};
/// but we have configured timeouts such that this should not happen frequently.
///
/// On boot, the initial watch causes added events for every currently live object.
///
/// Because of https://github.com/clux/kube-rs/issues/219 we recommend you use this
/// with kubernetes >= 1.16 and watch bookmarks enabled.
#[derive(Clone)]
pub struct Informer<K>
where
Expand Down
132 changes: 100 additions & 32 deletions kube/src/runtime/reflector.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
api::{Api, ListParams, Meta, Resource, WatchEvent},
runtime::Informer,
api::{Api, ListParams, Meta, WatchEvent},
Error, Result,
};
use futures::{lock::Mutex, StreamExt, TryStreamExt};
use futures::{future::FutureExt, lock::Mutex, pin_mut, select, TryStreamExt};
use serde::de::DeserializeOwned;
use tokio::{signal, time::delay_for};

use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

/// A reflection of state for a Kubernetes ['Api'] resource
///
Expand All @@ -26,33 +26,32 @@ where
K: Clone + DeserializeOwned + Send + Meta,
{
state: Arc<Mutex<State<K>>>,
informer: Informer<K>,
resource: Resource,
params: ListParams,
api: Api<K>,
}

impl<K> Reflector<K>
where
K: Clone + DeserializeOwned + Meta + Send + Sync,
K: Clone + DeserializeOwned + Meta + Send,
{
/// Create a reflector on an api resource
pub fn new(api: Api<K>) -> Self {
Reflector {
resource: api.resource.clone(),
informer: Informer::new(api),
api,
params: ListParams::default(),
state: Default::default(),
}
}

/// Modify the default watch parameters for the underlying watch
pub fn params(mut self, lp: ListParams) -> Self {
self.informer = self.informer.params(lp);
self.params = lp;
self
}

/// Start the reflectors self-driving polling
pub async fn run(self) -> Result<()> {
use futures::{future::FutureExt, pin_mut, select};
use tokio::signal;
self.reset().await?;
loop {
let signal_fut = signal::ctrl_c().fuse(); // TODO: SIGTERM
let stream_fut = self.poll().fuse();
Expand All @@ -64,8 +63,11 @@ where
},
stream = stream_fut => {
if let Err(e) = stream {
error!("Kube state failed to recover: {}", e);
return Err(e);
warn!("Poll error on {}: {}: {:?}", self.api.resource.kind, e, e);
// If desynched due to mismatching resourceVersion, retry in a bit
let dur = Duration::from_secs(10);
delay_for(dur).await;
self.reset().await?; // propagate error if this failed..
}
},
complete => continue, // another poll
Expand All @@ -76,25 +78,42 @@ where

/// A single poll call to modify the internal state
async fn poll(&self) -> Result<()> {
let kind = &self.resource.kind;
trace!("Polling {}", kind);
let mut stream = self.informer.poll().await?.boxed();
let kind = &self.api.resource.kind;
let resource_version = self.state.lock().await.version.clone();
trace!("Polling {} from resourceVersion={}", kind, resource_version);
let stream = self.api.watch(&self.params, &resource_version).await?;

pin_mut!(stream);
// For every event, modify our state
while let Some(ev) = stream.try_next().await? {
let mut state = self.state.lock().await;
match &ev {
WatchEvent::Added(o)
| WatchEvent::Modified(o)
| WatchEvent::Deleted(o)
| WatchEvent::Bookmark(o) => {
// always store the last seen resourceVersion
if let Some(nv) = Meta::resource_ver(o) {
trace!("Updating reflector version for {} to {}", kind, nv);
state.version = nv.clone();
}
}
_ => {}
}

let data = &mut state.data;
match ev {
WatchEvent::Added(o) => {
debug!("Adding {} to {}", Meta::name(&o), kind);
state.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone());
data.entry(ObjectId::key_for(&o)).or_insert_with(|| o.clone());
}
WatchEvent::Modified(o) => {
debug!("Modifying {} in {}", Meta::name(&o), kind);
state.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone());
data.entry(ObjectId::key_for(&o)).and_modify(|e| *e = o.clone());
}
WatchEvent::Deleted(o) => {
debug!("Removing {} from {}", Meta::name(&o), kind);
state.remove(&ObjectId::key_for(&o));
data.remove(&ObjectId::key_for(&o));
}
WatchEvent::Bookmark(o) => {
debug!("Bookmarking {} from {}", Meta::name(&o), kind);
Expand All @@ -108,12 +127,52 @@ where
Ok(())
}

/// Reset the state of the underlying informer and clear the cache
pub async fn reset(&self) -> Result<()> {
trace!("Resetting {}", self.api.resource.kind);
// Simplified for k8s >= 1.16
//*self.state.lock().await = Default::default();
//self.informer.reset().await

// For now:
let (data, version) = self.get_full_resource_entries().await?;
*self.state.lock().await = State { data, version };
Ok(())
}

/// Legacy helper for kubernetes < 1.16
///
/// Needed to do an initial list operation because of https://github.com/clux/kube-rs/issues/219
/// Soon, this goes away as we drop support for k8s < 1.16
async fn get_full_resource_entries(&self) -> Result<(Cache<K>, String)> {
let res = self.api.list(&self.params).await?;
let version = res.metadata.resource_version.unwrap_or_default();
trace!(
"Got {} {} at resourceVersion={:?}",
res.items.len(),
self.api.resource.kind,
version
);
let mut data = BTreeMap::new();
for i in res.items {
// The non-generic parts we care about are spec + status
data.insert(ObjectId::key_for(&i), i);
}
let keys = data
.keys()
.map(ObjectId::to_string)
.collect::<Vec<_>>()
.join(", ");
debug!("Initialized with: [{}]", keys);
Ok((data, version))
}

/// Read data for users of the reflector
///
/// This is instant if you are reading and writing from the same context.
pub async fn state(&self) -> Result<Vec<K>> {
let state = self.state.lock().await;
Ok(state.values().cloned().collect::<Vec<K>>())
Ok(state.data.values().cloned().collect::<Vec<K>>())
}

/// Read a single entry by name
Expand All @@ -124,10 +183,10 @@ where
pub async fn get(&self, name: &str) -> Result<Option<K>> {
let id = ObjectId {
name: name.into(),
namespace: self.resource.namespace.clone(),
namespace: self.api.resource.namespace.clone(),
};

Ok(self.state.lock().await.get(&id).map(Clone::clone))
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
}

/// Read a single entry by name within a specific namespace
Expand All @@ -139,14 +198,7 @@ where
name: name.into(),
namespace: Some(ns.into()),
};
Ok(self.state.lock().await.get(&id).map(Clone::clone))
}

/// Reset the state of the underlying informer and clear the cache
pub async fn reset(&self) {
trace!("Resetting {}", self.resource.kind);
*self.state.lock().await = Default::default();
self.informer.reset().await
Ok(self.state.lock().await.data.get(&id).map(Clone::clone))
}
}

Expand Down Expand Up @@ -177,5 +229,21 @@ impl ObjectId {
}
}

/// Internal shared state of Reflector
///
/// Can remove this in k8s >= 1.16 once this uses Informer
struct State<K> {
data: Cache<K>,
version: String,
}

impl<K> Default for State<K> {
fn default() -> Self {
State {
data: Default::default(),
version: 0.to_string(),
}
}
}
/// Internal representation for Reflector
type State<K> = BTreeMap<ObjectId, K>;
type Cache<K> = BTreeMap<ObjectId, K>;

0 comments on commit 4d42621

Please sign in to comment.