Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

apply lifecycle annotations (wip) #30

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ path = "lib/rust/lib.rs"

[dependencies]
# These are actually dev dependencies of the testutils feature
http = { version = "0.2.9", optional = true }
httpmock = { version = "0.6.8", optional = true }
mockall = { version = "0.11.4", optional = true }
rstest = { version = "0.18.2", optional = true }
http = { version = "0.2.9", optional = true }

anyhow = { version = "1.0.75", features = ["backtrace"] }
async-recursion = "1.0.5"
Expand Down Expand Up @@ -61,8 +62,7 @@ default-features = false
[dev-dependencies]
assertables = "7.0.1"
hyper = "0.14.27"
mockall = "0.11.4"
tracing-test = "0.2.4"

[features]
testutils = ["dep:http", "dep:httpmock", "dep:rstest"]
testutils = ["dep:http", "dep:httpmock", "dep:mockall", "dep:rstest"]
3 changes: 0 additions & 3 deletions ctrl/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ struct Options {
#[arg(long, default_value = DRIVER_ADMISSION_WEBHOOK_PORT)]
driver_port: i32,

#[arg(long)]
sim_svc_account: String,

// TODO: should support non-cert-manager for configuring certs as well
#[arg(long)]
use_cert_manager: bool,
Expand Down
11 changes: 10 additions & 1 deletion ctrl/objects.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;

use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use k8s_openapi::api::core::v1 as corev1;
Expand Down Expand Up @@ -102,6 +104,8 @@
};
let (cert_vm, cert_volume, cert_mount_path) = build_certificate_volumes(cert_secret_name);

let service_account = Some(env::var("POD_SVC_ACCOUNT")?);

Check warning on line 107 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L107

Added line #L107 was not covered by tests

Ok(batchv1::Job {
metadata: build_object_meta(&ctx.driver_ns, &ctx.driver_name, &ctx.name, owner)?,
spec: Some(batchv1::JobSpec {
Expand All @@ -113,12 +117,17 @@
command: Some(vec!["/sk-driver".into()]),
args: Some(build_driver_args(ctx, cert_mount_path, trace_mount_path)),
image: Some(ctx.opts.driver_image.clone()),
env: Some(vec![corev1::EnvVar {
name: "RUST_BACKTRACE".into(),
value: Some("1".into()),
..Default::default()
}]),

Check warning on line 124 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L120-L124

Added lines #L120 - L124 were not covered by tests
volume_mounts: Some(vec![trace_vm, cert_vm]),
..Default::default()
}],
restart_policy: Some("Never".into()),
volumes: Some(vec![trace_volume, cert_volume]),
service_account: Some(ctx.opts.sim_svc_account.clone()),
service_account,

Check warning on line 130 in ctrl/objects.rs

View check run for this annotation

Codecov / codecov/patch

ctrl/objects.rs#L130

Added line #L130 was not covered by tests
..Default::default()
}),
..Default::default()
Expand Down
18 changes: 14 additions & 4 deletions driver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
OwnersCache,
};
use simkube::prelude::*;
use simkube::store::TraceStore;
use simkube::store::{
TraceStorable,
TraceStore,
};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::*;
Expand Down Expand Up @@ -56,7 +59,7 @@
sim_root: String,
virtual_ns_prefix: String,
owners_cache: Arc<Mutex<OwnersCache>>,
store: Arc<TraceStore>,
store: Arc<dyn TraceStorable + Send + Sync>,
}

