From 8545a297d449f5f584d9b8d4b99f1b609d2a8da0 Mon Sep 17 00:00:00 2001 From: David Morrison Date: Thu, 28 Sep 2023 14:21:30 -0700 Subject: [PATCH] s/tracer/store/g --- ctrl/controller.rs | 2 +- driver/main.rs | 10 +- lib/rust/k8s/gvk.rs | 2 +- lib/rust/k8s/mod.rs | 2 +- lib/rust/k8s/test/mod.rs | 19 -- lib/rust/k8s/test/pod_lifecycle_test.rs | 99 ---------- lib/rust/k8s/test/util_test.rs | 169 ------------------ lib/rust/lib.rs | 2 +- lib/rust/{trace => store}/mod.rs | 8 +- lib/rust/{trace => store}/storage.rs | 0 lib/rust/{trace/test => store/tests}/mod.rs | 2 +- .../tracer.rs => store/tests/trace_store.rs} | 14 +- lib/rust/{trace => store}/trace_filter.rs | 0 .../{trace/tracer.rs => store/trace_store.rs} | 14 +- lib/rust/trace/tests/mod.rs | 5 - lib/rust/trace/tests/tracer.rs | 163 ----------------- lib/rust/watch/dyn_obj_watcher.rs | 20 +-- lib/rust/watch/pod_watcher.rs | 12 +- tests/rust/test_tracer.rs | 20 +-- tracer/main.rs | 16 +- 20 files changed, 62 insertions(+), 517 deletions(-) delete mode 100644 lib/rust/k8s/test/mod.rs delete mode 100644 lib/rust/k8s/test/pod_lifecycle_test.rs delete mode 100644 lib/rust/k8s/test/util_test.rs rename lib/rust/{trace => store}/mod.rs (74%) rename lib/rust/{trace => store}/storage.rs (100%) rename lib/rust/{trace/test => store/tests}/mod.rs (64%) rename lib/rust/{trace/test/tracer.rs => store/tests/trace_store.rs} (91%) rename lib/rust/{trace => store}/trace_filter.rs (100%) rename lib/rust/{trace/tracer.rs => store/trace_store.rs} (95%) delete mode 100644 lib/rust/trace/tests/mod.rs delete mode 100644 lib/rust/trace/tests/tracer.rs diff --git a/ctrl/controller.rs b/ctrl/controller.rs index 0553c35b..24c7d7b4 100644 --- a/ctrl/controller.rs +++ b/ctrl/controller.rs @@ -9,7 +9,7 @@ use kube::ResourceExt; use reqwest::Url; use simkube::k8s::add_common_fields; use simkube::prelude::*; -use simkube::trace::storage; +use simkube::store::storage; use tokio::time::Duration; use tracing::*; diff --git a/driver/main.rs b/driver/main.rs index bf0e480b..c152b484 100644 --- a/driver/main.rs +++ b/driver/main.rs @@ -22,7 +22,7 @@ use simkube::k8s::{ GVK, }; use simkube::prelude::*; -use simkube::trace::Tracer; +use simkube::store::TraceStore; use tokio::time::sleep; use tracing::*; @@ -91,7 +91,7 @@ async fn run(args: &Options) -> EmptyResult { info!("Simulation driver starting"); let trace_data = fs::read(&args.trace_path)?; - let tracer = Tracer::import(trace_data)?; + let trace_store = TraceStore::import(trace_data)?; let client = kube::Client::try_default().await?; let mut apiset = ApiSet::new(&client); @@ -100,12 +100,12 @@ async fn run(args: &Options) -> EmptyResult { let root = roots_api.get(&args.sim_root).await?; - let mut sim_ts = tracer.start_ts().ok_or(anyhow!("no trace data"))?; - for (evt, next_ts) in tracer.iter() { + let mut sim_ts = trace_store.start_ts().ok_or(anyhow!("no trace data"))?; + for (evt, next_ts) in trace_store.iter() { for obj in evt.applied_objs { let gvk = GVK::from_dynamic_obj(&obj)?; let vns_name = prefixed_ns(&args.sim_namespace_prefix, &obj); - let vobj = build_virtual_obj(&obj, &vns_name, &args.sim_name, &root, tracer.config())?; + let vobj = build_virtual_obj(&obj, &vns_name, &args.sim_name, &root, trace_store.config())?; let vns = build_virtual_ns(&args.sim_name, &vns_name, &root)?; ns_api.create(&Default::default(), &vns).await?; diff --git a/lib/rust/k8s/gvk.rs b/lib/rust/k8s/gvk.rs index b957c25e..4631bdb6 100644 --- a/lib/rust/k8s/gvk.rs +++ b/lib/rust/k8s/gvk.rs @@ -1,7 +1,6 @@ use std::fmt; use std::ops::Deref; -use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::api::{ DynamicObject, GroupVersionKind, @@ -14,6 +13,7 @@ use serde::{ Serializer, }; +use super::*; use crate::errors::*; #[derive(Clone, Debug, Hash, Eq, PartialEq)] diff --git a/lib/rust/k8s/mod.rs b/lib/rust/k8s/mod.rs index 28e9f979..df9c20b5 100644 --- a/lib/rust/k8s/mod.rs +++ b/lib/rust/k8s/mod.rs @@ -55,4 +55,4 @@ trait StartEndTimeable { } #[cfg(test)] -mod test; +mod tests; diff --git a/lib/rust/k8s/test/mod.rs b/lib/rust/k8s/test/mod.rs deleted file mode 100644 index ae532418..00000000 --- a/lib/rust/k8s/test/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -mod pod_lifecycle_test; -mod util_test; - -use rstest::*; - -use super::macros::*; -use super::*; - -#[fixture] -fn pod() -> corev1::Pod { - corev1::Pod { - metadata: metav1::ObjectMeta { - labels: klabel!("foo" = "bar"), - ..Default::default() - }, - spec: Some(corev1::PodSpec { ..Default::default() }), - status: Some(corev1::PodStatus { ..Default::default() }), - } -} diff --git a/lib/rust/k8s/test/pod_lifecycle_test.rs b/lib/rust/k8s/test/pod_lifecycle_test.rs deleted file mode 100644 index 4ae3ff9a..00000000 --- a/lib/rust/k8s/test/pod_lifecycle_test.rs +++ /dev/null @@ -1,99 +0,0 @@ -use chrono::{ - DateTime, - Duration, - Utc, -}; - -use super::{ - pod, - *, -}; - -#[rstest] -fn test_pod_lifecycle_data_for_empty(pod: corev1::Pod) { - let res = PodLifecycleData::new_for(&pod).unwrap(); - assert_eq!(res, PodLifecycleData::Empty); -} - -#[rstest] -#[case::with_init_container(true)] -#[case::without_init_container(false)] -fn test_pod_lifecycle_data_for_start_time_only(mut pod: corev1::Pod, #[case] init_container: bool) { - let t1 = Utc::now(); - - add_container_status_running(&mut pod.status_mut().container_statuses, &(t1 + Duration::seconds(10))); - if init_container { - add_container_status_running(&mut pod.status_mut().init_container_statuses, &t1); - add_container_status_running(&mut pod.status_mut().container_statuses, &(t1 + Duration::seconds(5))); - } else { - add_container_status_running(&mut pod.status_mut().init_container_statuses, &t1); - } - - let res = PodLifecycleData::new_for(&pod).unwrap(); - assert_eq!(res, PodLifecycleData::Running(t1.timestamp())); -} - -#[rstest] -fn test_pod_lifecycle_data_for_with_some_end_times(mut pod: corev1::Pod) { - let t1 = Utc::now(); - let tmid = t1 + Duration::seconds(5); - let t2 = t1 + Duration::seconds(10); - - add_container_status_finished(&mut pod.status_mut().init_container_statuses, &t1, &tmid); - add_container_status_running(&mut pod.status_mut().container_statuses, &tmid); - add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &t2); - - let res = PodLifecycleData::new_for(&pod).unwrap(); - assert_eq!(res, PodLifecycleData::Running(t1.timestamp())); -} - -#[rstest] -fn test_pod_lifecycle_data_for_with_end_times(mut pod: corev1::Pod) { - let t1 = Utc::now(); - let tmid = t1 + Duration::seconds(5); - let t2 = t1 + Duration::seconds(10); - - pod.spec_mut().containers.extend(vec![Default::default(), Default::default()]); - add_container_status_finished(&mut pod.status_mut().init_container_statuses, &t1, &tmid); - add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &(tmid + Duration::seconds(1))); - add_container_status_finished(&mut pod.status_mut().container_statuses, &tmid, &t2); - - let res = PodLifecycleData::new_for(&pod).unwrap(); - assert_eq!(res, PodLifecycleData::Finished(t1.timestamp(), t2.timestamp())); -} - -fn add_container_status_running(container_statuses: &mut Option>, t: &DateTime) { - if container_statuses.is_none() { - *container_statuses = Some(vec![]) - } - - container_statuses.as_mut().unwrap().push(corev1::ContainerStatus { - state: Some(corev1::ContainerState { - running: Some(corev1::ContainerStateRunning { started_at: Some(metav1::Time(t.clone())) }), - ..Default::default() - }), - ..Default::default() - }) -} - -fn add_container_status_finished( - container_statuses: &mut Option>, - t1: &DateTime, - t2: &DateTime, -) { - if container_statuses.is_none() { - *container_statuses = Some(vec![]) - } - - container_statuses.as_mut().unwrap().push(corev1::ContainerStatus { - state: Some(corev1::ContainerState { - terminated: Some(corev1::ContainerStateTerminated { - started_at: Some(metav1::Time(t1.clone())), - finished_at: Some(metav1::Time(t2.clone())), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }) -} diff --git a/lib/rust/k8s/test/util_test.rs b/lib/rust/k8s/test/util_test.rs deleted file mode 100644 index 43268894..00000000 --- a/lib/rust/k8s/test/util_test.rs +++ /dev/null @@ -1,169 +0,0 @@ -use chrono::Utc; -use kube::api::DynamicObject; -use serde_json::json; - -use super::{ - pod, - *, -}; - -#[rstest] -fn test_sanitize_obj() { - let mut obj = DynamicObject { - metadata: metav1::ObjectMeta { - name: Some("test".into()), - namespace: Some("test".into()), - - annotations: klabel!( - "some_random_annotation" = "blah", - LAST_APPLIED_CONFIG_LABEL_KEY = "foo", - DEPL_REVISION_LABEL_KEY = "42.5", - ), - - creation_timestamp: Some(metav1::Time(Utc::now())), - deletion_timestamp: Some(metav1::Time(Utc::now())), - deletion_grace_period_seconds: Some(123), - generation: Some(456), - managed_fields: Some(vec![Default::default()]), - owner_references: Some(vec![Default::default()]), - resource_version: Some("1234".into()), - uid: Some("abcd".into()), - - ..Default::default() - }, - types: None, - data: json!({ - "foo": { - "bars": [{ - "spec": { - "nodeName": "foo", - "serviceAccountName": "bar", - "nodeSelector": {"buz": "biz"}, - }, - }, - { - "spec": {}, - }, - { - "spec": { - "serviceAccount": "flumm", - }, - }, - ], - }, - }), - }; - - sanitize_obj(&mut obj, "/foo/bars/*/spec", "bar.blah.sh/v2", "Stuff"); - - assert_eq!(obj.metadata.creation_timestamp, None); - assert_eq!(obj.metadata.deletion_timestamp, None); - assert_eq!(obj.metadata.deletion_grace_period_seconds, None); - assert_eq!(obj.metadata.generation, None); - assert_eq!(obj.metadata.managed_fields, None); - assert_eq!(obj.metadata.owner_references, None); - assert_eq!(obj.metadata.resource_version, None); - assert_eq!(obj.metadata.uid, None); - - assert_eq!(obj.metadata.annotations, klabel!("some_random_annotation" = "blah")); - assert!(obj - .types - .is_some_and(|tm| tm.api_version == "bar.blah.sh/v2" && tm.kind == "Stuff")); - - assert_eq!( - obj.data, - json!({ - "foo": { - "bars": [ - { - "spec": { - "nodeSelector": {"buz": "biz"}, - }, - }, - { "spec": {} }, - { "spec": {} }, - ], - }, - }) - ); -} - -fn make_label_sel(key: &str, op: &str, value: Option<&str>) -> metav1::LabelSelector { - metav1::LabelSelector { - match_expressions: Some(vec![metav1::LabelSelectorRequirement { - key: key.into(), - operator: op.into(), - values: match value { - Some(s) => Some(vec![s.into()]), - _ => None, - }, - }]), - ..Default::default() - } -} - -#[rstest] -#[case::op_in(OPERATOR_IN)] -#[case::op_not_in(OPERATOR_NOT_IN)] -fn test_label_expr_match(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("foo", op, Some("bar")); - let res = pod.matches(&sel).unwrap(); - assert_eq!(res, op == OPERATOR_IN); -} - -#[rstest] -#[case::op_in(OPERATOR_IN)] -#[case::op_not_in(OPERATOR_NOT_IN)] -fn test_label_expr_no_values(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("foo", op, None); - let res = pod.matches(&sel).unwrap_err().downcast().unwrap(); - assert!(matches!(res, KubernetesError::MalformedLabelSelector(_))); -} - -#[rstest] -#[case::op_in(OPERATOR_IN)] -#[case::op_not_in(OPERATOR_NOT_IN)] -fn test_label_expr_no_match(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("baz", op, Some("qux")); - let res = pod.matches(&sel).unwrap(); - assert_eq!(res, op == OPERATOR_NOT_IN); -} - -#[rstest] -#[case::op_exists(OPERATOR_EXISTS)] -#[case::op_exists(OPERATOR_DOES_NOT_EXIST)] -fn test_label_expr_exists(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("foo", op, None); - let res = pod.matches(&sel).unwrap(); - assert_eq!(res, op == OPERATOR_EXISTS); -} - -#[rstest] -#[case::op_exists(OPERATOR_EXISTS)] -#[case::op_not_exists(OPERATOR_DOES_NOT_EXIST)] -fn test_label_expr_exists_values(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("foo", op, Some("bar")); - let res = pod.matches(&sel).unwrap_err().downcast().unwrap(); - assert!(matches!(res, KubernetesError::MalformedLabelSelector(_))); -} - -#[rstest] -#[case::op_in(OPERATOR_EXISTS)] -#[case::op_not_in(OPERATOR_DOES_NOT_EXIST)] -fn test_label_expr_not_exists(pod: corev1::Pod, #[case] op: &str) { - let sel = make_label_sel("baz", op, None); - let res = pod.matches(&sel).unwrap(); - assert_eq!(res, op == OPERATOR_DOES_NOT_EXIST); -} - -#[rstest] -#[case::label_match("foo".into())] -#[case::label_no_match("baz".into())] -fn test_label_match(pod: corev1::Pod, #[case] label_key: String) { - let sel = metav1::LabelSelector { - match_labels: klabel!(label_key = "bar"), - ..Default::default() - }; - let res = pod.matches(&sel).unwrap(); - assert_eq!(res, &label_key == "foo"); -} diff --git a/lib/rust/lib.rs b/lib/rust/lib.rs index 2175d2cb..a99e3695 100644 --- a/lib/rust/lib.rs +++ b/lib/rust/lib.rs @@ -5,8 +5,8 @@ pub mod jsonutils; pub mod k8s; pub mod logging; mod macros; +pub mod store; pub mod time; -pub mod trace; pub mod util; pub mod watch; diff --git a/lib/rust/trace/mod.rs b/lib/rust/store/mod.rs similarity index 74% rename from lib/rust/trace/mod.rs rename to lib/rust/store/mod.rs index 3845ccdf..155c93a5 100644 --- a/lib/rust/trace/mod.rs +++ b/lib/rust/store/mod.rs @@ -1,16 +1,16 @@ pub mod storage; mod trace_filter; -mod tracer; +mod trace_store; use std::collections::HashMap; pub use self::trace_filter::TraceFilter; -pub use self::tracer::{ +pub use self::trace_store::{ TraceEvent, - Tracer, + TraceStore, }; type OwnedPodMap = HashMap>>; #[cfg(test)] -mod test; +mod tests; diff --git a/lib/rust/trace/storage.rs b/lib/rust/store/storage.rs similarity index 100% rename from lib/rust/trace/storage.rs rename to lib/rust/store/storage.rs diff --git a/lib/rust/trace/test/mod.rs b/lib/rust/store/tests/mod.rs similarity index 64% rename from lib/rust/trace/test/mod.rs rename to lib/rust/store/tests/mod.rs index 7d00e3bd..327f08de 100644 --- a/lib/rust/trace/test/mod.rs +++ b/lib/rust/store/tests/mod.rs @@ -1,4 +1,4 @@ -mod tracer; +mod trace_store; use rstest::*; diff --git a/lib/rust/trace/test/tracer.rs b/lib/rust/store/tests/trace_store.rs similarity index 91% rename from lib/rust/trace/test/tracer.rs rename to lib/rust/store/tests/trace_store.rs index 7e507274..4878021d 100644 --- a/lib/rust/trace/test/tracer.rs +++ b/lib/rust/store/tests/trace_store.rs @@ -9,7 +9,7 @@ const TESTING_NAMESPACE: &str = "test"; const EMPTY_SPEC_HASH: u64 = 15130871412783076140; #[fixture] -fn tracer() -> Tracer { +fn tracer() -> TraceStore { Default::default() } @@ -28,7 +28,7 @@ fn test_obj(#[default(TESTING_NAMESPACE)] namespace: &str, #[default("obj")] nam #[rstest] #[tokio::test] -async fn test_create_or_update_obj(mut tracer: Tracer, test_obj: DynamicObject) { +async fn test_create_or_update_obj(mut tracer: TraceStore, test_obj: DynamicObject) { let ns_name = test_obj.namespaced_name(); let ts: i64 = 1234; @@ -46,7 +46,7 @@ async fn test_create_or_update_obj(mut tracer: Tracer, test_obj: DynamicObject) #[rstest] #[tokio::test] -async fn test_create_or_update_objs(mut tracer: Tracer) { +async fn test_create_or_update_objs(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2"]; let ts = vec![1234, 3445]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); @@ -71,7 +71,7 @@ async fn test_create_or_update_objs(mut tracer: Tracer) { #[rstest] #[tokio::test] -async fn test_delete_obj(mut tracer: Tracer, test_obj: DynamicObject) { +async fn test_delete_obj(mut tracer: TraceStore, test_obj: DynamicObject) { let ns_name = test_obj.namespaced_name(); let ts: i64 = 1234; @@ -88,7 +88,7 @@ async fn test_delete_obj(mut tracer: Tracer, test_obj: DynamicObject) { #[rstest] #[tokio::test] -async fn test_recreate_index_all_new(mut tracer: Tracer) { +async fn test_recreate_index_all_new(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts: i64 = 1234; @@ -110,7 +110,7 @@ async fn test_recreate_index_all_new(mut tracer: Tracer) { #[rstest] #[tokio::test] -async fn test_recreate_index_with_created_obj(mut tracer: Tracer) { +async fn test_recreate_index_with_created_obj(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3", "obj4"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts = vec![1234, 2445]; @@ -137,7 +137,7 @@ async fn test_recreate_index_with_created_obj(mut tracer: Tracer) { #[rstest] #[tokio::test] -async fn test_recreate_index_with_deleted_obj(mut tracer: Tracer) { +async fn test_recreate_index_with_deleted_obj(mut tracer: TraceStore) { let obj_names = vec!["obj1", "obj2", "obj3"]; let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); let ts = vec![1234, 2445]; diff --git a/lib/rust/trace/trace_filter.rs b/lib/rust/store/trace_filter.rs similarity index 100% rename from lib/rust/trace/trace_filter.rs rename to lib/rust/store/trace_filter.rs diff --git a/lib/rust/trace/tracer.rs b/lib/rust/store/trace_store.rs similarity index 95% rename from lib/rust/trace/tracer.rs rename to lib/rust/store/trace_store.rs index 154d1073..d8aae3f0 100644 --- a/lib/rust/trace/tracer.rs +++ b/lib/rust/store/trace_store.rs @@ -41,22 +41,22 @@ pub struct TraceEvent { } #[derive(Default)] -pub struct Tracer { +pub struct TraceStore { pub(super) config: TracerConfig, pub(super) events: VecDeque, pub(super) _pod_owners: OwnedPodMap, pub(super) index: HashMap, } -impl Tracer { - pub fn new(config: TracerConfig) -> Arc> { - Arc::new(Mutex::new(Tracer { config, ..Default::default() })) +impl TraceStore { + pub fn new(config: TracerConfig) -> Arc> { + Arc::new(Mutex::new(TraceStore { config, ..Default::default() })) } - pub fn import(data: Vec) -> anyhow::Result { + pub fn import(data: Vec) -> anyhow::Result { let (config, events): (TracerConfig, VecDeque) = rmp_serde::from_slice(&data)?; - let mut tracer = Tracer { config, events, ..Default::default() }; + let mut tracer = TraceStore { config, events, ..Default::default() }; let (_, index) = tracer.collect_events(0, i64::MAX, &TraceFilter::blank()); tracer.index = index; @@ -200,7 +200,7 @@ pub struct TraceIterator<'a> { idx: usize, } -impl<'a> Tracer { +impl<'a> TraceStore { pub fn iter(&'a self) -> TraceIterator<'a> { TraceIterator { events: &self.events, idx: 0 } } diff --git a/lib/rust/trace/tests/mod.rs b/lib/rust/trace/tests/mod.rs deleted file mode 100644 index 7d00e3bd..00000000 --- a/lib/rust/trace/tests/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod tracer; - -use rstest::*; - -use super::*; diff --git a/lib/rust/trace/tests/tracer.rs b/lib/rust/trace/tests/tracer.rs deleted file mode 100644 index 7e507274..00000000 --- a/lib/rust/trace/tests/tracer.rs +++ /dev/null @@ -1,163 +0,0 @@ -use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; -use kube::api::DynamicObject; -use serde_json::json; - -use super::*; -use crate::k8s::KubeResourceExt; - -const TESTING_NAMESPACE: &str = "test"; -const EMPTY_SPEC_HASH: u64 = 15130871412783076140; - -#[fixture] -fn tracer() -> Tracer { - Default::default() -} - -#[fixture] -fn test_obj(#[default(TESTING_NAMESPACE)] namespace: &str, #[default("obj")] name: &str) -> DynamicObject { - DynamicObject { - metadata: metav1::ObjectMeta { - namespace: Some(namespace.into()), - name: Some(name.into()), - ..Default::default() - }, - types: None, - data: json!({"spec": {}}), - } -} - -#[rstest] -#[tokio::test] -async fn test_create_or_update_obj(mut tracer: Tracer, test_obj: DynamicObject) { - let ns_name = test_obj.namespaced_name(); - let ts: i64 = 1234; - - // test idempotency, if we create the same obj twice nothing should change - tracer.create_or_update_obj(&test_obj, ts, None); - tracer.create_or_update_obj(&test_obj, 2445, None); - - assert_eq!(tracer.index.len(), 1); - assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); - assert_eq!(tracer.events.len(), 1); - assert_eq!(tracer.events[0].applied_objs.len(), 1); - assert_eq!(tracer.events[0].deleted_objs.len(), 0); - assert_eq!(tracer.events[0].ts, ts); -} - -#[rstest] -#[tokio::test] -async fn test_create_or_update_objs(mut tracer: Tracer) { - let obj_names = vec!["obj1", "obj2"]; - let ts = vec![1234, 3445]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); - - for i in 0..objs.len() { - tracer.create_or_update_obj(&objs[i], ts[i], None); - } - - assert_eq!(tracer.index.len(), objs.len()); - for p in objs.iter() { - let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); - } - assert_eq!(tracer.events.len(), 2); - - for i in 0..objs.len() { - assert_eq!(tracer.events[i].applied_objs.len(), 1); - assert_eq!(tracer.events[i].deleted_objs.len(), 0); - assert_eq!(tracer.events[i].ts, ts[i]); - } -} - -#[rstest] -#[tokio::test] -async fn test_delete_obj(mut tracer: Tracer, test_obj: DynamicObject) { - let ns_name = test_obj.namespaced_name(); - let ts: i64 = 1234; - - tracer.index.insert(ns_name.clone(), EMPTY_SPEC_HASH); - - tracer.delete_obj(&test_obj, ts); - - assert_eq!(tracer.index.len(), 0); - assert_eq!(tracer.events.len(), 1); - assert_eq!(tracer.events[0].applied_objs.len(), 0); - assert_eq!(tracer.events[0].deleted_objs.len(), 1); - assert_eq!(tracer.events[0].ts, ts); -} - -#[rstest] -#[tokio::test] -async fn test_recreate_index_all_new(mut tracer: Tracer) { - let obj_names = vec!["obj1", "obj2", "obj3"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); - let ts: i64 = 1234; - - // Calling it twice shouldn't change the tracked objs - tracer.update_all_objs(&objs, ts); - tracer.update_all_objs(&objs, 2445); - - assert_eq!(tracer.index.len(), objs.len()); - for p in objs.iter() { - let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); - } - assert_eq!(tracer.events.len(), 1); - assert_eq!(tracer.events[0].applied_objs.len(), 3); - assert_eq!(tracer.events[0].deleted_objs.len(), 0); - assert_eq!(tracer.events[0].ts, ts); -} - -#[rstest] -#[tokio::test] -async fn test_recreate_index_with_created_obj(mut tracer: Tracer) { - let obj_names = vec!["obj1", "obj2", "obj3", "obj4"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); - let ts = vec![1234, 2445]; - - // Calling it twice shouldn't change the tracked objs - let mut fewer_objs = objs.clone(); - fewer_objs.pop(); - tracer.update_all_objs(&fewer_objs, ts[0]); - tracer.update_all_objs(&objs, ts[1]); - - assert_eq!(tracer.index.len(), objs.len()); - for p in fewer_objs.iter() { - let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); - } - assert_eq!(tracer.events.len(), 2); - assert_eq!(tracer.events[0].applied_objs.len(), 3); - assert_eq!(tracer.events[0].deleted_objs.len(), 0); - assert_eq!(tracer.events[0].ts, ts[0]); - assert_eq!(tracer.events[1].applied_objs.len(), 1); - assert_eq!(tracer.events[1].deleted_objs.len(), 0); - assert_eq!(tracer.events[1].ts, ts[1]); -} - -#[rstest] -#[tokio::test] -async fn test_recreate_index_with_deleted_obj(mut tracer: Tracer) { - let obj_names = vec!["obj1", "obj2", "obj3"]; - let objs: Vec<_> = obj_names.iter().map(|p| test_obj("test", p)).collect(); - let ts = vec![1234, 2445]; - - // Calling it twice shouldn't change the tracked objs - tracer.update_all_objs(&objs, ts[0]); - let mut fewer_objs = objs.clone(); - fewer_objs.pop(); - tracer.update_all_objs(&fewer_objs, ts[1]); - - assert_eq!(tracer.index.len(), fewer_objs.len()); - for p in fewer_objs.iter() { - let ns_name = p.namespaced_name(); - assert_eq!(tracer.index[&ns_name], EMPTY_SPEC_HASH); - } - assert_eq!(tracer.events.len(), 2); - assert_eq!(tracer.events[0].applied_objs.len(), 3); - assert_eq!(tracer.events[0].deleted_objs.len(), 0); - assert_eq!(tracer.events[0].ts, ts[0]); - assert_eq!(tracer.events[1].applied_objs.len(), 0); - assert_eq!(tracer.events[1].deleted_objs.len(), 1); - assert_eq!(tracer.events[1].ts, ts[1]); -} diff --git a/lib/rust/watch/dyn_obj_watcher.rs b/lib/rust/watch/dyn_obj_watcher.rs index 2b0e5445..8c4d4840 100644 --- a/lib/rust/watch/dyn_obj_watcher.rs +++ b/lib/rust/watch/dyn_obj_watcher.rs @@ -25,21 +25,21 @@ use crate::k8s::{ GVK, }; use crate::prelude::*; +use crate::store::TraceStore; use crate::time::{ Clockable, UtcClock, }; -use crate::trace::Tracer; pub struct DynObjWatcher { clock: Box, obj_stream: SelectAll, - tracer: Arc>, + store: Arc>, } impl DynObjWatcher { pub async fn new( - tracer: Arc>, + store: Arc>, apiset: &mut ApiSet, tracked_objects: &HashMap, ) -> anyhow::Result { @@ -52,16 +52,16 @@ impl DynObjWatcher { Ok(DynObjWatcher { clock: Box::new(UtcClock), obj_stream: select_all(apis), - tracer, + store, }) } pub fn new_from_parts( objs: KubeObjectStream, - tracer: Arc>, + store: Arc>, clock: Box, ) -> DynObjWatcher { - DynObjWatcher { obj_stream: select_all(vec![objs]), tracer, clock } + DynObjWatcher { obj_stream: select_all(vec![objs]), store, clock } } pub async fn start(mut self) { @@ -76,11 +76,11 @@ impl DynObjWatcher { } fn handle_obj_event(&self, evt: Event, ts: i64) { - let mut tracer = self.tracer.lock().unwrap(); + let mut store = self.store.lock().unwrap(); match evt { - Event::Applied(obj) => tracer.create_or_update_obj(&obj, ts, None), - Event::Deleted(obj) => tracer.delete_obj(&obj, ts), - Event::Restarted(objs) => tracer.update_all_objs(&objs, ts), + Event::Applied(obj) => store.create_or_update_obj(&obj, ts, None), + Event::Deleted(obj) => store.delete_obj(&obj, ts), + Event::Restarted(objs) => store.update_all_objs(&objs, ts), }; } } diff --git a/lib/rust/watch/pod_watcher.rs b/lib/rust/watch/pod_watcher.rs index 263e4106..74080ebd 100644 --- a/lib/rust/watch/pod_watcher.rs +++ b/lib/rust/watch/pod_watcher.rs @@ -35,7 +35,7 @@ use crate::k8s::{ PodLifecycleData, GVK, }; -use crate::trace::Tracer; +use crate::store::TraceStore; type OwnerCache = SizedCache>; const CACHE_SIZE: usize = 10000; @@ -43,19 +43,19 @@ const CACHE_SIZE: usize = 10000; pub struct PodWatcher { apiset: ApiSet, pod_stream: PodStream, - tracer: Arc>, + store: Arc>, owned_pods: HashMap, owners_cache: SizedCache>, } impl PodWatcher { - pub fn new(tracer: Arc>, apiset: ApiSet) -> PodWatcher { + pub fn new(store: Arc>, apiset: ApiSet) -> PodWatcher { let pod_api: kube::Api = kube::Api::all(apiset.client().clone()); let pod_stream = watcher(pod_api, Default::default()).map_err(|e| e.into()).boxed(); PodWatcher { apiset, pod_stream, - tracer, + store, owned_pods: HashMap::new(), owners_cache: SizedCache::with_size(CACHE_SIZE), } @@ -165,8 +165,8 @@ impl PodWatcher { _ => bail!("could not store lifecycle data for {}", ns_name), }; - let mut tracer = self.tracer.lock().unwrap(); - tracer.record_pod_lifecycle(ns_name, owners, lifecycle_data); + let mut store = self.store.lock().unwrap(); + store.record_pod_lifecycle(ns_name, owners, lifecycle_data); Ok(()) } diff --git a/tests/rust/test_tracer.rs b/tests/rust/test_tracer.rs index cc4ab2f5..03b62b5c 100644 --- a/tests/rust/test_tracer.rs +++ b/tests/rust/test_tracer.rs @@ -12,11 +12,11 @@ use kube::runtime::watcher::Event; use kube::ResourceExt; use serde_json::json; use simkube::k8s::macros::*; -use simkube::time::Clockable; -use simkube::trace::{ +use simkube::store::{ TraceFilter, - Tracer, + TraceStore, }; +use simkube::time::Clockable; use simkube::watch::{ DynObjWatcher, KubeObjectStream, @@ -153,10 +153,10 @@ async fn test_export() { // Since we're just generating the results from the stream and not actually querying any // Kubernetes internals or whatever, the TracerConfig is empty. - let t = Tracer::new(Default::default()); + let s = TraceStore::new(Default::default()); // First build up the stream of test data and run the watcher (this advances time to the "end") - let w = DynObjWatcher::new_from_parts(test_stream(*clock.clone()), t.clone(), clock); + let w = DynObjWatcher::new_from_parts(test_stream(*clock.clone()), s.clone(), clock); w.start().await; // Next export the data with the chosen filters @@ -169,14 +169,14 @@ async fn test_export() { exclude_daemonsets: true, }; - let tracer = t.lock().unwrap(); + let store = s.lock().unwrap(); let (start_ts, end_ts) = (15, 46); - match tracer.export(start_ts, end_ts, &filter) { + match store.export(start_ts, end_ts, &filter) { Ok(data) => { // Confirm that the results match what we expect - let new_tracer = Tracer::import(data).unwrap(); - let expected_pods = tracer.objs_at(end_ts, &filter); - let actual_pods = new_tracer.objs(); + let new_store = TraceStore::import(data).unwrap(); + let expected_pods = store.objs_at(end_ts, &filter); + let actual_pods = new_store.objs(); println!("Expected pods: {:?}", expected_pods); println!("Actual pods: {:?}", actual_pods); assert_eq!(actual_pods, expected_pods); diff --git a/tracer/main.rs b/tracer/main.rs index 9f99a543..f01589fd 100644 --- a/tracer/main.rs +++ b/tracer/main.rs @@ -9,9 +9,9 @@ use rocket::serde::json::Json; use serde::Deserialize; use simkube::k8s::ApiSet; use simkube::prelude::*; -use simkube::trace::{ +use simkube::store::{ TraceFilter, - Tracer, + TraceStore, }; use simkube::watch::{ DynObjWatcher, @@ -39,9 +39,9 @@ struct ExportRequest { } #[rocket::post("/export", data = "")] -async fn export(req: Json, tracer: &rocket::State>>) -> Result, String> { +async fn export(req: Json, store: &rocket::State>>) -> Result, String> { debug!("export called with {:?}", req); - tracer + store .lock() .unwrap() .export(req.start_ts, req.end_ts, &req.filter) @@ -55,14 +55,14 @@ async fn run(args: &Options) -> EmptyResult { let client = Client::try_default().await.expect("failed to create kube client"); let mut apiset = ApiSet::new(&client); - let tracer = Tracer::new(config.clone()); - let dyn_obj_watcher = DynObjWatcher::new(tracer.clone(), &mut apiset, &config.tracked_objects).await?; - let pod_watcher = PodWatcher::new(tracer.clone(), apiset); + let store = TraceStore::new(config.clone()); + let dyn_obj_watcher = DynObjWatcher::new(store.clone(), &mut apiset, &config.tracked_objects).await?; + let pod_watcher = PodWatcher::new(store.clone(), apiset); let rkt_config = rocket::Config { port: args.server_port, ..Default::default() }; let server = rocket::custom(&rkt_config) .mount("/", rocket::routes![export]) - .manage(tracer.clone()); + .manage(store.clone()); tokio::select! { _ = tokio::spawn(dyn_obj_watcher.start()) => warn!("object watcher terminated"),