Skip to content

Commit

Permalink
apply lifecycle annotations (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Oct 17, 2023
1 parent deb301b commit db39f4c
Show file tree
Hide file tree
Showing 23 changed files with 275 additions and 146 deletions.
3 changes: 0 additions & 3 deletions ctrl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ struct Options {
#[arg(long, default_value = DRIVER_ADMISSION_WEBHOOK_PORT)]
driver_port: i32,

#[arg(long)]
sim_svc_account: String,

// TODO: should support non-cert-manager for configuring certs as well
#[arg(long)]
use_cert_manager: bool,
Expand Down
11 changes: 10 additions & 1 deletion ctrl/objects.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;

use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use k8s_openapi::api::core::v1 as corev1;
Expand Down Expand Up @@ -102,6 +104,8 @@ pub(super) fn build_driver_job(
};
let (cert_vm, cert_volume, cert_mount_path) = build_certificate_volumes(cert_secret_name);

let service_account = Some(env::var("POD_SVC_ACCOUNT")?);

Check warning on line 107 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L107

Added line #L107 was not covered by tests

Ok(batchv1::Job {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, owner)?,
spec: Some(batchv1::JobSpec {
Expand All @@ -113,12 +117,17 @@ pub(super) fn build_driver_job(
command: Some(vec!["/sk-driver".into()]),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)),
image: Some(ctx.opts.driver_image.clone()),
env: Some(vec![corev1::EnvVar {
name: "RUST_BACKTRACE".into(),
value: Some("1".into()),
..Default::default()
}]),

Check warning on line 124 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L120-L124

Added lines #L120 - L124 were not covered by tests
volume_mounts: Some(vec![trace_vm, cert_vm]),
..Default::default()
}],
restart_policy: Some("Never".into()),
volumes: Some(vec![trace_volume, cert_volume]),
service_account: Some(ctx.opts.sim_svc_account.clone()),
service_account,

Check warning on line 130 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L130

Added line #L130 was not covered by tests
..Default::default()
}),
..Default::default()
Expand Down
11 changes: 9 additions & 2 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,21 @@ async fn run(opts: Options) -> EmptyResult {

let server_task = tokio::spawn(server.launch());

// Give the mutation handler a bit of time to come online before starting the sim:w
// Give the mutation handler a bit of time to come online before starting the sim

Check warning on line 91 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L91

Added line #L91 was not covered by tests
sleep(Duration::from_secs(5)).await;

let runner = TraceRunner::new(ctx.clone()).await?;

tokio::select! {
_ = server_task => warn!("server terminated"),
res = tokio::spawn(runner.run()) => info!("simulation runner completed: {res:?}"),
res = tokio::spawn(runner.run()) => {

Check warning on line 98 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L98

Added line #L98 was not covered by tests
let flattened_res = match res {
Ok(r) => r,
Err(err) => Err(err.into()),
};

info!("simulation runner completed: {flattened_res:?}");
},
};

Ok(())
Expand Down
83 changes: 61 additions & 22 deletions driver/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use json_patch::{
PatchOperation,
};
use k8s_openapi::api::core::v1 as corev1;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::core::admission::{
AdmissionRequest,
AdmissionResponse,
AdmissionReview,
};
use kube::ResourceExt;
use rocket::serde::json::Json;
use serde_json::{
json,
Expand Down Expand Up @@ -55,44 +57,81 @@ pub async fn handler(
// TODO when we get the pod object, the final name hasn't been filled in yet; make sure this
// doesn't cause any problems
pub(super) async fn mutate_pod(
ctx: &rocket::State<DriverContext>,
ctx: &DriverContext,
resp: AdmissionResponse,
pod: &corev1::Pod,
) -> anyhow::Result<AdmissionResponse> {
{
// enclose in a block so we release the mutex when we're done
// enclose in a block so we release the mutex when we're done
let owners = {
let mut owners_cache = ctx.owners_cache.lock().await;
let owners = owners_cache.compute_owner_chain(pod).await?;
owners_cache.compute_owner_chain(pod).await?
};

if owners.iter().all(|o| o.name != ctx.sim_root) {
return Ok(resp);
}
if !owners.iter().any(|o| o.name == ctx.sim_root) {
return Ok(resp);
}

let mut patches = vec![];
add_simulation_labels(ctx, pod, &mut patches)?;
add_lifecycle_annotation(ctx, pod, &owners, &mut patches)?;
add_node_selector_tolerations(pod, &mut patches)?;

Ok(resp.with_patch(Patch(patches))?)
}

/// Appends the JSON-patch operations that tag `pod` with the simulation label.
///
/// If the pod has no labels map at all, an empty `/metadata/labels` object is added first; the
/// label itself (key `SIMULATION_LABEL_KEY`, value `ctx.name`) is always added after that.
fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
    // Only emitted when there is no labels object for the label to be inserted into.
    let create_labels_op = pod
        .metadata
        .labels
        .is_none()
        .then(|| PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) }));

    let sim_label_path = format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY));
    let sim_label_op =
        PatchOperation::Add(AddOperation { path: sim_label_path, value: Value::String(ctx.name.clone()) });

    // Order matters: the (optional) parent object must precede the label insertion.
    patches.extend(create_labels_op.into_iter().chain(std::iter::once(sim_label_op)));
    Ok(())
}