async fn run(opts: Options) -> EmptyResult {
Expand Down Expand Up @@ -88,14 +91,21 @@

let server_task = tokio::spawn(server.launch());

// Give the mutation handler a bit of time to come online before starting the sim:w
// Give the mutation handler a bit of time to come online before starting the sim

Check warning on line 94 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L94

Added line #L94 was not covered by tests
sleep(Duration::from_secs(5)).await;

let runner = TraceRunner::new(ctx.clone()).await?;

tokio::select! {
_ = server_task => warn!("server terminated"),
res = tokio::spawn(runner.run()) => info!("simulation runner completed: {res:?}"),
res = tokio::spawn(runner.run()) => {

Check warning on line 101 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L101

Added line #L101 was not covered by tests
let flattened_res = match res {
Ok(r) => r,
Err(err) => Err(err.into()),
};

info!("simulation runner completed: {flattened_res:?}");

Check warning on line 107 in driver/main.rs

View check run for this annotation

Codecov / codecov/patch

driver/main.rs#L107

Added line #L107 was not covered by tests
},
};

Ok(())
Expand Down
83 changes: 61 additions & 22 deletions driver/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
PatchOperation,
};
use k8s_openapi::api::core::v1 as corev1;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as metav1;
use kube::core::admission::{
AdmissionRequest,
AdmissionResponse,
AdmissionReview,
};
use kube::ResourceExt;
use rocket::serde::json::Json;
use serde_json::{
json,
Expand Down Expand Up @@ -55,44 +57,81 @@
// TODO when we get the pod object, the final name hasn't been filled in yet; make sure this
// doesn't cause any problems
pub(super) async fn mutate_pod(
ctx: &rocket::State<DriverContext>,
ctx: &DriverContext,
resp: AdmissionResponse,
pod: &corev1::Pod,
) -> anyhow::Result<AdmissionResponse> {
{
// enclose in a block so we release the mutex when we're done
// enclose in a block so we release the mutex when we're done
let owners = {
let mut owners_cache = ctx.owners_cache.lock().await;
let owners = owners_cache.compute_owner_chain(pod).await?;
owners_cache.compute_owner_chain(pod).await?
};

if owners.iter().all(|o| o.name != ctx.sim_root) {
return Ok(resp);
}
if !owners.iter().any(|o| o.name == ctx.sim_root) {
return Ok(resp);
}

let mut patches = vec![];
add_simulation_labels(ctx, pod, &mut patches)?;
add_lifecycle_annotation(ctx, pod, &owners, &mut patches)?;
add_node_selector_tolerations(pod, &mut patches)?;

Ok(resp.with_patch(Patch(patches))?)
}

fn add_simulation_labels(ctx: &DriverContext, pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.metadata.labels.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/metadata/labels".into(), value: json!({}) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}));

Ok(())
}

fn add_lifecycle_annotation(
ctx: &DriverContext,
pod: &corev1::Pod,
owners: &Vec<metav1::OwnerReference>,
patches: &mut Vec<PatchOperation>,
) -> EmptyResult {
if let Some(orig_ns) = pod.annotations().get(ORIG_NAMESPACE_ANNOTATION_KEY) {
for owner in owners {
let owner_ns_name = format!("{}/{}", orig_ns, owner.name);
let lifecycle = ctx.store.lookup_pod_lifecycle(pod, &owner_ns_name, 0)?;
if let Some(patch) = lifecycle.to_annotation_patch() {
if pod.metadata.annotations.is_none() {
patches.push(PatchOperation::Add(AddOperation {
path: "/metadata/annotations".into(),
value: json!({}),
}));

Check warning on line 109 in driver/mutation.rs

View check run for this annotation

Codecov / codecov/patch

driver/mutation.rs#L106-L109

Added lines #L106 - L109 were not covered by tests
}
patches.push(patch);
break;
}
}
}

warn!("no pod lifecycle data found for {}", pod.namespaced_name());
Ok(())
}

fn add_node_selector_tolerations(pod: &corev1::Pod, patches: &mut Vec<PatchOperation>) -> EmptyResult {
if pod.spec()?.tolerations.is_none() {
patches.push(PatchOperation::Add(AddOperation { path: "/spec/tolerations".into(), value: json!([]) }));
}
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}));
patches.push(PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}));

patches.extend(vec![
PatchOperation::Add(AddOperation {
path: format!("/metadata/labels/{}", jsonutils::escape(SIMULATION_LABEL_KEY)),
value: Value::String(ctx.name.clone()),
}),
PatchOperation::Add(AddOperation {
path: "/spec/nodeSelector".into(),
value: json!({"type": "virtual"}),
}),
PatchOperation::Add(AddOperation {
path: "/spec/tolerations/-".into(),
value: json!({"key": VIRTUAL_NODE_TOLERATION_KEY, "value": "true"}),
}),
]);
Ok(resp.with_patch(Patch(patches))?)
Ok(())
}

