Skip to content

Commit

Permalink
apply lifecycle annotations (wip)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmorr0 committed Oct 18, 2023
1 parent deb301b commit ca08ce1
Show file tree
Hide file tree
Showing 28 changed files with 377 additions and 186 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ path = "lib/rust/lib.rs"

[dependencies]
# These are actually dev dependencies of the testutils feature
http = { version = "0.2.9", optional = true }
httpmock = { version = "0.6.8", optional = true }
mockall = { version = "0.11.4", optional = true }
rstest = { version = "0.18.2", optional = true }
http = { version = "0.2.9", optional = true }

anyhow = { version = "1.0.75", features = ["backtrace"] }
async-recursion = "1.0.5"
Expand Down Expand Up @@ -61,8 +62,7 @@ default-features = false
[dev-dependencies]
assertables = "7.0.1"
hyper = "0.14.27"
mockall = "0.11.4"
tracing-test = "0.2.4"

[features]
testutils = ["dep:http", "dep:httpmock", "dep:rstest"]
testutils = ["dep:http", "dep:httpmock", "dep:mockall", "dep:rstest"]
3 changes: 0 additions & 3 deletions ctrl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ struct Options {
#[arg(long, default_value = DRIVER_ADMISSION_WEBHOOK_PORT)]
driver_port: i32,

#[arg(long)]
sim_svc_account: String,

// TODO: should support non-cert-manager for configuring certs as well
#[arg(long)]
use_cert_manager: bool,
Expand Down
11 changes: 10 additions & 1 deletion ctrl/objects.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;

use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use k8s_openapi::api::core::v1 as corev1;
Expand Down Expand Up @@ -102,6 +104,8 @@ pub(super) fn build_driver_job(
};
let (cert_vm, cert_volume, cert_mount_path) = build_certificate_volumes(cert_secret_name);

let service_account = Some(env::var("POD_SVC_ACCOUNT")?);

Ok(batchv1::Job {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, owner)?,
spec: Some(batchv1::JobSpec {
Expand All @@ -113,12 +117,17 @@ pub(super) fn build_driver_job(
command: Some(vec!["/sk-driver".into()]),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)),
image: Some(ctx.opts.driver_image.clone()),
env: Some(vec![corev1::EnvVar {
name: "RUST_BACKTRACE".into(),
value: Some("1".into()),
..Default::default()
}]),
volume_mounts: Some(vec![trace_vm, cert_vm]),
..Default::default()
}],
restart_policy: Some("Never".into()),
volumes: Some(vec![trace_volume, cert_volume]),
service_account: Some(ctx.opts.sim_svc_account.clone()),
service_account,
..Default::default()
}),
..Default::default()
Expand Down
18 changes: 14 additions & 4 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use simkube::k8s::{
OwnersCache,
};
use simkube::prelude::*;
use simkube::store::TraceStore;
use simkube::store::{
TraceStorable,
TraceStore,
};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::*;
Expand Down Expand Up @@ -56,7 +59,7 @@ pub struct DriverContext {
sim_root: String,
virtual_ns_prefix: String,
owners_cache: Arc<Mutex<OwnersCache>>,
store: Arc<TraceStore>,
store: Arc<dyn TraceStorable + Send + Sync>,
}

async fn run(opts: Options) -> EmptyResult {
Expand Down Expand Up @@ -88,14 +91,21 @@ async fn run(opts: Options) -> EmptyResult {

let server_task = tokio::spawn(server.launch());

// Give the mutation handler a bit of time to come online before starting the sim:w
// Give the mutation handler a bit of time to come online before starting the sim
sleep(Duration::from_secs(5)).await;

let runner = TraceRunner::new(ctx.clone()).await?;

tokio::select! {
_ = server_task => warn!("server terminated"),
res = tokio::spawn(runner.run()) => info!("simulation runner completed: {res:?}"),
res = tokio::spawn(runner.run()) => {
let flattened_res = match res {
Ok(r) => r,
Err(err) => Err(err.into()),
};

info!("simulation runner completed: {flattened_res:?}");
},
};

Ok(())
Expand Down
83 changes: 61 additions & 22 deletions driver/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use json_patch::{
PatchOperation,
};
use k8s_openapi::api::core::v1 as corev1;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::core::admission::{
AdmissionRequest,
AdmissionResponse,
AdmissionReview,
};
use kube::ResourceExt;
use rocket::serde::json::Json;
use serde_json::{
json,
Expand Down Expand Up @@ -55,44 +57,81 @@ pub async fn handler(
// TODO when we get the pod object, the final name hasn't been filled in yet; make sure this
// doesn't cause any problems
pub(super) async fn mutate_pod(
ctx: &rocket::State<DriverContext>,
ctx: &DriverContext,
resp: AdmissionResponse,
pod: &corev1::Pod,
) -> anyhow::Result<AdmissionResponse> {
{
// enclose in a block so we release the mutex when we're done
// enclose in a block so we release the mutex when we're done
let owners = {
let mut owners_cache = ctx.owners_cache.lock().await;
let owners = owners_cache.compute_owner_chain(pod).await?;
owners_cache.compute_owner_chain(pod).await?
};

if owners.iter().all(|o| o.name != ctx.sim_root) {
return Ok(resp);
}
if !owners.iter().any(|o| o.name == ctx.sim_root) {
return Ok(resp);
}

let mut patches = vec![];
add_simulation_labels(ctx, pod, &mut patches)?;
add_lifecycle_annotation(ctx, pod, &owners, &mut patches)?;
add_node_selector_tolerations(pod, &mut patches)?;

Ok(resp.with_patch(Patch(patches))?)
}

fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.metadata.labels.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}));

Ok(())
}

fn add_lifecycle_annotation(
ctx: &DriverContext,
pod: &corev1::Pod,
owners: &Vec<metav1::OwnerReference>,
patches: &mut Vec<PatchOperation>,
) -> EmptyResult {
if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) {
for owner in owners {
let owner_ns_name = format!("{}/{}", orig_ns, owner.name);
let lifecycle = ctx.store.lookup_pod_lifecycle(pod, &owner_ns_name, 0)?;
if let Some(patch) = lifecycle.to_annotation_patch() {
if pod.metadata.annotations.is_none() {
patches.push(PatchOperation::Add(AddOperation {
path: "/metadata/annotations".into(),
value: json!({}),
}));
}
patches.push(patch);
break;
}
}
}

warn!("no pod lifecycle data found for {}", pod.namespaced_name());
Ok(())
}

fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.spec()?.tolerations.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}));
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}));

patches.extend(vec![
PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}),
PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}),
PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}),
]);
Ok(resp.with_patch(Patch(patches))?)
Ok(())
}

// Have to duplicate this fn because AdmissionResponse::into_review uses the dynamic API
Expand Down
59 changes: 39 additions & 20 deletions driver/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ use kube::api::{
Patch,
PatchParams,
};
use kube::{
Resource,
ResourceExt,
};
use kube::ResourceExt;
use serde_json::json;
use simkube::jsonutils;
use simkube::k8s::{
add_common_metadata,
Expand Down Expand Up @@ -41,24 +39,36 @@ fn build_virtual_ns(ctx: &DriverContext, owner: &SimulationRoot, namespace: &str
fn build_virtual_obj(
ctx: &DriverContext,
owner: &SimulationRoot,
namespace: &str,
original_ns: &str,
virtual_ns: &str,
obj: &DynamicObject,
pod_spec_template_path: &str,
) -> anyhow::Result<DynamicObject> {
let mut vobj = obj.clone();
add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;
vobj.metadata.namespace = Some(virtual_ns.into());
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

vobj.metadata.namespace = Some(namespace.into());
jsonutils::patch_ext::add(
&format!("{}/metadata", pod_spec_template_path),
"annotations",
&json!({}),
&mut vobj.data,
false,
)?;
jsonutils::patch_ext::add(
&format!("{}/metadata/annotations", pod_spec_template_path),
ORIG_NAMESPACE_ANNOTATION_KEY,
&json!(original_ns),
&mut vobj.data,
true,
)?;
jsonutils::patch_ext::remove("", "status", &mut vobj.data)?;
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;

Ok(vobj)
}

fn prefixed_ns(prefix: &str, obj: &impl Resource) -> String {
format!("{}-{}", prefix, obj.namespace().unwrap())
}

pub struct TraceRunner {
ctx: DriverContext,
client: kube::Client,
Expand All @@ -84,18 +94,27 @@ impl TraceRunner {
// this will panic/fail if that is not true.
for obj in &evt.applied_objs {
let gvk = GVK::from_dynamic_obj(&obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let vobj = build_virtual_obj(&self.ctx, &self.root, &vns_name, obj)?;
let original_ns = obj.namespace().unwrap();
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, original_ns);

if ns_api.get_opt(&vns_name).await?.is_none() {
info!("creating virtual namespace: {vns_name}");
let vns = build_virtual_ns(&self.ctx, &self.root, &vns_name)?;
if ns_api.get_opt(&virtual_ns).await?.is_none() {
info!("creating virtual namespace: {virtual_ns}");
let vns = build_virtual_ns(&self.ctx, &self.root, &virtual_ns)?;
ns_api.create(&Default::default(), &vns).await?;
}

let pod_spec_template_path = self
.ctx
.store
.config()
.pod_spec_template_path(&gvk)
.ok_or(anyhow!("unknown simulated object: {:?}", gvk))?;
let vobj =
build_virtual_obj(&self.ctx, &self.root, &original_ns, &virtual_ns, obj, pod_spec_template_path)?;

info!("applying object {}", vobj.namespaced_name());
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)
.await?
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;
Expand All @@ -104,9 +123,9 @@ impl TraceRunner {
for obj in &evt.deleted_objs {
info!("deleting object {}", obj.namespaced_name());
let gvk = GVK::from_dynamic_obj(obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, obj.namespace().unwrap());
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)
.await?
.delete(&obj.name_any(), &Default::default())
.await?;
Expand Down
Loading

0 comments on commit ca08ce1

Please sign in to comment.