/// Looks up lifecycle data for `pod` and, if any is found, appends the matching annotation patch
/// to `patches`.
///
/// The pod's original namespace is read from its `ORIG_NAMESPACE_ANNOTATION_KEY` annotation.  Each
/// entry of `owners` is then tried in order, keyed as `<original-namespace>/<owner-name>`; the
/// first owner whose lifecycle data produces an annotation patch wins and no further owners are
/// consulted.  If the pod has no annotations map, an empty `/metadata/annotations` object is added
/// ahead of that patch.
///
/// A warning is logged only when nothing was applied, i.e. the original-namespace annotation is
/// missing or no owner yielded a patch.
///
/// # Errors
///
/// Propagates any error returned by `ctx.store.lookup_pod_lifecycle`.
fn add_lifecycle_annotation(
    ctx: &DriverContext,
    pod: &corev1::Pod,
    owners: &[metav1::OwnerReference],
    patches: &mut Vec<PatchOperation>,
) -> EmptyResult {
    if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) {
        for owner in owners {
            let owner_ns_name = format!("{}/{}", orig_ns, owner.name);
            // NOTE(review): the meaning of the trailing `0` argument is not visible from this
            // function -- confirm against `lookup_pod_lifecycle`.
            let lifecycle = ctx.store.lookup_pod_lifecycle(pod, &owner_ns_name, 0)?;
            if let Some(patch) = lifecycle.to_annotation_patch() {
                if pod.metadata.annotations.is_none() {
                    patches.push(PatchOperation::Add(AddOperation {
                        path: "/metadata/annotations".into(),
                        value: json!({}),
                    }));
                }
                patches.push(patch);

                // Return here rather than `break`: falling out of the loop would reach the
                // "no pod lifecycle data found" warning below even though a patch was applied.
                return Ok(());
            }
        }
    }

    warn!("no pod lifecycle data found for {}", pod.namespaced_name());
    Ok(())
}

fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.spec()?.tolerations.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}));
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}));

patches.extend(vec![
PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}),
PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}),
PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}),
]);
Ok(resp.with_patch(Patch(patches))?)
Ok(())
}

