diff --git a/Cargo.lock b/Cargo.lock index 0baa7f0f..a4ab27cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -933,6 +933,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -1993,6 +2002,7 @@ dependencies = [ "chrono", "clap", "futures", + "itertools", "k8s-openapi", "kube", "paste", diff --git a/Cargo.toml b/Cargo.toml index a33a311d..3059d1be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ cached = { version = "0.45.1", features = ["async"] } chrono = "0.4.26" clap = { version = "4.3.21", features = ["derive"] } futures = "0.3.28" +itertools = "0.11.0" k8s-openapi = { version = "0.19.0", features = ["v1_27"] } paste = "1.0.14" reqwest = { version = "0.11.18", default-features = false, features = ["json", "rustls-tls"] } diff --git a/ctrl/controller.rs b/ctrl/controller.rs index 99066440..0553c35b 100644 --- a/ctrl/controller.rs +++ b/ctrl/controller.rs @@ -7,10 +7,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::runtime::controller::Action; use kube::ResourceExt; use reqwest::Url; -use simkube::k8s::{ - add_common_fields, - namespaced_name, -}; +use simkube::k8s::add_common_fields; use simkube::prelude::*; use simkube::trace::storage; use tokio::time::Duration; @@ -117,6 +114,6 @@ pub(crate) async fn reconcile( } pub(crate) fn error_policy(simulation: Arc, error: &ReconcileError, _: Arc) -> Action { - warn!("reconcile failed on simulation {}: {:?}", namespaced_name(simulation.deref()), error); + warn!("reconcile failed on simulation {}: {:?}", simulation.namespaced_name(), error); Action::requeue(Duration::from_secs(5 * 60)) } diff --git a/driver/main.rs b/driver/main.rs index f78ddcae..bf0e480b 100644 --- a/driver/main.rs +++ b/driver/main.rs @@ -1,5 +1,4 @@ use std::cmp::max; -use std::collections::BTreeMap; use std::fs; use std::time::Duration; @@ -15,6 +14,7 @@ use kube::api::{ use kube::ResourceExt; use serde_json::json; use simkube::jsonutils; +use simkube::k8s::macros::*; use simkube::k8s::{ add_common_fields, prefixed_ns, @@ -48,7 +48,7 @@ fn build_virtual_ns(sim_name: &str, ns_name: &str, sim_root: &SimulationRoot) -> let mut ns = corev1::Namespace { metadata: metav1::ObjectMeta { name: Some(ns_name.into()), - labels: Some(BTreeMap::from([(VIRTUAL_LABEL_KEY.into(), "true".into())])), + labels: klabel!(VIRTUAL_LABEL_KEY = "true"), ..Default::default() }, ..Default::default() diff --git a/lib/rust/errors.rs b/lib/rust/errors.rs index f8e5245b..e9b32d5a 100644 --- a/lib/rust/errors.rs +++ b/lib/rust/errors.rs @@ -8,8 +8,8 @@ pub(crate) use thiserror::Error; pub type EmptyResult = anyhow::Result<()>; -macro_rules! err_impl_helper { - ($errtype:ident, $item:ident, String) => { +macro_rules! err_impl { + (@hidden $errtype:ident, $item:ident, String) => { paste! { pub(crate) fn [<$item:snake>](in_: &str) -> anyhow::Error { anyhow!{$errtype::$item(in_.into())} @@ -17,31 +17,26 @@ macro_rules! err_impl_helper { } }; - ($errtype:ident, $item:ident, $($dtype:tt)::+) => { + (@hidden $errtype:ident, $item:ident, $($dtype:tt)::+) => { paste! { pub(crate) fn [<$item:snake>](in_: &$($dtype)::+) -> anyhow::Error { anyhow!{$errtype::$item(in_.clone())} } } }; -} -macro_rules! err_impl { ($errtype:ident, $(#[$errinfo:meta] $item:ident($($dtype:tt)::+),)+ - ) => ( + ) => { #[derive(Debug, Error)] pub(crate) enum $errtype { $(#[$errinfo] $item($($dtype)::+)),+ } impl $errtype { - $(err_impl_helper! {$errtype, $item, $($dtype)::+})+ + $(err_impl! {@hidden $errtype, $item, $($dtype)::+})+ } - ) + }; } -pub(crate) use { - err_impl, - err_impl_helper, -}; +pub(crate) use err_impl; diff --git a/lib/rust/k8s/container.rs b/lib/rust/k8s/container.rs new file mode 100644 index 00000000..d69673fe --- /dev/null +++ b/lib/rust/k8s/container.rs @@ -0,0 +1,27 @@ +use super::*; + +impl StartEndTimeable for corev1::ContainerState { + fn start_ts(&self) -> anyhow::Result> { + match self { + corev1::ContainerState { running: Some(r), terminated: None, waiting: None } => { + Ok(Some(r.started_at.as_ref().unwrap().0.timestamp())) + }, + corev1::ContainerState { running: None, terminated: Some(t), waiting: None } => { + Ok(Some(t.started_at.as_ref().unwrap().0.timestamp())) + }, + corev1::ContainerState { running: None, terminated: None, waiting: Some(_) } => Ok(None), + _ => Err(KubernetesError::malformed_container_state(self)), + } + } + + fn end_ts(&self) -> anyhow::Result> { + match self { + corev1::ContainerState { running: Some(_), terminated: None, waiting: None } => Ok(None), + corev1::ContainerState { running: None, terminated: Some(t), waiting: None } => { + Ok(Some(t.finished_at.as_ref().unwrap().0.timestamp())) + }, + corev1::ContainerState { running: None, terminated: None, waiting: Some(_) } => Ok(None), + _ => Err(KubernetesError::malformed_container_state(self)), + } + } +} diff --git a/lib/rust/k8s/macros.rs b/lib/rust/k8s/macros.rs new file mode 100644 index 00000000..4e771d6b --- /dev/null +++ b/lib/rust/k8s/macros.rs @@ -0,0 +1,10 @@ +#[macro_export] +macro_rules! klabel { + ($($key:tt=$val:literal),+$(,)?) => { + Some(BTreeMap::from([$(($key.to_string(), $val.to_string())),+])) + }; +} + +pub use std::collections::BTreeMap; + +pub use klabel; diff --git a/lib/rust/k8s/mod.rs b/lib/rust/k8s/mod.rs index 9e506260..28e9f979 100644 --- a/lib/rust/k8s/mod.rs +++ b/lib/rust/k8s/mod.rs @@ -1,10 +1,58 @@ mod apiset; +mod container; mod gvk; +pub mod macros; +mod pod; +mod pod_lifecycle; mod util; pub use apiset::*; pub use gvk::*; +use k8s_openapi::api::core::v1 as corev1; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; pub use util::*; +use crate::errors::*; +use crate::macros::partial_ord_eq_ref; + +const LAST_APPLIED_CONFIG_LABEL_KEY: &str = "kubectl.kubernetes.io/last-applied-configuration"; +const DEPL_REVISION_LABEL_KEY: &str = "deployment.kubernetes.io/revision"; + +err_impl! {KubernetesError, + #[error("field not found in struct: {0}")] + FieldNotFound(String), + + #[error("malformed container status: {0:?}")] + MalformedContainerState(corev1::ContainerState), + + #[error("malformed label selector: {0:?}")] + MalformedLabelSelector(metav1::LabelSelectorRequirement), +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum PodLifecycleData { + Empty, + Running(i64), + Finished(i64, i64), +} +partial_ord_eq_ref!(PodLifecycleData); + +pub trait KubeResourceExt { + fn namespaced_name(&self) -> String; + fn matches(&self, sel: &metav1::LabelSelector) -> anyhow::Result; +} + +pub trait PodExt { + fn spec(&self) -> anyhow::Result<&corev1::PodSpec>; + fn status(&self) -> anyhow::Result<&corev1::PodStatus>; + fn spec_mut(&mut self) -> &mut corev1::PodSpec; + fn status_mut(&mut self) -> &mut corev1::PodStatus; +} + +trait StartEndTimeable { + fn start_ts(&self) -> anyhow::Result>; + fn end_ts(&self) -> anyhow::Result>; +} + #[cfg(test)] -mod util_test; +mod test; diff --git a/lib/rust/k8s/pod.rs b/lib/rust/k8s/pod.rs new file mode 100644 index 00000000..c5315647 --- /dev/null +++ b/lib/rust/k8s/pod.rs @@ -0,0 +1,31 @@ +use super::*; + +impl PodExt for corev1::Pod { + fn spec(&self) -> anyhow::Result<&corev1::PodSpec> { + match self.spec.as_ref() { + None => bail!(KubernetesError::field_not_found("pod spec")), + Some(ps) => Ok(ps), + } + } + + fn status(&self) -> anyhow::Result<&corev1::PodStatus> { + match self.status.as_ref() { + None => bail!(KubernetesError::field_not_found("pod status")), + Some(ps) => Ok(ps), + } + } + + fn spec_mut(&mut self) -> &mut corev1::PodSpec { + if self.spec.is_none() { + self.spec = Some(Default::default()); + } + self.spec.as_mut().unwrap() + } + + fn status_mut(&mut self) -> &mut corev1::PodStatus { + if self.status.is_none() { + self.status = Some(Default::default()); + } + self.status.as_mut().unwrap() + } +} diff --git a/lib/rust/k8s/pod_lifecycle.rs b/lib/rust/k8s/pod_lifecycle.rs new file mode 100644 index 00000000..c44a69f7 --- /dev/null +++ b/lib/rust/k8s/pod_lifecycle.rs @@ -0,0 +1,211 @@ +use std::cmp::{ + max, + Ordering, +}; + +use chrono::Utc; +use kube::ResourceExt; + +use super::*; +use crate::util::min_some; + +impl PodLifecycleData { + fn new(start_ts: Option, end_ts: Option) -> PodLifecycleData { + match (start_ts, end_ts) { + (None, _) => PodLifecycleData::Empty, + (Some(ts), None) => PodLifecycleData::Running(ts), + (Some(start), Some(end)) => PodLifecycleData::Finished(start, end), + } + } + + pub fn new_for(pod: &corev1::Pod) -> anyhow::Result { + let (mut earliest_start_ts, mut latest_end_ts) = (None, None); + let mut terminated_container_count = 0; + + let pod_status = pod.status()?; + if let Some(cstats) = pod_status.init_container_statuses.as_ref() { + for state in cstats.iter().filter_map(|s| s.state.as_ref()) { + earliest_start_ts = min_some(state.start_ts()?, earliest_start_ts); + latest_end_ts = max(latest_end_ts, state.end_ts()?); + } + } + + if let Some(cstats) = pod_status.container_statuses.as_ref() { + for state in cstats.iter().filter_map(|s| s.state.as_ref()) { + earliest_start_ts = min_some(state.start_ts()?, earliest_start_ts); + let end_ts = state.end_ts()?; + if end_ts.is_some() { + terminated_container_count += 1; + } + latest_end_ts = max(latest_end_ts, end_ts); + } + } + + if terminated_container_count != pod.spec()?.containers.len() { + latest_end_ts = None; + } + Ok(PodLifecycleData::new(earliest_start_ts, latest_end_ts)) + } + + pub fn guess_finished( + maybe_pod: Option<&corev1::Pod>, + current_lifecycle_data: &PodLifecycleData, + ) -> PodLifecycleData { + let (new_lifecycle_data, pod_creation_ts) = match maybe_pod { + None => (PodLifecycleData::Empty, None), + Some(pod) => (PodLifecycleData::new_for(pod).unwrap_or(PodLifecycleData::Empty), pod.creation_timestamp()), + }; + + let end_ts = Some(Utc::now().timestamp()); + let start_ts = if let PodLifecycleData::Running(ts) = current_lifecycle_data { + Some(*ts) + } else if let PodLifecycleData::Running(ts) = new_lifecycle_data { + Some(ts) + } else { + pod_creation_ts.map(|t| t.0.timestamp()) + }; + + PodLifecycleData::new(start_ts, end_ts) + } + + pub fn empty(&self) -> bool { + self == PodLifecycleData::Empty + } + + pub fn running(&self) -> bool { + matches!(self, PodLifecycleData::Running(_)) + } + + pub fn finished(&self) -> bool { + matches!(self, PodLifecycleData::Finished(..)) + } +} + +impl PartialOrd for PodLifecycleData { + fn partial_cmp(&self, other: &PodLifecycleData) -> Option { + match self { + PodLifecycleData::Empty => { + if !other.empty() { + Some(Ordering::Less) + } else { + Some(Ordering::Equal) + } + }, + PodLifecycleData::Running(ts) => match other { + PodLifecycleData::Empty => Some(Ordering::Greater), + PodLifecycleData::Running(other_ts) => { + if ts == other_ts { + Some(Ordering::Equal) + } else { + None + } + }, + PodLifecycleData::Finished(..) => Some(Ordering::Less), + }, + PodLifecycleData::Finished(sts, ets) => match other { + PodLifecycleData::Empty => Some(Ordering::Greater), + PodLifecycleData::Running(other_ts) => { + if sts == other_ts { + Some(Ordering::Greater) + } else { + None + } + }, + PodLifecycleData::Finished(other_sts, other_ets) => { + if sts == other_sts && ets == other_ets { + Some(Ordering::Equal) + } else { + None + } + }, + }, + } + } +} + + +impl PartialEq> for PodLifecycleData { + fn eq(&self, other: &Option) -> bool { + match self { + PodLifecycleData::Empty => other.is_none() || other.as_ref().is_some_and(|plt| plt.empty()), + _ => other.as_ref().is_some_and(|plt| plt == self), + } + } +} + +impl PartialOrd> for PodLifecycleData { + fn partial_cmp(&self, other: &Option) -> Option { + match self { + PodLifecycleData::Empty => other.as_ref().map_or(Some(Ordering::Equal), |o| self.partial_cmp(o)), + _ => other.as_ref().map_or(Some(Ordering::Greater), |o| self.partial_cmp(o)), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_partial_eq() { + assert_eq!(PodLifecycleData::Empty, None); + assert_eq!(PodLifecycleData::Empty, Some(PodLifecycleData::Empty)); + assert_eq!(PodLifecycleData::Running(1), Some(PodLifecycleData::Running(1))); + assert_eq!(PodLifecycleData::Finished(1, 2), Some(PodLifecycleData::Finished(1, 2))); + + assert_ne!(PodLifecycleData::Empty, Some(PodLifecycleData::Running(1))); + assert_ne!(PodLifecycleData::Empty, Some(PodLifecycleData::Finished(1, 2))); + assert_ne!(PodLifecycleData::Running(1), None); + assert_ne!(PodLifecycleData::Running(1), Some(PodLifecycleData::Empty)); + assert_ne!(PodLifecycleData::Running(1), Some(PodLifecycleData::Running(2))); + assert_ne!(PodLifecycleData::Running(1), Some(PodLifecycleData::Finished(1, 2))); + assert_ne!(PodLifecycleData::Finished(1, 2), None); + assert_ne!(PodLifecycleData::Finished(1, 2), Some(PodLifecycleData::Empty)); + assert_ne!(PodLifecycleData::Finished(1, 2), Some(PodLifecycleData::Running(2))); + assert_ne!(PodLifecycleData::Finished(1, 2), Some(PodLifecycleData::Finished(1, 3))); + } + + #[test] + fn test_partial_ord() { + for cmp in [ + PodLifecycleData::Empty.partial_cmp(&None), + PodLifecycleData::Empty.partial_cmp(&Some(PodLifecycleData::Empty)), + PodLifecycleData::Running(1).partial_cmp(&Some(PodLifecycleData::Running(1))), + PodLifecycleData::Finished(1, 2).partial_cmp(&Some(PodLifecycleData::Finished(1, 2))), + ] { + assert_eq!(cmp, Some(Ordering::Equal)); + } + + for cmp in [ + PodLifecycleData::Empty.partial_cmp(&Some(PodLifecycleData::Running(1))), + PodLifecycleData::Empty.partial_cmp(&Some(PodLifecycleData::Finished(1, 2))), + PodLifecycleData::Running(1).partial_cmp(&None), + PodLifecycleData::Running(1).partial_cmp(&Some(PodLifecycleData::Empty)), + PodLifecycleData::Running(1).partial_cmp(&Some(PodLifecycleData::Running(2))), + PodLifecycleData::Running(1).partial_cmp(&Some(PodLifecycleData::Finished(1, 2))), + PodLifecycleData::Finished(1, 2).partial_cmp(&None), + PodLifecycleData::Finished(1, 2).partial_cmp(&Some(PodLifecycleData::Empty)), + PodLifecycleData::Finished(1, 2).partial_cmp(&Some(PodLifecycleData::Running(2))), + PodLifecycleData::Finished(1, 2).partial_cmp(&Some(PodLifecycleData::Finished(1, 3))), + ] { + assert_ne!(cmp, Some(Ordering::Equal)); + } + + assert!(PodLifecycleData::Empty < Some(PodLifecycleData::Running(1))); + assert!(PodLifecycleData::Empty < Some(PodLifecycleData::Finished(1, 2))); + assert!(PodLifecycleData::Running(1) < Some(PodLifecycleData::Finished(1, 2))); + + assert!(PodLifecycleData::Running(1) > None); + assert!(PodLifecycleData::Running(1) > Some(PodLifecycleData::Empty)); + assert!(PodLifecycleData::Finished(1, 2) > None); + assert!(PodLifecycleData::Finished(1, 2) > Some(PodLifecycleData::Empty)); + assert!(PodLifecycleData::Finished(1, 2) > Some(PodLifecycleData::Running(1))); + + assert!(!(PodLifecycleData::Finished(1, 2) > Some(PodLifecycleData::Running(0)))); + assert!(!(PodLifecycleData::Finished(1, 2) < Some(PodLifecycleData::Running(0)))); + assert!(!(PodLifecycleData::Finished(1, 2) > Some(PodLifecycleData::Finished(1, 3)))); + assert!(!(PodLifecycleData::Finished(1, 2) < Some(PodLifecycleData::Finished(1, 3)))); + assert!(!(PodLifecycleData::Running(1) < Some(PodLifecycleData::Running(2)))); + assert!(!(PodLifecycleData::Running(1) > Some(PodLifecycleData::Running(2)))); + } +} diff --git a/lib/rust/k8s/test/mod.rs b/lib/rust/k8s/test/mod.rs new file mode 100644 index 00000000..ae532418 --- /dev/null +++ b/lib/rust/k8s/test/mod.rs @@ -0,0 +1,19 @@ +mod pod_lifecycle_test; +mod util_test; + +use rstest::*; + +use super::macros::*; +use super::*; + +#[fixture] +fn pod() -> corev1::Pod { + corev1::Pod { + metadata: metav1::ObjectMeta { + labels: klabel!("foo" = "bar"), + ..Default::default() + }, + spec: Some(corev1::PodSpec { ..Default::default() }), + status: Some(corev1::PodStatus { ..Default::default() }), + } +} diff --git a/lib/rust/k8s/test/pod_lifecycle_test.rs b/lib/rust/k8s/test/pod_lifecycle_test.rs new file mode 100644 index 00000000..4ae3ff9a --- /dev/null +++ b/lib/rust/k8s/test/pod_lifecycle_test.rs @@ -0,0 +1,99 @@ +use chrono::{ + DateTime, + Duration, + Utc, +}; + +use super::{ + pod, + *, +}; + +#[rstest] +fn test_pod_lifecycle_data_for_empty(pod: corev1::Pod) { + let res = PodLifecycleData::new_for(&pod).unwrap(); + assert_eq!(res, PodLifecycleData::Empty); +} + +#[rstest] +#[case::with_init_container(true)] +#[case::without_init_container(false)] +fn test_pod_lifecycle_data_for_start_time_only(mut pod: corev1::Pod, #[case] init_container: bool) { + let t1 = Utc::now(); + + add_container_status_running(&mut pod.status_mut().container_statuses, &(t1 + Duration::seconds(10))); + if init_container { + add_container_status_running(&mut pod.status_mut().init_container_statuses, &t1); + add_container_status_running(&mut pod.status_mut().container_statuses, &(t1 + Duration::seconds(5))); + } else { + add_container_status_running(&mut pod.status_mut().init_container_statuses, &t1); + } + + let res = PodLifecycleData::new_for(&pod).unwrap(); + assert_eq!(res, PodLifecycleData::Running(t1.timestamp())); +} + +#[rstest] +fn test_pod_lifecycle_data_for_with_some_end_times(mut pod: corev1::Pod) { + let t1 = Utc::now(); + let tmid = t1 + Duration::seconds(5); + let t2 = t1 + Duration::seconds(10); + + add_container_status_finished(&mut pod.status_mut().init_container_statuses, &t1, &tmid); + add_container_status_running(&mut pod.status_mut().container_statuses, &tmid); + add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &t2); + + let res = PodLifecycleData::new_for(&pod).unwrap(); + assert_eq!(res, PodLifecycleData::Running(t1.timestamp())); +} + +#[rstest] +fn test_pod_lifecycle_data_for_with_end_times(mut pod: corev1::Pod) { + let t1 = Utc::now(); + let tmid = t1 + Duration::seconds(5); + let t2 = t1 + Duration::seconds(10); + + pod.spec_mut().containers.extend(vec![Default::default(), Default::default()]); + add_container_status_finished(&mut pod.status_mut().init_container_statuses, &t1, &tmid); + add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &(tmid + Duration::seconds(1))); + add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &t2); + + let res = PodLifecycleData::new_for(&pod).unwrap(); + assert_eq!(res, PodLifecycleData::Finished(t1.timestamp(), t2.timestamp())); +} + +fn add_container_status_running(container_statuses: &mut Option>, t: &DateTime) { + if container_statuses.is_none() { + *container_statuses = Some(vec![]) + } + + container_statuses.as_mut().unwrap().push(corev1::ContainerStatus { + state: Some(corev1::ContainerState { + running: Some(corev1::ContainerStateRunning { started_at: Some(metav1::Time(t.clone())) }), + ..Default::default() + }), + ..Default::default() + }) +} + +fn add_container_status_finished( + container_statuses: &mut Option>, + t1: &DateTime, + t2: &DateTime, +) { + if container_statuses.is_none() { + *container_statuses = Some(vec![]) + } + + container_statuses.as_mut().unwrap().push(corev1::ContainerStatus { + state: Some(corev1::ContainerState { + terminated: Some(corev1::ContainerStateTerminated { + started_at: Some(metav1::Time(t1.clone())), + finished_at: Some(metav1::Time(t2.clone())), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }) +} diff --git a/lib/rust/k8s/util_test.rs b/lib/rust/k8s/test/util_test.rs similarity index 77% rename from lib/rust/k8s/util_test.rs rename to lib/rust/k8s/test/util_test.rs index 7c1ca171..43268894 100644 --- a/lib/rust/k8s/util_test.rs +++ b/lib/rust/k8s/test/util_test.rs @@ -1,13 +1,11 @@ -use std::collections::BTreeMap; - use chrono::Utc; -use k8s_openapi::api::core::v1 as corev1; -use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::DynamicObject; -use rstest::*; use serde_json::json; -use super::util::*; +use super::{ + pod, + *, +}; #[rstest] fn test_sanitize_obj() { @@ -16,11 +14,11 @@ fn test_sanitize_obj() { name: Some("test".into()), namespace: Some("test".into()), - annotations: Some(BTreeMap::from([ - ("some_random_annotation".into(), "blah".into()), - (LAST_APPLIED_CONFIG_LABEL_KEY.into(), "foo".into()), - (DEPL_REVISION_LABEL_KEY.into(), "42.5".into()), - ])), + annotations: klabel!( + "some_random_annotation" = "blah", + LAST_APPLIED_CONFIG_LABEL_KEY = "foo", + DEPL_REVISION_LABEL_KEY = "42.5", + ), creation_timestamp: Some(metav1::Time(Utc::now())), deletion_timestamp: Some(metav1::Time(Utc::now())), @@ -67,7 +65,7 @@ fn test_sanitize_obj() { assert_eq!(obj.metadata.resource_version, None); assert_eq!(obj.metadata.uid, None); - assert_eq!(obj.metadata.annotations, Some(BTreeMap::from([("some_random_annotation".into(), "blah".into())]))); + assert_eq!(obj.metadata.annotations, klabel!("some_random_annotation" = "blah")); assert!(obj .types .is_some_and(|tm| tm.api_version == "bar.blah.sh/v2" && tm.kind == "Stuff")); @@ -90,15 +88,6 @@ fn test_sanitize_obj() { ); } -#[fixture] -fn pod() -> corev1::Pod { - let labels = Some(BTreeMap::from([("foo".into(), "bar".to_string())])); - corev1::Pod { - metadata: metav1::ObjectMeta { labels, ..Default::default() }, - ..Default::default() - } -} - fn make_label_sel(key: &str, op: &str, value: Option<&str>) -> metav1::LabelSelector { metav1::LabelSelector { match_expressions: Some(vec![metav1::LabelSelectorRequirement { @@ -118,7 +107,7 @@ fn make_label_sel(key: &str, op: &str, value: Option<&str>) -> metav1::LabelSele #[case::op_not_in(OPERATOR_NOT_IN)] fn test_label_expr_match(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("foo", op, Some("bar")); - let res = obj_matches_selector(&pod, &sel).unwrap(); + let res = pod.matches(&sel).unwrap(); assert_eq!(res, op == OPERATOR_IN); } @@ -127,7 +116,7 @@ fn test_label_expr_match(pod: corev1::Pod, #[case] op: &str) { #[case::op_not_in(OPERATOR_NOT_IN)] fn test_label_expr_no_values(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("foo", op, None); - let res = obj_matches_selector(&pod, &sel).unwrap_err().downcast().unwrap(); + let res = pod.matches(&sel).unwrap_err().downcast().unwrap(); assert!(matches!(res, KubernetesError::MalformedLabelSelector(_))); } @@ -136,7 +125,7 @@ fn test_label_expr_no_values(pod: corev1::Pod, #[case] op: &str) { #[case::op_not_in(OPERATOR_NOT_IN)] fn test_label_expr_no_match(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("baz", op, Some("qux")); - let res = obj_matches_selector(&pod, &sel).unwrap(); + let res = pod.matches(&sel).unwrap(); assert_eq!(res, op == OPERATOR_NOT_IN); } @@ -145,7 +134,7 @@ fn test_label_expr_no_match(pod: corev1::Pod, #[case] op: &str) { #[case::op_exists(OPERATOR_DOES_NOT_EXIST)] fn test_label_expr_exists(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("foo", op, None); - let res = obj_matches_selector(&pod, &sel).unwrap(); + let res = pod.matches(&sel).unwrap(); assert_eq!(res, op == OPERATOR_EXISTS); } @@ -154,7 +143,7 @@ fn test_label_expr_exists(pod: corev1::Pod, #[case] op: &str) { #[case::op_not_exists(OPERATOR_DOES_NOT_EXIST)] fn test_label_expr_exists_values(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("foo", op, Some("bar")); - let res = obj_matches_selector(&pod, &sel).unwrap_err().downcast().unwrap(); + let res = pod.matches(&sel).unwrap_err().downcast().unwrap(); assert!(matches!(res, KubernetesError::MalformedLabelSelector(_))); } @@ -163,7 +152,7 @@ fn test_label_expr_exists_values(pod: corev1::Pod, #[case] op: &str) { #[case::op_not_in(OPERATOR_DOES_NOT_EXIST)] fn test_label_expr_not_exists(pod: corev1::Pod, #[case] op: &str) { let sel = make_label_sel("baz", op, None); - let res = obj_matches_selector(&pod, &sel).unwrap(); + let res = pod.matches(&sel).unwrap(); assert_eq!(res, op == OPERATOR_DOES_NOT_EXIST); } @@ -172,9 +161,9 @@ fn test_label_expr_not_exists(pod: corev1::Pod, #[case] op: &str) { #[case::label_no_match("baz".into())] fn test_label_match(pod: corev1::Pod, #[case] label_key: String) { let sel = metav1::LabelSelector { - match_labels: Some(BTreeMap::from([(label_key.clone(), "bar".into())])), + match_labels: klabel!(label_key = "bar"), ..Default::default() }; - let res = obj_matches_selector(&pod, &sel).unwrap(); + let res = pod.matches(&sel).unwrap(); assert_eq!(res, &label_key == "foo"); } diff --git a/lib/rust/k8s/util.rs b/lib/rust/k8s/util.rs index 8a3d2dfb..20de2984 100644 --- a/lib/rust/k8s/util.rs +++ b/lib/rust/k8s/util.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; -use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::{ DynamicObject, ListParams, @@ -9,24 +8,13 @@ use kube::api::{ TypeMeta, }; use serde_json as json; -use thiserror::Error; use tracing::*; +use super::*; use crate::constants::SIMULATION_LABEL_KEY; use crate::errors::*; use crate::jsonutils; -pub(super) const LAST_APPLIED_CONFIG_LABEL_KEY: &str = "kubectl.kubernetes.io/last-applied-configuration"; -pub(super) const DEPL_REVISION_LABEL_KEY: &str = "deployment.kubernetes.io/revision"; - -err_impl! {KubernetesError, - #[error("field not found in struct: {0}")] - FieldNotFound(String), - - #[error("malformed label selector: {0:?}")] - MalformedLabelSelector(metav1::LabelSelectorRequirement), -} - pub fn add_common_fields(sim_name: &str, owner: &K, obj: &mut impl Resource) -> EmptyResult where K: Resource, @@ -67,31 +55,6 @@ pub fn make_deletable(ns_name: &str) -> DynamicObject { } } -pub fn namespaced_name(obj: &impl Resource) -> String { - match obj.namespace() { - Some(ns) => format!("{}/{}", ns, obj.name_any()), - None => obj.name_any().clone(), - } -} - -pub fn obj_matches_selector(obj: &impl Resource, sel: &metav1::LabelSelector) -> anyhow::Result { - if let Some(exprs) = &sel.match_expressions { - for expr in exprs { - if !label_expr_match(obj.labels(), expr)? { - return Ok(false); - } - } - } - - if let Some(labels) = &sel.match_labels { - for (k, v) in labels { - if obj.labels().get(k) != Some(v) { - return Ok(false); - } - } - } - Ok(true) -} pub fn prefixed_ns(prefix: &str, obj: &impl Resource) -> String { format!("{}-{}", prefix, obj.namespace().unwrap()) @@ -114,7 +77,7 @@ pub fn sanitize_obj(obj: &mut DynamicObject, pod_spec_path: &str, api_version: & for key in &["nodeName", "serviceAccount", "serviceAccountName"] { if let Err(e) = jsonutils::patch_ext::remove(pod_spec_path, key, &mut obj.data) { - debug!("could not patch object {}, skipping: {}", namespaced_name(obj), e); + debug!("could not patch object {}, skipping: {}", obj.namespaced_name(), e); } } @@ -128,6 +91,34 @@ pub fn split_namespaced_name(name: &str) -> (String, String) { } } +impl KubeResourceExt for T { + fn namespaced_name(&self) -> String { + match self.namespace() { + Some(ns) => format!("{}/{}", ns, self.name_any()), + None => self.name_any().clone(), + } + } + + fn matches(&self, sel: &metav1::LabelSelector) -> anyhow::Result { + if let Some(exprs) = &sel.match_expressions { + for expr in exprs { + if !label_expr_match(self.labels(), expr)? { + return Ok(false); + } + } + } + + if let Some(labels) = &sel.match_labels { + for (k, v) in labels { + if self.labels().get(k) != Some(v) { + return Ok(false); + } + } + } + Ok(true) + } +} + // The meanings of these operators is explained here: // https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#set-based-requirement pub(super) const OPERATOR_IN: &str = "In"; diff --git a/lib/rust/lib.rs b/lib/rust/lib.rs index 3bda37eb..2175d2cb 100644 --- a/lib/rust/lib.rs +++ b/lib/rust/lib.rs @@ -4,8 +4,10 @@ mod errors; pub mod jsonutils; pub mod k8s; pub mod logging; +mod macros; pub mod time; pub mod trace; +pub mod util; pub mod watch; use kube::CustomResource; @@ -38,5 +40,9 @@ pub mod prelude { pub use crate::config::*; pub use crate::constants::*; pub use crate::errors::EmptyResult; + pub use crate::k8s::{ + KubeResourceExt, + PodExt, + }; pub use crate::logging; } diff --git a/lib/rust/macros.rs b/lib/rust/macros.rs new file mode 100644 index 00000000..683ce13c --- /dev/null +++ b/lib/rust/macros.rs @@ -0,0 +1,29 @@ +macro_rules! partial_ord_eq_ref { + ($type:ty) => { + impl<'a> PartialEq<&'a $type> for $type { + fn eq(&self, other: &&'a $type) -> bool { + self == *other + } + } + + impl<'a> PartialEq<$type> for &'a $type { + fn eq(&self, other: &$type) -> bool { + *self == other + } + } + + impl<'a> PartialOrd<&'a $type> for $type { + fn partial_cmp(&self, other: &&'a $type) -> Option { + self.partial_cmp(*other) + } + } + + impl<'a> PartialOrd<$type> for &'a $type { + fn partial_cmp(&self, other: &$type) -> Option { + (*self).partial_cmp(other) + } + } + }; +} + +pub(crate) use partial_ord_eq_ref; diff --git a/lib/rust/trace/mod.rs b/lib/rust/trace/mod.rs index 6c257ba8..3845ccdf 100644 --- a/lib/rust/trace/mod.rs +++ b/lib/rust/trace/mod.rs @@ -2,11 +2,15 @@ pub mod storage; mod trace_filter; mod tracer; +use std::collections::HashMap; + pub use self::trace_filter::TraceFilter; pub use self::tracer::{ TraceEvent, Tracer, }; +type OwnedPodMap = HashMap>>; + #[cfg(test)] -mod tracer_test; +mod test; diff --git a/lib/rust/trace/test/mod.rs b/lib/rust/trace/test/mod.rs new file mode 100644 index 00000000..7d00e3bd --- /dev/null +++ b/lib/rust/trace/test/mod.rs @@ -0,0 +1,5 @@ +mod tracer; + +use rstest::*; + +use super::*; diff --git a/lib/rust/trace/tracer_test.rs b/lib/rust/trace/test/tracer.rs similarity index 64% rename from lib/rust/trace/tracer_test.rs rename to lib/rust/trace/test/tracer.rs index 60c03b68..7e507274 100644 --- a/lib/rust/trace/tracer_test.rs +++ b/lib/rust/trace/test/tracer.rs @@ -1,28 +1,16 @@ -use std::collections::{ - HashMap, - VecDeque, -}; - use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::DynamicObject; -use rstest::*; use serde_json::json; use super::*; -use crate::config::TracerConfig; -use crate::k8s::namespaced_name; +use crate::k8s::KubeResourceExt; const TESTING_NAMESPACE: &str = "test"; const EMPTY_SPEC_HASH: u64 = 15130871412783076140; #[fixture] fn tracer() -> Tracer { - return Tracer { - config: TracerConfig { tracked_objects: HashMap::new() }, - events: VecDeque::new(), - tracked_objs: HashMap::new(), - version: 0, - }; + Default::default() } #[fixture] @@ -41,15 +29,15 @@ fn test_obj(#[default(TESTING_NAMESPACE)] namespace: &str, #[default("obj")] nam #[rstest] #[tokio::test] async fn test_create_or_update_obj(mut tracer: Tracer, test_obj: DynamicObject) { - let ns_name = namespaced_name(&test_obj); + let ns_name = test_obj.namespaced_name(); let ts: i64 = 1234; // test idempotency, if we create the same obj twice nothing should change - tracer.create_or_update_obj(&test_obj, ts); - tracer.create_or_update_obj(&test_obj, 2445); + tracer.create_or_update_obj(&test_obj, ts, None); + tracer.create_or_update_obj(&test_obj, 2445, None); - assert_eq!(tracer.tracked_objs.len(), 1); - assert_eq!(tracer.tracked_objs[&ns_name].0, EMPTY_SPEC_HASH); + assert_eq!(tracer.index.len(), 1); + assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); assert_eq!(tracer.events.len(), 1); assert_eq!(tracer.events[0].applied_objs.len(), 1); assert_eq!(tracer.events[0].deleted_objs.len(), 0); @@ -64,13 +52,13 @@ async fn test_create_or_update_objs(mut tracer: Tracer) { let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); for i in 0..objs.len() { - tracer.create_or_update_obj(&objs[i], ts[i]); + tracer.create_or_update_obj(&objs[i], ts[i], None); } - assert_eq!(tracer.tracked_objs.len(), objs.len()); + assert_eq!(tracer.index.len(), objs.len()); for p in objs.iter() { - let ns_name = namespaced_name(p); - assert_eq!(tracer.tracked_objs[&ns_name].0, EMPTY_SPEC_HASH); + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); } assert_eq!(tracer.events.len(), 2); @@ -84,16 +72,14 @@ async fn test_create_or_update_objs(mut tracer: Tracer) { #[rstest] #[tokio::test] async fn test_delete_obj(mut tracer: Tracer, test_obj: DynamicObject) { - let ns_name = namespaced_name(&test_obj); + let ns_name = test_obj.namespaced_name(); let ts: i64 = 1234; - tracer.tracked_objs.insert(ns_name.clone(), (EMPTY_SPEC_HASH, 0)); + tracer.index.insert(ns_name.clone(), EMPTY_SPEC_HASH); - // test idempotency, if we delete the same obj twice nothing should change tracer.delete_obj(&test_obj, ts); - tracer.delete_obj(&test_obj, 2445); - assert_eq!(tracer.tracked_objs.len(), 0); + assert_eq!(tracer.index.len(), 0); assert_eq!(tracer.events.len(), 1); assert_eq!(tracer.events[0].applied_objs.len(), 0); assert_eq!(tracer.events[0].deleted_objs.len(), 1); @@ -102,44 +88,43 @@ async fn test_delete_obj(mut tracer: Tracer, test_obj: DynamicObject) { #[rstest] #[tokio::test] -async fn test_recreate_tracked_objs_all_new(mut tracer: Tracer) { +async fn test_recreate_index_all_new(mut tracer: Tracer) { let obj_names = vec!["obj1", "obj2", "obj3"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts: i64 = 1234; - // Calling it twice shouldn't change the tracked objs, but should increase the version twice + // Calling it twice shouldn't change the tracked objs tracer.update_all_objs(&objs, ts); tracer.update_all_objs(&objs, 2445); - assert_eq!(tracer.tracked_objs.len(), objs.len()); + assert_eq!(tracer.index.len(), objs.len()); for p in objs.iter() { - let ns_name = namespaced_name(p); - assert_eq!(tracer.tracked_objs[&ns_name].0, EMPTY_SPEC_HASH); + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); } assert_eq!(tracer.events.len(), 1); assert_eq!(tracer.events[0].applied_objs.len(), 3); assert_eq!(tracer.events[0].deleted_objs.len(), 0); assert_eq!(tracer.events[0].ts, ts); - assert_eq!(tracer.version, 2); } #[rstest] #[tokio::test] -async fn test_recreate_tracked_objs_with_created_obj(mut tracer: Tracer) { +async fn test_recreate_index_with_created_obj(mut tracer: Tracer) { let obj_names = vec!["obj1", "obj2", "obj3", "obj4"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts = vec![1234, 2445]; - // Calling it twice shouldn't change the tracked objs, but should increase the version twice + // Calling it twice shouldn't change the tracked objs let mut fewer_objs = objs.clone(); fewer_objs.pop(); tracer.update_all_objs(&fewer_objs, ts[0]); tracer.update_all_objs(&objs, ts[1]); - assert_eq!(tracer.tracked_objs.len(), objs.len()); + assert_eq!(tracer.index.len(), objs.len()); for p in fewer_objs.iter() { - let ns_name = namespaced_name(p); - assert_eq!(tracer.tracked_objs[&ns_name].0, EMPTY_SPEC_HASH); + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); } assert_eq!(tracer.events.len(), 2); assert_eq!(tracer.events[0].applied_objs.len(), 3); @@ -148,26 +133,25 @@ async fn test_recreate_tracked_objs_with_created_obj(mut tracer: Tracer) { assert_eq!(tracer.events[1].applied_objs.len(), 1); assert_eq!(tracer.events[1].deleted_objs.len(), 0); assert_eq!(tracer.events[1].ts, ts[1]); - assert_eq!(tracer.version, 2); } #[rstest] #[tokio::test] -async fn test_recreate_tracked_objs_with_deleted_obj(mut tracer: Tracer) { +async fn test_recreate_index_with_deleted_obj(mut tracer: Tracer) { let obj_names = vec!["obj1", "obj2", "obj3"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts = vec![1234, 2445]; - // Calling it twice shouldn't change the tracked objs, but should increase the version twice + // Calling it twice shouldn't change the tracked objs tracer.update_all_objs(&objs, ts[0]); let mut fewer_objs = objs.clone(); fewer_objs.pop(); tracer.update_all_objs(&fewer_objs, ts[1]); - assert_eq!(tracer.tracked_objs.len(), fewer_objs.len()); + assert_eq!(tracer.index.len(), fewer_objs.len()); for p in fewer_objs.iter() { - let ns_name = namespaced_name(p); - assert_eq!(tracer.tracked_objs[&ns_name].0, EMPTY_SPEC_HASH); + let ns_name = p.namespaced_name(); + assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); } assert_eq!(tracer.events.len(), 2); assert_eq!(tracer.events[0].applied_objs.len(), 3); @@ -176,5 +160,4 @@ async fn test_recreate_tracked_objs_with_deleted_obj(mut tracer: Tracer) { assert_eq!(tracer.events[1].applied_objs.len(), 0); assert_eq!(tracer.events[1].deleted_objs.len(), 1); assert_eq!(tracer.events[1].ts, ts[1]); - assert_eq!(tracer.version, 2); } diff --git a/lib/rust/trace/trace_filter.rs b/lib/rust/trace/trace_filter.rs index 558122c3..72bc888f 100644 --- a/lib/rust/trace/trace_filter.rs +++ b/lib/rust/trace/trace_filter.rs @@ -3,7 +3,7 @@ use kube::api::DynamicObject; use serde::Deserialize; use super::TraceEvent; -use crate::k8s::obj_matches_selector; +use crate::k8s::KubeResourceExt; #[derive(Deserialize, Debug, Clone)] pub struct TraceFilter { @@ -56,5 +56,5 @@ fn obj_matches_filter(obj: &DynamicObject, f: &TraceFilter) -> bool { .owner_references .as_ref() .is_some_and(|owners| owners.iter().any(|owner| &owner.kind == "DaemonSet")) - || f.excluded_labels.iter().any(|sel| obj_matches_selector(obj, sel).unwrap()) + || f.excluded_labels.iter().any(|sel| obj.matches(sel).unwrap()) } diff --git a/lib/rust/trace/tracer.rs b/lib/rust/trace/tracer.rs index 582ab5fc..154d1073 100644 --- a/lib/rust/trace/tracer.rs +++ b/lib/rust/trace/tracer.rs @@ -3,12 +3,13 @@ use std::collections::{ HashSet, VecDeque, }; +use std::mem::take; use std::sync::{ Arc, Mutex, }; -use k8s_openapi::api::core::v1 as corev1; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::DynamicObject; use serde::{ Deserialize, @@ -16,19 +17,16 @@ use serde::{ }; use tracing::*; -use super::trace_filter::{ - filter_event, - TraceFilter, -}; +use super::trace_filter::filter_event; +use super::*; use crate::config::TracerConfig; use crate::jsonutils; use crate::k8s::{ make_deletable, - namespaced_name, + KubeResourceExt, + PodLifecycleData, }; -type ObjectTracker = HashMap; - #[derive(Debug)] enum TraceAction { ObjectApplied, @@ -42,34 +40,25 @@ pub struct TraceEvent { pub deleted_objs: Vec, } +#[derive(Default)] pub struct Tracer { pub(super) config: TracerConfig, pub(super) events: VecDeque, - pub(super) tracked_objs: ObjectTracker, - pub(super) version: u64, + pub(super) _pod_owners: OwnedPodMap, + pub(super) index: HashMap, } impl Tracer { pub fn new(config: TracerConfig) -> Arc> { - Arc::new(Mutex::new(Tracer { - config, - events: VecDeque::new(), - tracked_objs: HashMap::new(), - version: 0, - })) + Arc::new(Mutex::new(Tracer { config, ..Default::default() })) } pub fn import(data: Vec) -> anyhow::Result { let (config, events): (TracerConfig, VecDeque) = rmp_serde::from_slice(&data)?; - let mut tracer = Tracer { - config, - events, - tracked_objs: HashMap::new(), - version: 0, - }; - let (_, tracked_objs) = tracer.collect_events(0, i64::MAX, &TraceFilter::blank()); - tracer.tracked_objs = tracked_objs; + let mut tracer = Tracer { config, events, ..Default::default() }; + let (_, index) = tracer.collect_events(0, i64::MAX, &TraceFilter::blank()); + tracer.index = index; Ok(tracer) } @@ -88,12 +77,12 @@ impl Tracer { } pub fn objs(&self) -> HashSet { - self.tracked_objs.keys().cloned().collect() + self.index.keys().cloned().collect() } pub fn objs_at(&self, end_ts: i64, filter: &TraceFilter) -> HashSet { - let (_, tracked_objs) = self.collect_events(0, end_ts, filter); - tracked_objs.keys().cloned().collect() + let (_, index) = self.collect_events(0, end_ts, filter); + index.keys().cloned().collect() } pub fn start_ts(&self) -> Option { @@ -103,52 +92,46 @@ impl Tracer { } } - pub(crate) fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64) { - let ns_name = namespaced_name(obj); + pub(crate) fn create_or_update_obj(&mut self, obj: &DynamicObject, ts: i64, maybe_old_hash: Option) { + let ns_name = obj.namespaced_name(); let new_hash = jsonutils::hash(obj.data.get("spec")); + let old_hash = if maybe_old_hash.is_some() { maybe_old_hash } else { self.index.get(&ns_name).cloned() }; - if !self.tracked_objs.contains_key(&ns_name) || self.tracked_objs[&ns_name].0 != new_hash { + if Some(new_hash) != old_hash { self.append_event(ts, obj, TraceAction::ObjectApplied); } - self.tracked_objs.insert(ns_name, (new_hash, self.version)); + self.index.insert(ns_name, new_hash); } pub(crate) fn delete_obj(&mut self, obj: &DynamicObject, ts: i64) { - let ns_name = namespaced_name(obj); - if self.tracked_objs.contains_key(&ns_name) { - self.append_event(ts, obj, TraceAction::ObjectDeleted); - } - self.tracked_objs.remove(&ns_name); + let ns_name = obj.namespaced_name(); + self.append_event(ts, obj, TraceAction::ObjectDeleted); + self.index.remove(&ns_name); } pub(crate) fn update_all_objs(&mut self, objs: &Vec, ts: i64) { - self.version += 1; + let mut old_index = take(&mut self.index); for obj in objs { - self.create_or_update_obj(obj, ts); + let ns_name = obj.namespaced_name(); + let old_hash = old_index.remove(&ns_name); + self.create_or_update_obj(obj, ts, old_hash); } - let to_delete: Vec<_> = self - .tracked_objs - .iter() - .filter_map(|(k, (_, v))| match v { - v if *v < self.version => Some(make_deletable(k)), - _ => None, - }) - .collect(); - - for obj in to_delete { - self.delete_obj(&obj, ts) + for ns_name in old_index.keys() { + self.delete_obj(&make_deletable(ns_name), ts); } } - pub(crate) fn record_pod_lifecycle(&mut self, _pod: &corev1::Pod, _ts: i64) {} - - pub(crate) fn record_pod_deleted(&mut self, _pod: &corev1::Pod, _ts: i64) {} - - pub(crate) fn update_pod_lifecycles(&mut self, _pods: &[corev1::Pod], _ts: i64) {} + pub(crate) fn record_pod_lifecycle( + &mut self, + _ns_name: &str, + _owners: Vec, + _lifecycle_data: &PodLifecycleData, + ) { + } fn append_event(&mut self, ts: i64, obj: &DynamicObject, action: TraceAction) { - info!("{} - {:?} @ {}", namespaced_name(obj), action, ts); + info!("{} - {:?} @ {}", obj.namespaced_name(), action, ts); let obj = obj.clone(); match self.events.back_mut() { @@ -166,10 +149,15 @@ impl Tracer { } } - fn collect_events(&self, start_ts: i64, end_ts: i64, filter: &TraceFilter) -> (Vec, ObjectTracker) { + fn collect_events( + &self, + start_ts: i64, + end_ts: i64, + filter: &TraceFilter, + ) -> (Vec, HashMap) { let mut events = vec![TraceEvent { ts: start_ts, ..Default::default() }]; - let mut flattened_obj_objects = HashMap::new(); - let mut tracked_objs = HashMap::new(); + let mut flattened_objects = HashMap::new(); + let mut index = HashMap::new(); for (evt, _) in self.iter() { // trace should be end-exclusive, so we use >= here: anything that is at the // end_ts or greater gets discarded. The event list is stored in @@ -180,29 +168,30 @@ impl Tracer { if let Some(new_evt) = filter_event(&evt, filter) { for obj in &new_evt.applied_objs { - let ns_name = namespaced_name(obj); + let ns_name = obj.namespaced_name(); if new_evt.ts < start_ts { - flattened_obj_objects.insert(ns_name.clone(), obj.clone()); + flattened_objects.insert(ns_name.clone(), obj.clone()); } let hash = jsonutils::hash(obj.data.get("spec")); - tracked_objs.insert(ns_name, (hash, self.version)); + index.insert(ns_name, hash); } for obj in &evt.deleted_objs { - let ns_name = namespaced_name(obj); + let ns_name = obj.namespaced_name(); if new_evt.ts < start_ts { - flattened_obj_objects.remove(&ns_name); + flattened_objects.remove(&ns_name); } - tracked_objs.remove(&ns_name); + index.remove(&ns_name); } + if new_evt.ts >= start_ts { events.push(new_evt.clone()); } } } - events[0].applied_objs = flattened_obj_objects.values().cloned().collect(); - (events, tracked_objs) + events[0].applied_objs = flattened_objects.values().cloned().collect(); + (events, index) } } diff --git a/lib/rust/util.rs b/lib/rust/util.rs new file mode 100644 index 00000000..e765aa57 --- /dev/null +++ b/lib/rust/util.rs @@ -0,0 +1,30 @@ +use std::cmp::{ + min, + Ord, +}; + +pub fn min_some(o1: Option, o2: Option) -> Option { + if o1.is_none() { + o2 + } else if o2.is_none() { + o1 + } else { + min(o1, o2) + } +} + +#[cfg(test)] +mod test { + use rstest::*; + + use super::*; + + #[rstest] + #[case::both_none(None, None, None)] + #[case::left_some(Some(1), None, Some(1))] + #[case::right_some(None, Some(1), Some(1))] + #[case::both_some(Some(2), Some(1), Some(1))] + fn test_min_some(#[case] o1: Option, #[case] o2: Option, #[case] expected: Option) { + assert_eq!(min_some(o1, o2), expected); + } +} diff --git a/lib/rust/watch/dyn_obj_watcher.rs b/lib/rust/watch/dyn_obj_watcher.rs index 71448e6e..2b0e5445 100644 --- a/lib/rust/watch/dyn_obj_watcher.rs +++ b/lib/rust/watch/dyn_obj_watcher.rs @@ -78,7 +78,7 @@ impl DynObjWatcher { fn handle_obj_event(&self, evt: Event, ts: i64) { let mut tracer = self.tracer.lock().unwrap(); match evt { - Event::Applied(obj) => tracer.create_or_update_obj(&obj, ts), + Event::Applied(obj) => tracer.create_or_update_obj(&obj, ts, None), Event::Deleted(obj) => tracer.delete_obj(&obj, ts), Event::Restarted(objs) => tracer.update_all_objs(&objs, ts), }; diff --git a/lib/rust/watch/pod_watcher.rs b/lib/rust/watch/pod_watcher.rs index 70b824ea..263e4106 100644 --- a/lib/rust/watch/pod_watcher.rs +++ b/lib/rust/watch/pod_watcher.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::mem::take; use std::sync::{ Arc, Mutex, @@ -18,71 +20,165 @@ use kube::runtime::watcher::{ watcher, Event, }; -use kube::runtime::WatchStreamExt; use kube::{ Resource, ResourceExt, }; -use tokio::runtime::Handle; -use tokio::task::block_in_place; use tracing::*; use super::PodStream; use crate::errors::*; use crate::k8s::{ list_params_for, - namespaced_name, ApiSet, + KubeResourceExt, + PodLifecycleData, GVK, }; -use crate::time::{ - Clockable, - UtcClock, -}; use crate::trace::Tracer; type OwnerCache = SizedCache>; +const CACHE_SIZE: usize = 10000; pub struct PodWatcher { - clock: Box, + apiset: ApiSet, pod_stream: PodStream, tracer: Arc>, + owned_pods: HashMap, + owners_cache: SizedCache>, } impl PodWatcher { pub fn new(tracer: Arc>, apiset: ApiSet) -> PodWatcher { - let pod_stream = build_stream_for_pods(apiset); - PodWatcher { clock: Box::new(UtcClock), pod_stream, tracer } + let pod_api: kube::Api = kube::Api::all(apiset.client().clone()); + let pod_stream = watcher(pod_api, Default::default()).map_err(|e| e.into()).boxed(); + PodWatcher { + apiset, + pod_stream, + tracer, + owned_pods: HashMap::new(), + owners_cache: SizedCache::with_size(CACHE_SIZE), + } } pub async fn start(mut self) { while let Some(res) = self.pod_stream.next().await { - let ts = self.clock.now(); - match res { - Ok(evt) => self.handle_pod_event(evt, ts), + Ok(mut evt) => { + let _ = self.handle_pod_event(&mut evt).await; + }, Err(e) => error!("pod watcher received error on stream: {}", e), } } } - fn handle_pod_event(&self, evt: Event, ts: i64) { - let mut tracer = self.tracer.lock().unwrap(); + async fn handle_pod_event(&mut self, evt: &mut Event) -> EmptyResult { match evt { - Event::Applied(pod) => tracer.record_pod_lifecycle(&pod, ts), - Event::Deleted(pod) => tracer.record_pod_deleted(&pod, ts), - Event::Restarted(pods) => tracer.update_pod_lifecycles(&pods, ts), + Event::Applied(pod) => { + let ns_name = pod.namespaced_name(); + let current_lifecycle_data = self.owned_pods.get(&ns_name).cloned(); + self.handle_pod_applied(&ns_name, pod, current_lifecycle_data).await?; + }, + Event::Deleted(pod) => { + let ns_name = pod.namespaced_name(); + let current_lifecycle_data = match self.owned_pods.get(&ns_name) { + None => { + warn!("pod {} deleted but not tracked, may have already been processed", ns_name); + return Ok(()); + }, + Some(data) => data.clone(), + }; + self.handle_pod_deleted(&ns_name, Some(pod), current_lifecycle_data).await?; + }, + Event::Restarted(pods) => { + let mut old_owned_pods = take(&mut self.owned_pods); + for pod in pods { + let ns_name = &pod.namespaced_name(); + let old_entry = old_owned_pods.remove(ns_name); + self.handle_pod_applied(ns_name, pod, old_entry).await?; + } + + for (ns_name, current_lifecycle_data) in &old_owned_pods { + self.handle_pod_deleted(ns_name, None, current_lifecycle_data.clone()).await?; + } + }, }; + + Ok(()) + } + + async fn handle_pod_applied( + &mut self, + ns_name: &str, + pod: &corev1::Pod, + current_lifecycle_data: Option, + ) -> EmptyResult { + let lifecycle_data = PodLifecycleData::new_for(pod)?; + + if lifecycle_data > current_lifecycle_data { + self.owned_pods.insert(ns_name.into(), lifecycle_data.clone()); + self.store_pod_lifecycle_data(ns_name, Some(pod), &lifecycle_data).await?; + } else if lifecycle_data != current_lifecycle_data { + warn!( + "new lifecycle data for {} does not match stored data, cowardly refusing to update: {:?} !>= {:?}", + ns_name, lifecycle_data, current_lifecycle_data + ); + } + + Ok(()) + } + + async fn handle_pod_deleted( + &mut self, + ns_name: &str, + maybe_pod: Option<&corev1::Pod>, + current_lifecycle_data: PodLifecycleData, + ) -> EmptyResult { + let new_lifecycle_data = PodLifecycleData::guess_finished(maybe_pod, ¤t_lifecycle_data); + + if current_lifecycle_data.finished() { + // do nothing + if new_lifecycle_data != current_lifecycle_data { + warn!( + "new lifecycle data for {} does not match stored data, cowardly refusing to update: {:?} !>= {:?}", + ns_name, current_lifecycle_data, new_lifecycle_data, + ); + } + } else if new_lifecycle_data.finished() && new_lifecycle_data > current_lifecycle_data { + self.store_pod_lifecycle_data(ns_name, maybe_pod, &new_lifecycle_data).await?; + } else { + bail!("could not determine lifecycle data for pod {}", ns_name); + } + + Ok(()) + } + + async fn store_pod_lifecycle_data( + &mut self, + ns_name: &str, + maybe_pod: Option<&corev1::Pod>, + lifecycle_data: &PodLifecycleData, + ) -> EmptyResult { + let owners = match (self.owners_cache.cache_get(ns_name), maybe_pod) { + (Some(o), _) => o.clone(), + (None, Some(pod)) => compute_owner_chain(&mut self.apiset, pod, &mut self.owners_cache).await?, + _ => bail!("could not store lifecycle data for {}", ns_name), + }; + + let mut tracer = self.tracer.lock().unwrap(); + tracer.record_pod_lifecycle(ns_name, owners, lifecycle_data); + + Ok(()) } } -#[async_recursion(?Send)] +#[async_recursion] async fn compute_owner_chain( apiset: &mut ApiSet, - obj: &impl Resource, + obj: &(impl Resource + Sync), cache: &mut OwnerCache, ) -> anyhow::Result> { - let ns_name = namespaced_name(obj); + let ns_name = obj.namespaced_name(); if let Some(owners) = cache.cache_get(&ns_name) { info!("found owners for {} in cache", ns_name); @@ -97,7 +193,7 @@ async fn compute_owner_chain( let api = apiset.api_for(gvk).await?; let resp = api.list(&list_params_for(&obj.namespace().unwrap(), &rf.name)).await?; if resp.items.len() != 1 { - bail!("could not find single owner for {}, found {:?}", namespaced_name(obj), resp.items); + bail!("could not find single owner for {}, found {:?}", obj.namespaced_name(), resp.items); } let owner = &resp.items[0]; @@ -107,19 +203,3 @@ async fn compute_owner_chain( cache.cache_set(ns_name, owners.clone()); Ok(owners) } - -fn build_stream_for_pods(mut apiset: ApiSet) -> PodStream { - let pod_api: kube::Api = kube::Api::all(apiset.client().clone()); - let mut cache: SizedCache> = SizedCache::with_size(100); - watcher(pod_api, Default::default()) - .modify(move |pod| { - block_in_place(|| { - Handle::current().block_on(async { - let owners = compute_owner_chain(&mut apiset, pod, &mut cache).await; - pod.metadata.owner_references = owners.ok(); - }) - }); - }) - .map_err(|e| e.into()) - .boxed() -} diff --git a/tests/rust/test_tracer.rs b/tests/rust/test_tracer.rs index bda8ef2a..cc4ab2f5 100644 --- a/tests/rust/test_tracer.rs +++ b/tests/rust/test_tracer.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::sync::atomic::{ AtomicI64, Ordering, @@ -12,6 +11,7 @@ use kube::api::DynamicObject; use kube::runtime::watcher::Event; use kube::ResourceExt; use serde_json::json; +use simkube::k8s::macros::*; use simkube::time::Clockable; use simkube::trace::{ TraceFilter, @@ -163,7 +163,7 @@ async fn test_export() { let filter = TraceFilter { excluded_namespaces: vec!["kube-system".into()], excluded_labels: vec![metav1::LabelSelector { - match_labels: Some(BTreeMap::from([("foo".into(), "bar".into())])), + match_labels: klabel!("foo" = "bar"), ..Default::default() }], exclude_daemonsets: true,