// Have to duplicate this fn because AdmissionResponse::into_review uses the dynamic API
Expand Down
59 changes: 39 additions & 20 deletions driver/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
Patch,
PatchParams,
};
use kube::{
Resource,
ResourceExt,
};
use kube::ResourceExt;
use serde_json::json;
use simkube::jsonutils;
use simkube::k8s::{
add_common_metadata,
Expand Down Expand Up @@ -41,24 +39,36 @@
fn build_virtual_obj(
ctx: &DriverContext,
owner: &SimulationRoot,
namespace: &str,
original_ns: &str,
virtual_ns: &str,

Check warning on line 43 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L42-L43

Added lines #L42 - L43 were not covered by tests
obj: &DynamicObject,
pod_spec_template_path: &str,

Check warning on line 45 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L45

Added line #L45 was not covered by tests
) -> anyhow::Result<DynamicObject> {
let mut vobj = obj.clone();
add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;
vobj.metadata.namespace = Some(virtual_ns.into());
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

Check warning on line 50 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L48-L50

Added lines #L48 - L50 were not covered by tests

vobj.metadata.namespace = Some(namespace.into());
jsonutils::patch_ext::add(
&format!("{}/metadata", pod_spec_template_path),
"annotations",
&json!({}),
&mut vobj.data,
false,
)?;
jsonutils::patch_ext::add(
&format!("{}/metadata/annotations", pod_spec_template_path),
ORIG_NAMESPACE_ANNOTATION_KEY,
&json!(original_ns),
&mut vobj.data,
true,
)?;

Check warning on line 65 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L52-L65

Added lines #L52 - L65 were not covered by tests
jsonutils::patch_ext::remove("", "status", &mut vobj.data)?;
klabel_insert!(vobj, VIRTUAL_LABEL_KEY = "true");

add_common_metadata(&ctx.name, owner, &mut vobj.metadata)?;

Ok(vobj)
}

fn prefixed_ns(prefix: &str, obj: &impl Resource) -> String {
format!("{}-{}", prefix, obj.namespace().unwrap())
}

pub struct TraceRunner {
ctx: DriverContext,
client: kube::Client,
Expand All @@ -84,18 +94,27 @@
// this will panic/fail if that is not true.
for obj in &evt.applied_objs {
let gvk = GVK::from_dynamic_obj(&obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let vobj = build_virtual_obj(&self.ctx, &self.root, &vns_name, obj)?;
let original_ns = obj.namespace().unwrap();
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, original_ns);

Check warning on line 98 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L97-L98

Added lines #L97 - L98 were not covered by tests

if ns_api.get_opt(&vns_name).await?.is_none() {
info!("creating virtual namespace: {vns_name}");
let vns = build_virtual_ns(&self.ctx, &self.root, &vns_name)?;
if ns_api.get_opt(&virtual_ns).await?.is_none() {
info!("creating virtual namespace: {virtual_ns}");
let vns = build_virtual_ns(&self.ctx, &self.root, &virtual_ns)?;

Check warning on line 102 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L100-L102

Added lines #L100 - L102 were not covered by tests
ns_api.create(&Default::default(), &vns).await?;
}

let pod_spec_template_path = self
.ctx
.store
.config()
.pod_spec_template_path(&gvk)
.ok_or(anyhow!("unknown simulated object: {:?}", gvk))?;
let vobj =
build_virtual_obj(&self.ctx, &self.root, &original_ns, &virtual_ns, obj, pod_spec_template_path)?;

Check warning on line 113 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L106-L113

Added lines #L106 - L113 were not covered by tests

info!("applying object {}", vobj.namespaced_name());
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)

Check warning on line 117 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L117

Added line #L117 was not covered by tests
.await?
.patch(&vobj.name_any(), &PatchParams::apply("simkube"), &Patch::Apply(&vobj))
.await?;
Expand All @@ -104,9 +123,9 @@
for obj in &evt.deleted_objs {
info!("deleting object {}", obj.namespaced_name());
let gvk = GVK::from_dynamic_obj(obj)?;
let vns_name = prefixed_ns(&self.ctx.virtual_ns_prefix, obj);
let virtual_ns = format!("{}-{}", self.ctx.virtual_ns_prefix, obj.namespace().unwrap());

Check warning on line 126 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L126

Added line #L126 was not covered by tests
apiset
.namespaced_api_for(&gvk, vns_name)
.namespaced_api_for(&gvk, virtual_ns)

Check warning on line 128 in driver/runner.rs

View check run for this annotation

Codecov / codecov/patch

driver/runner.rs#L128

Added line #L128 was not covered by tests
.await?
.delete(&obj.name_any(), &Default::default())
.await?;
Expand Down
Loading
Loading