// Have to duplicate this fn because AdmissionResponse::into_review uses the dynamic API
Expand Down
59 changes: 39 additions & 20 deletions driver/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use kube::api::{
Patch,
PatchParams,
};
use kube::{
Resource,
ResourceExt,
};
use kube::ResourceExt;
use serde_json::json;
use simkube::jsonutils;
use simkube::k8s::{
add_common_metadata,
Expand Down Expand Up @@ -41,24 +39,36 @@ fn build_virtual_ns(ctx: &DriverContext, owner: &SimulationRoot, namespace: &str
fn build_virtual_obj(
ctx: &DriverContext,
owner: &SimulationRoot,
namespace: &str,
original_ns: &str,
virtual_ns: &str,

Check warning on line 43 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L42-L43

Added lines #L42 - L43 were not covered by tests
obj: &DynamicObject,
pod_spec_template_path: &str,

Check warning on line 45 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L45

Added line #L45 was not covered by tests
) -> anyhow::Result<DynamicObject> {
let mut vobj = obj.clone();
add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;
vobj.metadata.namespace = Some(virtual_ns.into());
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

Check warning on line 50 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L48-L50

Added lines #L48 - L50 were not covered by tests

vobj.metadata.namespace = Some(namespace.into());
jsonutils::patch_ext::add(
&format!("{}/metadata", pod_spec_template_path),
"annotations",
&json!({}),
&mut vobj.data,
false,
)?;
jsonutils::patch_ext::add(
&format!("{}/metadata/annotations", pod_spec_template_path),
ORIG_NAMESPACE_ANNOTATION_KEY,
&json!(original_ns),
&mut vobj.data,
true,
)?;

Check warning on line 65 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L52-L65

Added lines #L52 - L65 were not covered by tests
jsonutils::patch_ext::remove("", "status", &mut vobj.data)?;
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;

Ok(vobj)
}

fn prefixed_ns(prefix: &str, obj: &impl Resource) -> String {
format!("{}-{}", prefix, obj.namespace().unwrap())
}

pub struct TraceRunner {
ctx: DriverContext,
client: kube::Client,
Expand All @@ -84,18 +94,27 @@ impl TraceRunner {
// this will panic/fail if that is not true.
for obj in &evt.applied_objs {
let gvk = GVK::from_dynamic_obj(&obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let vobj = build_virtual_obj(&self.ctx, &self.root, &vns_name, obj)?;
let original_ns = obj.namespace().unwrap();
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, original_ns);

Check warning on line 98 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L97-L98

Added lines #L97 - L98 were not covered by tests

if ns_api.get_opt(&vns_name).await?.is_none() {
info!("creating virtual namespace: {vns_name}");
let vns = build_virtual_ns(&self.ctx, &self.root, &vns_name)?;
if ns_api.get_opt(&virtual_ns).await?.is_none() {
info!("creating virtual namespace: {virtual_ns}");
let vns = build_virtual_ns(&self.ctx, &self.root, &virtual_ns)?;

Check warning on line 102 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L100-L102

Added lines #L100 - L102 were not covered by tests
ns_api.create(&Default::default(), &vns).await?;
}

let pod_spec_template_path = self
.ctx
.store
.config()
.pod_spec_template_path(&gvk)
.ok_or(anyhow!("unknown simulated object: {:?}", gvk))?;
let vobj =
build_virtual_obj(&self.ctx, &self.root, &original_ns, &virtual_ns, obj, pod_spec_template_path)?;

Check warning on line 113 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L106-L113

Added lines #L106 - L113 were not covered by tests

info!("applying object {}", vobj.namespaced_name());
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)

Check warning on line 117 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L117

Added line #L117 was not covered by tests
.await?
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;
Expand All @@ -104,9 +123,9 @@ impl TraceRunner {
for obj in &evt.deleted_objs {
info!("deleting object {}", obj.namespaced_name());
let gvk = GVK::from_dynamic_obj(obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, obj.namespace().unwrap());

Check warning on line 126 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L126

Added line #L126 was not covered by tests
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)

Check warning on line 128 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L128

Added line #L128 was not covered by tests
.await?
.delete(&obj.name_any(), &Default::default())
.await?;
Expand Down
5 changes: 4 additions & 1 deletion driver/tests/mutation_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use kube::core::{
use kube::ResourceExt;
use rocket::serde::json::Json;
use simkube::testutils::fake::make_fake_apiserver;
use simkube::testutils::test_pod;
use simkube::testutils::*;
use tracing_test::traced_test;

use super::*;
Expand Down Expand Up @@ -124,6 +124,9 @@ async fn test_mutate_pod_not_owned_by_sim(mut test_pod: corev1::Pod, mut adm_res
#[rstest]
#[tokio::test]
async fn test_mutate_pod(mut test_pod: corev1::Pod, mut adm_resp: AdmissionResponse) {
test_pod
.annotations_mut()
.insert(ORIG_NAMESPACE_ANNOTATION_KEY.into(), TEST_NAMESPACE.into());
let owner = metav1::OwnerReference {
name: TEST_SIM_ROOT_NAME.into(),
..Default::default()
Expand Down
1 change: 0 additions & 1 deletion k8s/sk_ctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ def __init__(self, scope: Construct, namespace: str):
args=[
"/sk-ctrl",
"--driver-image", driver_image,
"--sim-svc-account", "$POD_SVC_ACCOUNT",
"--use-cert-manager",
"--cert-manager-issuer", "selfsigned",
],
Expand Down
4 changes: 2 additions & 2 deletions k8s/sk_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
TRACER_CONFIG_YML = """---
trackedObjects:
apps/v1.Deployment:
podSpecPath: /spec/template/spec
podSpecTemplatePath: /spec/template
batch.volcano.sh/v1alpha1.Job:
podSpecPath: /spec/tasks/*/template/spec
podSpecTemplatePath: /spec/tasks/*/template
"""
CONFIGMAP_NAME = "tracer-config"

Expand Down
6 changes: 5 additions & 1 deletion lib/rust/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::k8s::GVK;
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TrackedObjectConfig {
pub pod_spec_path: String,
pub pod_spec_template_path: String,
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
Expand All @@ -24,4 +24,8 @@ impl TracerConfig {
pub fn load(filename: &str) -> anyhow::Result<TracerConfig> {
Ok(serde_yaml::from_reader(File::open(filename)?)?)
}

/// Returns the `pod_spec_template_path` configured for `gvk`, or `None` when `gvk` has no entry
/// in `tracked_objects`.
pub fn pod_spec_template_path(&self, gvk: &GVK) -> Option<&str> {
    self.tracked_objects
        .get(gvk)
        .map(|tracked| tracked.pod_spec_template_path.as_str())
}

Check warning on line 30 in lib/rust/config.rs

View check run for this annotation

Codecov / codecov/patch

lib/rust/config.rs#L28-L30

Added lines #L28 - L30 were not covered by tests
}
2 changes: 2 additions & 0 deletions lib/rust/constants.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/// Default port for the driver's admission webhook.  Kept as a string, not an integer.
// NOTE(review): presumably a string so it can be used directly as a CLI default value -- confirm.
pub const DRIVER_ADMISSION_WEBHOOK_PORT: &str = "8888";
/// Annotation key for a pod's lifetime, in seconds (per the key name).
// NOTE(review): no use of this key is visible alongside its definition -- confirm who writes/reads it.
pub const LIFETIME_ANNOTATION_KEY: &str = "simkube.io/lifetime-seconds";
/// Annotation key recording the namespace an object originally lived in.
pub const ORIG_NAMESPACE_ANNOTATION_KEY: &str = "simkube.io/original-namespace";
/// Label key identifying which simulation an object belongs to.
pub const SIMULATION_LABEL_KEY: &str = "simkube.io/simulation";
/// Label key marking an object as virtual.
pub const VIRTUAL_LABEL_KEY: &str = "simkube.io/virtual";
/// Toleration key associated with virtual nodes.
pub const VIRTUAL_NODE_TOLERATION_KEY: &str = "simkube.io/virtual-node";
1 change: 1 addition & 0 deletions lib/rust/k8s/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub trait KubeResourceExt {

/// Fallible accessors for the spec and status of a pod.
pub trait PodExt {
/// Borrows the pod's spec.
// NOTE(review): presumably errors when the spec is absent -- confirm against the impl.
fn spec(&self) -> anyhow::Result<&corev1::PodSpec>;
/// Returns an owned `PodSpec` rather than a borrow.
// NOTE(review): the name suggests a normalized copy of the spec with volatile fields
// removed, but the impl is not visible here -- confirm before relying on that.
fn stable_spec(&self) -> anyhow::Result<corev1::PodSpec>;
/// Borrows the pod's status.
// NOTE(review): presumably errors when the status is absent -- confirm against the impl.
fn status(&self) -> anyhow::Result<&corev1::PodStatus>;
}

Expand Down
4 changes: 2 additions & 2 deletions lib/rust/k8s/owners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl OwnersCache {
obj: &(impl Resource + Sync),
) -> anyhow::Result<Vec<metav1::OwnerReference>> {
let ns_name = obj.namespaced_name();
info!("computing owner references for {ns_name}");
debug!("computing owner references for {ns_name}");

if let Some(owners) = self.owners.get(&ns_name) {
info!("found owners for {ns_name} in cache");
debug!("found owners for {ns_name} in cache");
return Ok(owners.clone());
}

Expand Down
Loading

0 comments on commit db39f4c

Please sign in to comment.