From 0ec241e6410d567f6d6d7d8342c0a3d125a33dd3 Mon Sep 17 00:00:00 2001
From: Jack Kleeman
Date: Fri, 3 May 2024 18:52:40 +0100
Subject: [PATCH] Canary pods need to be created via Jobs

For the PIA canary to work well, the Pod creation needs to go through
the same apiserver instance that the controller-manager uses. I really
hope that controller-manager requests are not load balanced across all
three apiservers. We can use a Job object so that the controller-manager
creates the pods.
---
 .../restate-operator-helm/templates/rbac.yaml |   3 +-
 src/controller.rs                             |  15 +-
 src/reconcilers/compute.rs                    | 155 ++++++++++++++----
 3 files changed, 133 insertions(+), 40 deletions(-)

diff --git a/charts/restate-operator-helm/templates/rbac.yaml b/charts/restate-operator-helm/templates/rbac.yaml
index 837c3ab..d174e87 100644
--- a/charts/restate-operator-helm/templates/rbac.yaml
+++ b/charts/restate-operator-helm/templates/rbac.yaml
@@ -47,6 +47,7 @@ rules:
   - statefulsets
   - persistentvolumeclaims
   - pods
+  - jobs
   - securitygrouppolicies
   - secretproviderclasses
   verbs:
@@ -64,7 +65,7 @@ rules:
 - resources:
   - statefulsets
   - networkpolicies
-  - pods
+  - jobs
   - securitygrouppolicies
   - secretproviderclasses
   verbs:
diff --git a/src/controller.rs b/src/controller.rs
index 128ea8e..b34e8f3 100644
--- a/src/controller.rs
+++ b/src/controller.rs
@@ -8,13 +8,15 @@ use std::sync::Arc;
 use chrono::{DateTime, Utc};
 use futures::StreamExt;
 use k8s_openapi::api::apps::v1::StatefulSet;
+use k8s_openapi::api::batch::v1::Job;
 use k8s_openapi::api::core::v1::{
-    EnvVar, Namespace, PersistentVolumeClaim, Pod, PodDNSConfig, ResourceRequirements, Service,
+    EnvVar, Namespace, PersistentVolumeClaim, PodDNSConfig, ResourceRequirements, Service,
     ServiceAccount,
 };
 use k8s_openapi::api::networking::v1;
 use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort};
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, ObjectMeta};
+
 use kube::core::object::HasStatus;
 use kube::core::PartialObjectMeta;
 use kube::runtime::reflector::{ObjectRef, Store};
@@ -417,7 +419,7 @@ async fn reconcile(rc: Arc<RestateCluster>, ctx: Arc<Context>) -> Result<Action, Error>
     }
 }
 
-fn error_policy(_rc: Arc<RestateCluster>, _error: &Error, _ctx: Arc<Context>) -> Action {
+fn error_policy<C>(_rc: Arc<RestateCluster>, _error: &Error, _ctx: C) -> Action {
     Action::requeue(Duration::from_secs(30))
 }
 
@@ -724,7 +726,7 @@ pub async fn run(state: State) {
     let svcacc_api = Api::<ServiceAccount>::all(client.clone());
     let np_api = Api::<NetworkPolicy>::all(client.clone());
     let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
-    let pod_api = Api::<Pod>::all(client.clone());
+    let job_api = Api::<Job>::all(client.clone());
     let sgp_api = Api::<SecurityGroupPolicy>::all(client.clone());
     let spc_api = Api::<SecretProviderClass>::all(client.clone());
 
@@ -785,9 +787,10 @@ pub async fn run(state: State) {
             // avoid apply loops that seem to happen with crds
             .predicate_filter(changed_predicate.combine(status_predicate));
 
-        controller
-            .owns_stream(pia_watcher)
-            .owns(pod_api, cfg.clone())
+        controller.owns_stream(pia_watcher).owns(
+            job_api,
+            Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
+        )
     } else {
         controller
     };
diff --git a/src/reconcilers/compute.rs b/src/reconcilers/compute.rs
index 0547105..b154c51 100644
--- a/src/reconcilers/compute.rs
+++ b/src/reconcilers/compute.rs
@@ -1,9 +1,9 @@
 use std::collections::{BTreeMap, HashSet};
 use std::convert::Into;
 use std::path::PathBuf;
-use std::time::Duration;
 
 use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetStatus};
+use k8s_openapi::api::batch::v1::{Job, JobSpec};
 use k8s_openapi::api::core::v1::{
     Container, ContainerPort, EnvVar, HTTPGetAction, PersistentVolumeClaim,
     PersistentVolumeClaimSpec, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
@@ -13,14 +13,14 @@ use k8s_openapi::api::core::v1::{
 use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
 use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
-use kube::api::{DeleteParams, Preconditions, PropagationPolicy};
+use kube::api::{DeleteParams, ListParams, Preconditions, PropagationPolicy};
 use kube::core::PartialObjectMeta;
 use kube::runtime::reflector::{ObjectRef, Store};
 use kube::{
     api::{Patch, PatchParams},
     Api, ResourceExt,
 };
-use tracing::{debug, warn};
+use tracing::{debug, error, warn};
 
 use crate::podidentityassociations::{PodIdentityAssociation, PodIdentityAssociationSpec};
 use crate::reconcilers::{label_selector, mandatory_labels, object_meta};
@@ -342,6 +342,7 @@ pub async fn reconcile_compute(
     let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), namespace);
     let svcacc_api: Api<ServiceAccount> = Api::namespaced(ctx.client.clone(), namespace);
     let pia_api: Api<PodIdentityAssociation> = Api::namespaced(ctx.client.clone(), namespace);
+    let job_api: Api<Job> = Api::namespaced(ctx.client.clone(), namespace);
     let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
     let sgp_api: Api<SecurityGroupPolicy> = Api::namespaced(ctx.client.clone(), namespace);
 
@@ -385,9 +386,7 @@ pub async fn reconcile_compute(
         return Err(Error::NotReady { reason: "PodIdentityAssociationNotSynced".into(), message: "Waiting for the AWS ACK controller to provision the Pod Identity Association with IAM".into(), requeue_after: None });
     }
 
-    if !check_pia(namespace, base_metadata, &pod_api).await? {
-        return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), requeue_after: Some(Duration::from_secs(2)) });
-    }
+    check_pia(namespace, base_metadata, &job_api, &pod_api).await?;
 
     // Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta
     let pod_annotations = pod_annotations.get_or_insert_with(Default::default);
@@ -402,6 +401,7 @@ pub async fn reconcile_compute(
         }
         (Some(_), None) => {
             delete_pod_identity_association(namespace, &pia_api, "restate").await?;
+            delete_job(namespace, &job_api, "restate-pia-canary").await?;
         }
         (None, Some(aws_pod_identity_association_role_arn)) => {
             warn!("Ignoring AWS pod identity association role ARN {aws_pod_identity_association_role_arn} as the operator is not configured with --aws-pod-identity-association-cluster");
         }
@@ -523,8 +523,9 @@ async fn apply_pod_identity_association(
 
 async fn check_pia(
     namespace: &str,
     base_metadata: &ObjectMeta,
+    job_api: &Api<Job>,
     pod_api: &Api<Pod>,
-) -> Result<bool, Error> {
+) -> Result<(), Error> {
     let name = "restate-pia-canary";
     let params: PatchParams = PatchParams::apply("restate-operator").force();
@@ -537,24 +538,38 @@ async fn check_pia(
     }
 
     debug!(
-        "Applying PodIdentityAssociation canary Pod in namespace {}",
+        "Applying PodIdentityAssociation canary Job in namespace {}",
         namespace
     );
 
-    let created = pod_api
+    let created = job_api
         .patch(
             name,
             &params,
-            &Patch::Apply(&Pod {
-                metadata: object_meta(base_metadata, name),
-                spec: Some(PodSpec {
-                    service_account_name: Some("restate".into()),
-                    containers: vec![Container {
-                        name: "canary".into(),
-                        image: Some("hello-world:linux".into()),
-                        ..Default::default()
-                    }],
-                    restart_policy: Some("Never".into()),
+            &Patch::Apply(&Job {
+                metadata,
+                spec: Some(JobSpec {
+                    // single-use Job that we delete on failure; we don't want to wait 10 seconds for retries
+                    backoff_limit: Some(1),
+                    template: PodTemplateSpec {
+                        metadata: None,
+                        spec: Some(PodSpec {
+                            service_account_name: Some("restate".into()),
+                            containers: vec![Container {
+                                name: "canary".into(),
+                                image: Some("busybox:uclibc".into()),
+                                command: Some(vec![
+                                    "grep".into(),
+                                    "-q".into(),
+                                    "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(),
+                                    "/proc/self/environ".into(),
+                                ]),
+                                ..Default::default()
+                            }],
+                            restart_policy: Some("Never".into()),
+                            ..Default::default()
+                        }),
+                    },
                     ..Default::default()
                 }),
                 status: None,
@@ -562,28 +577,90 @@
         )
         .await?;
 
-    if let Some(spec) = created.spec {
-        if let Some(volumes) = spec.volumes {
-            if volumes.iter().any(|v| v.name == "eks-pod-identity-token") {
-                debug!(
-                    "PodIdentityAssociation canary check succeeded in namespace {}",
-                    namespace
-                );
-                // leave pod in place as a signal that we passed the check
-                return Ok(true);
-            }
-        }
-    }
-
+    if let Some(conditions) = created.status.and_then(|s| s.conditions) {
+        for condition in conditions {
+            if condition.status != "True" {
+                continue;
+            }
+            match condition.type_.as_str() {
+                "Complete" => {
+                    debug!(
+                        "PodIdentityAssociation canary check succeeded in namespace {}",
+                        namespace
+                    );
+                    return Ok(());
+                }
+                "Failed" => {
+                    error!(
+                        "PodIdentityAssociation canary check failed in namespace {}, deleting Job",
+                        namespace
+                    );
+
+                    delete_job(namespace, job_api, name).await?;
+
+                    return Err(Error::NotReady {
+                        reason: "PodIdentityAssociationCanaryFailed".into(),
+                        message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
+                        // the Job watch will cover this
+                        requeue_after: None,
+                    });
+                }
+                _ => {}
+            }
+        }
+    }
+
+    // if we are here then the Job hasn't succeeded or failed yet; let's try to figure things out a bit quicker,
+    // because it takes time for pods to schedule etc
+
+    let pods = pod_api
+        .list(&ListParams::default().labels(&format!(
+            "batch.kubernetes.io/job-name={name},batch.kubernetes.io/controller-uid={}",
+            created.metadata.uid.unwrap()
+        )))
+        .await?;
+
+    if let Some(pod) = pods.items.first() {
+        if pod
+            .spec
+            .as_ref()
+            .and_then(|s| s.volumes.as_ref())
+            .map(|vs| vs.iter().any(|v| v.name == "eks-pod-identity-token"))
+            .unwrap_or(false)
+        {
+            debug!(
+                "PodIdentityAssociation canary check succeeded via pod lookup in namespace {}",
+                namespace
+            );
+            return Ok(());
+        }
+
+        debug!(
+            "PodIdentityAssociation canary check failed via pod lookup in namespace {}, deleting Job",
+            namespace
+        );
+        delete_job(namespace, job_api, name).await?;
+
+        return Err(Error::NotReady {
+            reason: "PodIdentityAssociationCanaryFailed".into(),
+            message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
+            // the Job watch will cover this
+            requeue_after: None,
+        });
+    }
+
+    // no pods; we generally expect this immediately after creating the Job
     debug!(
-        "PodIdentityAssociation canary check failed in namespace {}, deleting canary Pod",
+        "PodIdentityAssociation canary Job not yet succeeded in namespace {}",
         namespace
     );
-    // delete pod to try again next time
-    pod_api.delete(name, &Default::default()).await?;
-
-    Ok(false)
+    Err(Error::NotReady {
+        reason: "PodIdentityAssociationCanaryPending".into(),
+        message: "Canary Job has not yet succeeded; PIA webhook may need to catch up".into(),
+        // the Job watch will cover this
+        requeue_after: None,
+    })
 }
 
 fn is_pod_identity_association_synced(pia: PodIdentityAssociation) -> bool {
@@ -618,6 +695,18 @@ async fn delete_pod_identity_association(
     }
 }
 
+async fn delete_job(namespace: &str, job_api: &Api<Job>, name: &str) -> Result<(), Error> {
+    debug!(
+        "Ensuring Job {} in namespace {} does not exist",
+        name, namespace
+    );
+    match job_api.delete(name, &DeleteParams::default()).await {
+        Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()),
+        Err(err) => Err(err.into()),
+        Ok(_) => Ok(()),
+    }
+}
+
 async fn apply_security_group_policy(
     namespace: &str,
     pia_api: &Api,
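
Note (illustration, not part of the diff): the canary Job that check_pia applies should render to roughly the manifest below. The exact metadata is a sketch, since the namespace, labels, and owner references come from base_metadata and object_meta; the app.kubernetes.io/name=restate-pia-canary label is assumed here because it has to match the label selector the controller now watches Jobs with.

apiVersion: batch/v1
kind: Job
metadata:
  name: restate-pia-canary
  labels:
    app.kubernetes.io/name: restate-pia-canary  # assumed; must match the watch config in controller.rs
spec:
  backoffLimit: 1
  template:
    spec:
      serviceAccountName: restate
      restartPolicy: Never
      containers:
        - name: canary
          # exits 0 only if the EKS Pod Identity webhook injected credentials into the pod environment
          image: busybox:uclibc
          command: ["grep", "-q", "AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE", "/proc/self/environ"]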