diff --git a/ctrl/main.rs b/ctrl/main.rs index b4d4add0..af017573 100644 --- a/ctrl/main.rs +++ b/ctrl/main.rs @@ -30,9 +30,6 @@ struct Options { #[arg(long, default_value = DRIVER_ADMISSION_WEBHOOK_PORT)] driver_port: i32, - #[arg(long)] - sim_svc_account: String, - // TODO: should support non-cert-manager for configuring certs as well #[arg(long)] use_cert_manager: bool, diff --git a/ctrl/objects.rs b/ctrl/objects.rs index cf0697dc..238e0d36 100644 --- a/ctrl/objects.rs +++ b/ctrl/objects.rs @@ -1,3 +1,5 @@ +use std::env; + use k8s_openapi::api::admissionregistration::v1 as admissionv1; use k8s_openapi::api::batch::v1 as batchv1; use k8s_openapi::api::core::v1 as corev1; @@ -102,6 +104,8 @@ pub(super) fn build_driver_job( }; let (cert_vm, cert_volume, cert_mount_path) = build_certificate_volumes(cert_secret_name); + let service_account = Some(env::var("POD_SVC_ACCOUNT")?); + Ok(batchv1::Job { metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, owner)?, spec: Some(batchv1::JobSpec { @@ -113,12 +117,17 @@ pub(super) fn build_driver_job( command: Some(vec!["/sk-driver".into()]), args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)), image: Some(ctx.opts.driver_image.clone()), + env: Some(vec![corev1::EnvVar { + name: "RUST_BACKTRACE".into(), + value: Some("1".into()), + ..Default::default() + }]), volume_mounts: Some(vec![trace_vm, cert_vm]), ..Default::default() }], restart_policy: Some("Never".into()), volumes: Some(vec![trace_volume, cert_volume]), - service_account: Some(ctx.opts.sim_svc_account.clone()), + service_account, ..Default::default() }), ..Default::default() diff --git a/driver/main.rs b/driver/main.rs index 87228ca6..80380cb6 100644 --- a/driver/main.rs +++ b/driver/main.rs @@ -88,14 +88,21 @@ async fn run(opts: Options) -> EmptyResult { let server_task = tokio::spawn(server.launch()); - // Give the mutation handler a bit of time to come online before starting the sim:w + // Give the mutation handler a bit of time to come online before starting the sim sleep(Duration::from_secs(5)).await; let runner = TraceRunner::new(ctx.clone()).await?; tokio::select! { _ = server_task => warn!("server terminated"), - res = tokio::spawn(runner.run()) => info!("simulation runner completed: {res:?}"), + res = tokio::spawn(runner.run()) => { + let flattened_res = match res { + Ok(r) => r, + Err(err) => Err(err.into()), + }; + + info!("simulation runner completed: {flattened_res:?}"); + }, }; Ok(()) diff --git a/driver/mutation.rs b/driver/mutation.rs index 79865dab..d5672eef 100644 --- a/driver/mutation.rs +++ b/driver/mutation.rs @@ -4,11 +4,13 @@ use json_patch::{ PatchOperation, }; use k8s_openapi::api::core::v1 as corev1; +use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1; use kube::core::admission::{ AdmissionRequest, AdmissionResponse, AdmissionReview, }; +use kube::ResourceExt; use rocket::serde::json::Json; use serde_json::{ json, @@ -55,44 +57,81 @@ pub async fn handler( // TODO when we get the pod object, the final name hasn't been filled in yet; make sure this // doesn't cause any problems pub(super) async fn mutate_pod( - ctx: &rocket::State, + ctx: &DriverContext, resp: AdmissionResponse, pod: &corev1::Pod, ) -> anyhow::Result { - { - // enclose in a block so we release the mutex when we're done + // enclose in a block so we release the mutex when we're done + let owners = { let mut owners_cache = ctx.owners_cache.lock().await; - let owners = owners_cache.compute_owner_chain(pod).await?; + owners_cache.compute_owner_chain(pod).await? + }; - if owners.iter().all(|o| o.name != ctx.sim_root) { - return Ok(resp); - } + if !owners.iter().any(|o| o.name == ctx.sim_root) { + return Ok(resp); } let mut patches = vec![]; + add_simulation_labels(ctx, pod, &mut patches)?; + add_lifecycle_annotation(ctx, pod, &owners, &mut patches)?; + add_node_selector_tolerations(pod, &mut patches)?; + + Ok(resp.with_patch(Patch(patches))?) +} + +fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec) -> EmptyResult { if pod.metadata.labels.is_none() { patches.push(PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) })); } + patches.push(PatchOperation::Add(AddOperation { + path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)), + value: Value::String(ctx.name.clone()), + })); + + Ok(()) +} + +fn add_lifecycle_annotation( + ctx: &DriverContext, + pod: &corev1::Pod, + owners: &Vec, + patches: &mut Vec, +) -> EmptyResult { + if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) { + for owner in owners { + let owner_ns_name = format!("{}/{}", orig_ns, owner.name); + let lifecycle = ctx.store.lookup_pod_lifecycle(pod, &owner_ns_name, 0)?; + if let Some(patch) = lifecycle.to_annotation_patch() { + if pod.metadata.annotations.is_none() { + patches.push(PatchOperation::Add(AddOperation { + path: "/metadata/annotations".into(), + value: json!({}), + })); + } + patches.push(patch); + break; + } + } + } + + warn!("no pod lifecycle data found for {}", pod.namespaced_name()); + Ok(()) +} +fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec) -> EmptyResult { if pod.spec()?.tolerations.is_none() { patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) })); } + patches.push(PatchOperation::Add(AddOperation { + path: "/spec/nodeSelector".into(), + value: json!({"type": "virtual"}), + })); + patches.push(PatchOperation::Add(AddOperation { + path: "/spec/tolerations/-".into(), + value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}), + })); - patches.extend(vec![ - PatchOperation::Add(AddOperation { - path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)), - value: Value::String(ctx.name.clone()), - }), - PatchOperation::Add(AddOperation { - path: "/spec/nodeSelector".into(), - value: json!({"type": "virtual"}), - }), - PatchOperation::Add(AddOperation { - path: "/spec/tolerations/-".into(), - value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}), - }), - ]); - Ok(resp.with_patch(Patch(patches))?) + Ok(()) } // Have to duplicate this fn because AdmissionResponse::into_review uses the dynamic API diff --git a/driver/runner.rs b/driver/runner.rs index 99651a8a..965f7bc5 100644 --- a/driver/runner.rs +++ b/driver/runner.rs @@ -8,10 +8,8 @@ use kube::api::{ Patch, PatchParams, }; -use kube::{ - Resource, - ResourceExt, -}; +use kube::ResourceExt; +use serde_json::json; use simkube::jsonutils; use simkube::k8s::{ add_common_metadata, @@ -41,24 +39,36 @@ fn build_virtual_ns(ctx: &DriverContext, owner: &SimulationRoot, namespace: &str fn build_virtual_obj( ctx: &DriverContext, owner: &SimulationRoot, - namespace: &str, + original_ns: &str, + virtual_ns: &str, obj: &DynamicObject, + pod_spec_template_path: &str, ) -> anyhow::Result { let mut vobj = obj.clone(); + add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?; + vobj.metadata.namespace = Some(virtual_ns.into()); + klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true"); - vobj.metadata.namespace = Some(namespace.into()); + jsonutils::patch_ext::add( + &format!("{}/metadata", pod_spec_template_path), + "annotations", + &json!({}), + &mut vobj.data, + false, + )?; + jsonutils::patch_ext::add( + &format!("{}/metadata/annotations", pod_spec_template_path), + ORIG_NAMESPACE_ANNOTATION_KEY, + &json!(original_ns), + &mut vobj.data, + true, + )?; jsonutils::patch_ext::remove("", "status", &mut vobj.data)?; - klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true"); - add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?; Ok(vobj) } -fn prefixed_ns(prefix: &str, obj: &impl Resource) -> String { - format!("{}-{}", prefix, obj.namespace().unwrap()) -} - pub struct TraceRunner { ctx: DriverContext, client: kube::Client, @@ -84,18 +94,27 @@ impl TraceRunner { // this will panic/fail if that is not true. for obj in &evt.applied_objs { let gvk = GVK::from_dynamic_obj(&obj)?; - let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj); - let vobj = build_virtual_obj(&self.ctx, &self.root, &vns_name, obj)?; + let original_ns = obj.namespace().unwrap(); + let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, original_ns); - if ns_api.get_opt(&vns_name).await?.is_none() { - info!("creating virtual namespace: {vns_name}"); - let vns = build_virtual_ns(&self.ctx, &self.root, &vns_name)?; + if ns_api.get_opt(&virtual_ns).await?.is_none() { + info!("creating virtual namespace: {virtual_ns}"); + let vns = build_virtual_ns(&self.ctx, &self.root, &virtual_ns)?; ns_api.create(&Default::default(), &vns).await?; } + let pod_spec_template_path = self + .ctx + .store + .config() + .pod_spec_template_path(&gvk) + .ok_or(anyhow!("unknown simulated object: {:?}", gvk))?; + let vobj = + build_virtual_obj(&self.ctx, &self.root, &original_ns, &virtual_ns, obj, pod_spec_template_path)?; + info!("applying object {}", vobj.namespaced_name()); apiset - .namespaced_api_for(&gvk, vns_name) + .namespaced_api_for(&gvk, virtual_ns) .await? .patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj)) .await?; @@ -104,9 +123,9 @@ impl TraceRunner { for obj in &evt.deleted_objs { info!("deleting object {}", obj.namespaced_name()); let gvk = GVK::from_dynamic_obj(obj)?; - let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj); + let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, obj.namespace().unwrap()); apiset - .namespaced_api_for(&gvk, vns_name) + .namespaced_api_for(&gvk, virtual_ns) .await? .delete(&obj.name_any(), &Default::default()) .await?; diff --git a/driver/tests/mutation_test.rs b/driver/tests/mutation_test.rs index 19ab5ffe..9a2f8c77 100644 --- a/driver/tests/mutation_test.rs +++ b/driver/tests/mutation_test.rs @@ -20,7 +20,7 @@ use kube::core::{ use kube::ResourceExt; use rocket::serde::json::Json; use simkube::testutils::fake::make_fake_apiserver; -use simkube::testutils::test_pod; +use simkube::testutils::*; use tracing_test::traced_test; use super::*; @@ -124,6 +124,9 @@ async fn test_mutate_pod_not_owned_by_sim(mut test_pod: corev1::Pod, mut adm_res #[rstest] #[tokio::test] async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionResponse) { + test_pod + .annotations_mut() + .insert(ORIG_NAMESPACE_ANNOTATION_KEY.into(), TEST_NAMESPACE.into()); let owner = metav1::OwnerReference { name: TEST_SIM_ROOT_NAME.into(), ..Default::default() diff --git a/k8s/sk_ctrl.py b/k8s/sk_ctrl.py index ad1b5a49..eeecb132 100644 --- a/k8s/sk_ctrl.py +++ b/k8s/sk_ctrl.py @@ -29,7 +29,6 @@ def __init__(self, scope: Construct, namespace: str): args=[ "/sk-ctrl", "--driver-image", driver_image, - "--sim-svc-account", "$POD_SVC_ACCOUNT", "--use-cert-manager", "--cert-manager-issuer", "selfsigned", ], diff --git a/k8s/sk_tracer.py b/k8s/sk_tracer.py index ac9fa954..8678e6bc 100644 --- a/k8s/sk_tracer.py +++ b/k8s/sk_tracer.py @@ -10,9 +10,9 @@ TRACER_CONFIG_YML = """--- trackedObjects: apps/v1.Deployment: - podSpecPath: /spec/template/spec + podSpecTemplatePath: /spec/template batch.volcano.sh/v1alpha1.Job: - podSpecPath: /spec/tasks/*/template/spec + podSpecTemplatePath: /spec/tasks/*/template """ CONFIGMAP_NAME = "tracer-config" diff --git a/lib/rust/config.rs b/lib/rust/config.rs index e7b528fa..f576ee83 100644 --- a/lib/rust/config.rs +++ b/lib/rust/config.rs @@ -11,7 +11,7 @@ use crate::k8s::GVK; #[derive(Clone, Debug, Default, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct TrackedObjectConfig { - pub pod_spec_path: String, + pub pod_spec_template_path: String, } #[derive(Clone, Debug, Default, Deserialize, Serialize)] @@ -24,4 +24,8 @@ impl TracerConfig { pub fn load(filename: &str) -> anyhow::Result { Ok(serde_yaml::from_reader(File::open(filename)?)?) } + + pub fn pod_spec_template_path(&self, gvk: &GVK) -> Option<&str> { + Some(&self.tracked_objects.get(gvk)?.pod_spec_template_path) + } } diff --git a/lib/rust/constants.rs b/lib/rust/constants.rs index b4c354c6..1884ce2e 100644 --- a/lib/rust/constants.rs +++ b/lib/rust/constants.rs @@ -1,4 +1,6 @@ pub const DRIVER_ADMISSION_WEBHOOK_PORT: &str = "8888"; +pub const LIFETIME_ANNOTATION_KEY: &str = "simkube.io/lifetime-seconds"; +pub const ORIG_NAMESPACE_ANNOTATION_KEY: &str = "simkube.io/original-namespace"; pub const SIMULATION_LABEL_KEY: &str = "simkube.io/simulation"; pub const VIRTUAL_LABEL_KEY: &str = "simkube.io/virtual"; pub const VIRTUAL_NODE_TOLERATION_KEY: &str = "simkube.io/virtual-node"; diff --git a/lib/rust/k8s/mod.rs b/lib/rust/k8s/mod.rs index 0879116e..6db811bb 100644 --- a/lib/rust/k8s/mod.rs +++ b/lib/rust/k8s/mod.rs @@ -49,6 +49,7 @@ pub trait KubeResourceExt { pub trait PodExt { fn spec(&self) -> anyhow::Result<&corev1::PodSpec>; + fn stable_spec(&self) -> anyhow::Result; fn status(&self) -> anyhow::Result<&corev1::PodStatus>; } diff --git a/lib/rust/k8s/owners.rs b/lib/rust/k8s/owners.rs index b888c3f5..8a60d7b4 100644 --- a/lib/rust/k8s/owners.rs +++ b/lib/rust/k8s/owners.rs @@ -32,10 +32,10 @@ impl OwnersCache { obj: &(impl Resource + Sync), ) -> anyhow::Result> { let ns_name = obj.namespaced_name(); - info!("computing owner references for {ns_name}"); + debug!("computing owner references for {ns_name}"); if let Some(owners) = self.owners.get(&ns_name) { - info!("found owners for {ns_name} in cache"); + debug!("found owners for {ns_name} in cache"); return Ok(owners.clone()); } diff --git a/lib/rust/k8s/pod_ext.rs b/lib/rust/k8s/pod_ext.rs index 92e5a351..0539fd9c 100644 --- a/lib/rust/k8s/pod_ext.rs +++ b/lib/rust/k8s/pod_ext.rs @@ -1,5 +1,19 @@ use super::*; +const KUBE_SVC_ACCOUNT_VOLUME_NAME_PREFIX: &str = "kube-api-access"; + +macro_rules! filter_volumes { + ($vols:expr) => { + $vols + .as_ref() + .unwrap_or(&vec![]) + .iter() + .filter(|v| !v.name.starts_with(KUBE_SVC_ACCOUNT_VOLUME_NAME_PREFIX)) + .cloned() + .collect() + }; +} + // Helper functions to get references to a pod's spec and status objects impl PodExt for corev1::Pod { fn spec(&self) -> anyhow::Result<&corev1::PodSpec> { @@ -9,6 +23,26 @@ impl PodExt for corev1::Pod { } } + fn stable_spec(&self) -> anyhow::Result { + let mut spec = self.spec()?.clone(); + spec.volumes = Some(filter_volumes!(spec.volumes)); + spec.node_name = None; + spec.service_account = None; + spec.service_account_name = None; + + if let Some(containers) = spec.init_containers.as_mut() { + for container in containers { + (*container).volume_mounts = Some(filter_volumes!(container.volume_mounts)); + } + } + + for container in &mut spec.containers { + (*container).volume_mounts = Some(filter_volumes!(container.volume_mounts)); + } + + Ok(spec) + } + fn status(&self) -> anyhow::Result<&corev1::PodStatus> { match self.status.as_ref() { None => bail!(KubernetesError::field_not_found("pod status")), diff --git a/lib/rust/k8s/pod_lifecycle.rs b/lib/rust/k8s/pod_lifecycle.rs index 4e7fa878..0379bd86 100644 --- a/lib/rust/k8s/pod_lifecycle.rs +++ b/lib/rust/k8s/pod_lifecycle.rs @@ -3,9 +3,16 @@ use std::cmp::{ Ordering, }; +use json_patch::{ + AddOperation, + PatchOperation, +}; use kube::ResourceExt; +use serde_json::Value; use super::*; +use crate::constants::*; +use crate::jsonutils; use crate::time::Clockable; use crate::util::min_some; @@ -137,6 +144,16 @@ impl PodLifecycleData { pub fn finished(&self) -> bool { matches!(self, PodLifecycleData::Finished(..)) } + + pub fn to_annotation_patch(&self) -> Option { + match self { + PodLifecycleData::Empty | PodLifecycleData::Running(_) => None, + PodLifecycleData::Finished(start_ts, end_ts) => Some(PatchOperation::Add(AddOperation { + path: format!("/metadata/annotations/{}", jsonutils::escape(LIFETIME_ANNOTATION_KEY)), + value: Value::String(format!("{}", end_ts - start_ts)), + })), + } + } } // We implement PartialOrd and PartialEq for PodLifecycleData; this is maybe a little bit magic, diff --git a/lib/rust/k8s/tests/util_test.rs b/lib/rust/k8s/tests/util_test.rs index 9520c8ac..8da55c78 100644 --- a/lib/rust/k8s/tests/util_test.rs +++ b/lib/rust/k8s/tests/util_test.rs @@ -1,6 +1,6 @@ use chrono::Utc; use kube::api::DynamicObject; -use serde_json::json; +use serde_json as json; use super::*; use crate::testutils::*; @@ -30,29 +30,10 @@ fn test_sanitize_obj() { ..Default::default() }, types: None, - data: json!({ - "foo": { - "bars": [{ - "spec": { - "nodeName": "foo", - "serviceAccountName": "bar", - "nodeSelector": {"buz": "biz"}, - }, - }, - { - "spec": {}, - }, - { - "spec": { - "serviceAccount": "flumm", - }, - }, - ], - }, - }), + data: json::Value::Null, }; - sanitize_obj(&mut obj, "/foo/bars/*/spec", "bar.blah.sh/v2", "Stuff"); + sanitize_obj(&mut obj, "bar.blah.sh/v2", "Stuff"); assert_eq!(obj.metadata.creation_timestamp, None); assert_eq!(obj.metadata.deletion_timestamp, None); @@ -67,23 +48,6 @@ fn test_sanitize_obj() { assert!(obj .types .is_some_and(|tm| tm.api_version == "bar.blah.sh/v2" && tm.kind == "Stuff")); - - assert_eq!( - obj.data, - json!({ - "foo": { - "bars": [ - { - "spec": { - "nodeSelector": {"buz": "biz"}, - }, - }, - { "spec": {} }, - { "spec": {} }, - ], - }, - }) - ); } fn build_label_sel(key: &str, op: &str, value: Option<&str>) -> metav1::LabelSelector { diff --git a/lib/rust/k8s/util.rs b/lib/rust/k8s/util.rs index 90d4e65c..685e22f2 100644 --- a/lib/rust/k8s/util.rs +++ b/lib/rust/k8s/util.rs @@ -8,12 +8,10 @@ use kube::api::{ TypeMeta, }; use serde_json as json; -use tracing::*; use super::*; use crate::constants::SIMULATION_LABEL_KEY; use crate::errors::*; -use crate::jsonutils; pub fn add_common_metadata(sim_name: &str, owner: &K, meta: &mut metav1::ObjectMeta) -> EmptyResult where @@ -78,7 +76,7 @@ pub fn list_params_for(namespace: &str, name: &str) -> ListParams { } } -pub fn sanitize_obj(obj: &mut DynamicObject, pod_spec_path: &str, api_version: &str, kind: &str) { +pub fn sanitize_obj(obj: &mut DynamicObject, api_version: &str, kind: &str) { obj.metadata.creation_timestamp = None; obj.metadata.deletion_timestamp = None; obj.metadata.deletion_grace_period_seconds = None; @@ -93,12 +91,6 @@ pub fn sanitize_obj(obj: &mut DynamicObject, pod_spec_path: &str, api_version: & a.remove(DEPL_REVISION_LABEL_KEY); } - for key in &["nodeName", "serviceAccount", "serviceAccountName"] { - if let Err(e) = jsonutils::patch_ext::remove(pod_spec_path, key, &mut obj.data) { - debug!("could not patch object {}, skipping: {}", obj.namespaced_name(), e); - } - } - obj.types = Some(TypeMeta { api_version: api_version.into(), kind: kind.into() }); } diff --git a/lib/rust/lib.rs b/lib/rust/lib.rs index 71f07aae..c59014f7 100644 --- a/lib/rust/lib.rs +++ b/lib/rust/lib.rs @@ -43,6 +43,7 @@ pub mod prelude { pub use crate::k8s::{ KubeResourceExt, PodExt, + PodLifecycleData, }; pub use crate::logging; } diff --git a/lib/rust/store/pod_owners_map.rs b/lib/rust/store/pod_owners_map.rs index 2b43f2bd..7efd2316 100644 --- a/lib/rust/store/pod_owners_map.rs +++ b/lib/rust/store/pod_owners_map.rs @@ -82,6 +82,14 @@ impl PodOwnersMap { self.index.insert(ns_name.into(), (owner_ns_name.into(), hash, idx)); } + pub(super) fn lifecycle_data_for<'a>( + &'a self, + owner_ns_name: &str, + pod_hash: u64, + ) -> Option<&'a Vec> { + Some(self.m.get(owner_ns_name)?.get(&pod_hash)?) + } + pub(super) fn update_pod_lifecycle(&mut self, ns_name: &str, lifecycle_data: PodLifecycleData) -> EmptyResult { match self.index.get(ns_name) { None => bail!("pod {} not present in index", ns_name), @@ -156,10 +164,6 @@ pub(super) fn filter_lifecycles_map( #[cfg(test)] impl PodOwnersMap { - pub(super) fn lifecycle_data_for(&self, owner_ns_name: &str, pod_hash: &u64) -> Option> { - Some(self.m.get(owner_ns_name)?.get(pod_hash)?.clone()) - } - pub(super) fn pod_owner_meta(&self, ns_name: &str) -> Option<&(String, u64, usize)> { self.index.get(ns_name) } diff --git a/lib/rust/store/tests/import_export_test.rs b/lib/rust/store/tests/import_export_test.rs index b23751df..2d952ff4 100644 --- a/lib/rust/store/tests/import_export_test.rs +++ b/lib/rust/store/tests/import_export_test.rs @@ -157,7 +157,7 @@ async fn itest_export() { // Confirm that the results match what we expect let new_store = TraceStore::import(data).unwrap(); let expected_pods = store.objs_at(end_ts, &filter); - let actual_pods = new_store.objs(); + let actual_pods = new_store.objs_at(end_ts, &filter); println!("Expected pods: {:?}", expected_pods); println!("Actual pods: {:?}", actual_pods); assert_eq!(actual_pods, expected_pods); diff --git a/lib/rust/store/tests/pod_owners_map_test.rs b/lib/rust/store/tests/pod_owners_map_test.rs index a602bd65..753951f7 100644 --- a/lib/rust/store/tests/pod_owners_map_test.rs +++ b/lib/rust/store/tests/pod_owners_map_test.rs @@ -15,11 +15,11 @@ fn test_store_new_pod_lifecycle(mut owners_map: PodOwnersMap) { owners_map.store_new_pod_lifecycle("podC", "deployment1", 5678, PodLifecycleData::Running(9)); owners_map.store_new_pod_lifecycle("podD", "deployment2", 5678, PodLifecycleData::Running(13)); assert_eq!( - owners_map.lifecycle_data_for("deployment1", &1234).unwrap(), - vec![PodLifecycleData::Running(5), PodLifecycleData::Running(7)] + owners_map.lifecycle_data_for("deployment1", 1234).unwrap(), + &vec![PodLifecycleData::Running(5), PodLifecycleData::Running(7)] ); - assert_eq!(owners_map.lifecycle_data_for("deployment1", &5678).unwrap(), vec![PodLifecycleData::Running(9)]); - assert_eq!(owners_map.lifecycle_data_for("deployment2", &5678).unwrap(), vec![PodLifecycleData::Running(13)]); + assert_eq!(owners_map.lifecycle_data_for("deployment1", 5678).unwrap(), &vec![PodLifecycleData::Running(9)]); + assert_eq!(owners_map.lifecycle_data_for("deployment2", 5678).unwrap(), &vec![PodLifecycleData::Running(13)]); assert_eq!(*owners_map.pod_owner_meta("podA").unwrap(), ("deployment1".to_string(), 1234, 0)); assert_eq!(*owners_map.pod_owner_meta("podB").unwrap(), ("deployment1".to_string(), 1234, 1)); diff --git a/lib/rust/store/tests/trace_store_test.rs b/lib/rust/store/tests/trace_store_test.rs index 9cfa27bc..ceaed6a3 100644 --- a/lib/rust/store/tests/trace_store_test.rs +++ b/lib/rust/store/tests/trace_store_test.rs @@ -8,7 +8,7 @@ use crate::k8s::KubeResourceExt; use crate::testutils::*; const EMPTY_OBJ_HASH: u64 = 15130871412783076140; -const EMPTY_POD_SPEC_HASH: u64 = 16349339464234908611; +const EMPTY_POD_SPEC_HASH: u64 = 17506812802394981455; const DEPLOYMENT_NAME: &str = "the-deployment"; #[fixture] @@ -34,6 +34,34 @@ fn owner_ref() -> metav1::OwnerReference { metav1::OwnerReference { name: DEPLOYMENT_NAME.into(), ..Default::default() } } +#[rstest] +fn test_lookup_pod_lifecycle_no_owner(tracer: TraceStore, test_pod: corev1::Pod) { + let res = tracer.lookup_pod_lifecycle(&test_pod, DEPLOYMENT_NAME, 0).unwrap(); + assert_eq!(res, PodLifecycleData::Empty); +} + +#[rstest] +fn test_lookup_pod_lifecycle_no_hash(mut tracer: TraceStore, test_pod: corev1::Pod) { + tracer.index.insert(DEPLOYMENT_NAME.into(), 1234); + let res = tracer.lookup_pod_lifecycle(&test_pod, DEPLOYMENT_NAME, 0).unwrap(); + assert_eq!(res, PodLifecycleData::Empty); +} + +#[rstest] +fn test_lookup_pod_lifecycle(mut tracer: TraceStore, test_pod: corev1::Pod) { + let owner_ns_name = format!("{TEST_NAMESPACE}/{DEPLOYMENT_NAME}"); + let pod_lifecycle = PodLifecycleData::Finished(1, 2); + + tracer.index.insert(owner_ns_name.clone(), 1234); + tracer.pod_owners = PodOwnersMap::new_from_parts( + HashMap::from([(owner_ns_name.clone(), HashMap::from([(EMPTY_POD_SPEC_HASH, vec![pod_lifecycle.clone()])]))]), + HashMap::new(), + ); + + let res = tracer.lookup_pod_lifecycle(&test_pod, &owner_ns_name, 0).unwrap(); + assert_eq!(res, pod_lifecycle); +} + #[rstest] fn test_collect_events_filtered(mut tracer: TraceStore) { tracer.events = [("obj1", 0), ("obj2", 1), ("obj3", 5), ("obj4", 10), ("obj5", 15)] @@ -255,8 +283,8 @@ fn test_record_pod_lifecycle_already_stored_no_pod(mut tracer: TraceStore, owner .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, &EMPTY_POD_SPEC_HASH), - Some(expected_lifecycle_data) + tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + Some(&expected_lifecycle_data) ); } @@ -274,7 +302,7 @@ fn test_record_pod_lifecycle_with_new_pod_no_tracked_owner( .unwrap(); let unused_hash = 0; - assert_eq!(tracer.pod_owners.lifecycle_data_for(&owner_ns_name, &unused_hash), None); + assert_eq!(tracer.pod_owners.lifecycle_data_for(&owner_ns_name, unused_hash), None); } #[rstest] @@ -292,8 +320,8 @@ fn test_record_pod_lifecycle_with_new_pod_type( .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, &EMPTY_POD_SPEC_HASH), - Some(vec![new_lifecycle_data]) + tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + Some(&vec![new_lifecycle_data]) ); } @@ -322,8 +350,8 @@ fn test_record_pod_lifecycle_with_new_pod_existing_hash( .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, &EMPTY_POD_SPEC_HASH), - Some(expected_lifecycle_data) + tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + Some(&expected_lifecycle_data) ); } @@ -351,7 +379,7 @@ fn test_record_pod_lifecycle_with_existing_pod( .unwrap(); assert_eq!( - tracer.pod_owners.lifecycle_data_for(&owner_ns_name, &EMPTY_POD_SPEC_HASH), - Some(expected_lifecycle_data) + tracer.pod_owners.lifecycle_data_for(&owner_ns_name, EMPTY_POD_SPEC_HASH), + Some(&expected_lifecycle_data) ); } diff --git a/lib/rust/store/trace_store.rs b/lib/rust/store/trace_store.rs index f05373d3..76e7126c 100644 --- a/lib/rust/store/trace_store.rs +++ b/lib/rust/store/trace_store.rs @@ -13,6 +13,7 @@ use crate::jsonutils; use crate::k8s::{ build_deletable, KubeResourceExt, + PodExt, PodLifecycleData, }; @@ -44,7 +45,7 @@ impl TraceStore { // Collect all pod lifecycle data that is a) between the start and end times, and b) is // owned by some object contained in the trace let lifecycle_data = self.pod_owners.filter(start_ts, end_ts, &index); - let data = rmp_serde::to_vec_named(&(&self.config, &events, &lifecycle_data))?; + let data = rmp_serde::to_vec_named(&(&self.config, &events, &index, &lifecycle_data))?; info!("Exported {} events.", events.len()); Ok(data) @@ -54,24 +55,38 @@ impl TraceStore { // the metadata necessary to pick up a trace and continue. Instead, we just re-import enough // information to be able to run a simulation off the trace store. pub fn import(data: Vec) -> anyhow::Result { - let (config, events, lifecycle_data): (TracerConfig, VecDeque, HashMap) = - rmp_serde::from_slice(&data)?; - - let mut tracer = TraceStore { + let (config, events, index, lifecycle_data): ( + TracerConfig, + VecDeque, + HashMap, + HashMap, + ) = rmp_serde::from_slice(&data)?; + + Ok(TraceStore { config, events, + index, pod_owners: PodOwnersMap::new_from_parts(lifecycle_data, HashMap::new()), ..Default::default() - }; - - let (_, index) = tracer.collect_events(0, i64::MAX, &Default::default(), false); - tracer.index = index; - - Ok(tracer) + }) } - pub fn objs(&self) -> HashSet { - self.index.keys().cloned().collect() + pub fn lookup_pod_lifecycle( + &self, + pod: &corev1::Pod, + owner_ns_name: &str, + seq: usize, + ) -> anyhow::Result { + if !self.index.contains_key(owner_ns_name) { + return Ok(PodLifecycleData::Empty); + } + + let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?); + let maybe_lifecycle_data = self.pod_owners.lifecycle_data_for(&owner_ns_name, hash); + match maybe_lifecycle_data { + Some(data) if seq < data.len() => Ok(data[seq].clone()), + _ => Ok(PodLifecycleData::Empty), + } } pub fn objs_at(&self, end_ts: i64, filter: &TraceFilter) -> HashSet { @@ -231,7 +246,7 @@ impl TraceStorable for TraceStore { // of data that are unique to each pod that won't materially impact the behaviour? // This does occur for example with coredns's volume mounts. We may need to filter // more things out from this and/or allow users to specify what is filtered out. - let hash = jsonutils::hash(&serde_json::to_value(&pod.spec)?); + let hash = jsonutils::hash(&serde_json::to_value(&pod.stable_spec()?)?); self.pod_owners .store_new_pod_lifecycle(ns_name, &owner_ns_name, hash, lifecycle_data); break; diff --git a/lib/rust/watch/dyn_obj_watcher.rs b/lib/rust/watch/dyn_obj_watcher.rs index c377bc6e..a5dd69b7 100644 --- a/lib/rust/watch/dyn_obj_watcher.rs +++ b/lib/rust/watch/dyn_obj_watcher.rs @@ -52,8 +52,8 @@ impl DynObjWatcher { tracked_objects: &HashMap, ) -> anyhow::Result { let mut apis = vec![]; - for (gvk, obj_cfg) in tracked_objects { - let stream = build_stream_for_tracked_obj(apiset, gvk, &obj_cfg.pod_spec_path).await?; + for gvk in tracked_objects.keys() { + let stream = build_stream_for_tracked_obj(apiset, gvk).await?; apis.push(stream); } @@ -88,15 +88,9 @@ impl DynObjWatcher { } } -async fn build_stream_for_tracked_obj( - apiset: &mut ApiSet, - gvk: &GVK, - pod_spec_path: &str, -) -> anyhow::Result { +async fn build_stream_for_tracked_obj(apiset: &mut ApiSet, gvk: &GVK) -> anyhow::Result { // TODO if this fails (e.g., because some custom resource isn't present in the cluster) // it will prevent the tracer from starting up - let pod_spec_path = pod_spec_path.to_owned(); - let api_version = gvk.api_version().clone(); let kind = gvk.kind.clone(); let (api, _) = apiset.api_for(gvk).await?; @@ -104,7 +98,7 @@ async fn build_stream_for_tracked_obj( Ok(watcher(api.clone(), Default::default()) // All these objects need to be cloned because they're moved into the stream here .modify(move |obj| { - sanitize_obj(obj, &pod_spec_path, &api_version, &kind); + sanitize_obj(obj, &api_version, &kind); }) .map_err(|e| e.into()) .boxed())