Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Canary pods need to be created via Jobs #7

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion charts/restate-operator-helm/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ rules:
- statefulsets
- persistentvolumeclaims
- pods
- jobs
- securitygrouppolicies
- secretproviderclasses
verbs:
Expand All @@ -64,7 +65,7 @@ rules:
- resources:
- statefulsets
- networkpolicies
- pods
- jobs
- securitygrouppolicies
- secretproviderclasses
verbs:
Expand Down
15 changes: 9 additions & 6 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::{
EnvVar, Namespace, PersistentVolumeClaim, Pod, PodDNSConfig, ResourceRequirements, Service,
EnvVar, Namespace, PersistentVolumeClaim, PodDNSConfig, ResourceRequirements, Service,
ServiceAccount,
};
use k8s_openapi::api::networking::v1;
use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{APIGroup, ObjectMeta};

use kube::core::object::HasStatus;
use kube::core::PartialObjectMeta;
use kube::runtime::reflector::{ObjectRef, Store};
Expand Down Expand Up @@ -417,7 +419,7 @@ async fn reconcile(rc: Arc<RestateCluster>, ctx: Arc<Context>) -> Result<Action>
}
}

fn error_policy(_rc: Arc<RestateCluster>, _error: &Error, _ctx: Arc<Context>) -> Action {
fn error_policy<K, C>(_rc: Arc<K>, _error: &Error, _ctx: C) -> Action {
Action::requeue(Duration::from_secs(30))
}

Expand Down Expand Up @@ -724,7 +726,7 @@ pub async fn run(state: State) {
let svcacc_api = Api::<ServiceAccount>::all(client.clone());
let np_api = Api::<NetworkPolicy>::all(client.clone());
let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
let pod_api = Api::<Pod>::all(client.clone());
let job_api = Api::<Job>::all(client.clone());
let sgp_api = Api::<SecurityGroupPolicy>::all(client.clone());
let spc_api = Api::<SecretProviderClass>::all(client.clone());

Expand Down Expand Up @@ -785,9 +787,10 @@ pub async fn run(state: State) {
// avoid apply loops that seem to happen with crds
.predicate_filter(changed_predicate.combine(status_predicate));

controller
.owns_stream(pia_watcher)
.owns(pod_api, cfg.clone())
controller.owns_stream(pia_watcher).owns(
job_api,
Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
)
} else {
controller
};
Expand Down
155 changes: 122 additions & 33 deletions src/reconcilers/compute.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::{BTreeMap, HashSet};
use std::convert::Into;
use std::path::PathBuf;
use std::time::Duration;

use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetSpec, StatefulSetStatus};
use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{
Container, ContainerPort, EnvVar, HTTPGetAction, PersistentVolumeClaim,
PersistentVolumeClaimSpec, Pod, PodSecurityContext, PodSpec, PodTemplateSpec, Probe,
Expand All @@ -13,14 +13,14 @@ use k8s_openapi::api::core::v1::{
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use kube::api::{DeleteParams, Preconditions, PropagationPolicy};
use kube::api::{DeleteParams, ListParams, Preconditions, PropagationPolicy};
use kube::core::PartialObjectMeta;
use kube::runtime::reflector::{ObjectRef, Store};
use kube::{
api::{Patch, PatchParams},
Api, ResourceExt,
};
use tracing::{debug, warn};
use tracing::{debug, error, warn};

use crate::podidentityassociations::{PodIdentityAssociation, PodIdentityAssociationSpec};
use crate::reconcilers::{label_selector, mandatory_labels, object_meta};
Expand Down Expand Up @@ -342,6 +342,7 @@ pub async fn reconcile_compute(
let svc_api: Api<Service> = Api::namespaced(ctx.client.clone(), namespace);
let svcacc_api: Api<ServiceAccount> = Api::namespaced(ctx.client.clone(), namespace);
let pia_api: Api<PodIdentityAssociation> = Api::namespaced(ctx.client.clone(), namespace);
let job_api: Api<Job> = Api::namespaced(ctx.client.clone(), namespace);
let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), namespace);
let sgp_api: Api<SecurityGroupPolicy> = Api::namespaced(ctx.client.clone(), namespace);

Expand Down Expand Up @@ -385,9 +386,7 @@ pub async fn reconcile_compute(
return Err(Error::NotReady { reason: "PodIdentityAssociationNotSynced".into(), message: "Waiting for the AWS ACK controller to provision the Pod Identity Association with IAM".into(), requeue_after: None });
}

if !check_pia(namespace, base_metadata, &pod_api).await? {
return Err(Error::NotReady { reason: "PodIdentityAssociationCanaryFailed".into(), message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(), requeue_after: Some(Duration::from_secs(2)) });
}
check_pia(namespace, base_metadata, &job_api, &pod_api).await?;

// Pods MUST roll when these change, so we will apply these parameters as annotations to the pod meta
let pod_annotations = pod_annotations.get_or_insert_with(Default::default);
Expand All @@ -402,6 +401,7 @@ pub async fn reconcile_compute(
}
(Some(_), None) => {
delete_pod_identity_association(namespace, &pia_api, "restate").await?;
delete_job(namespace, &job_api, "restate-pia-canary").await?;
}
(None, Some(aws_pod_identity_association_role_arn)) => {
warn!("Ignoring AWS pod identity association role ARN {aws_pod_identity_association_role_arn} as the operator is not configured with --aws-pod-identity-association-cluster");
Expand Down Expand Up @@ -523,8 +523,9 @@ async fn apply_pod_identity_association(
async fn check_pia(
namespace: &str,
base_metadata: &ObjectMeta,
job_api: &Api<Job>,
pod_api: &Api<Pod>,
) -> Result<bool, Error> {
) -> Result<(), Error> {
let name = "restate-pia-canary";
let params: PatchParams = PatchParams::apply("restate-operator").force();

Expand All @@ -537,53 +538,129 @@ async fn check_pia(
}

debug!(
"Applying PodIdentityAssociation canary Pod in namespace {}",
"Applying PodIdentityAssociation canary Job in namespace {}",
namespace
);

let created = pod_api
let created = job_api
.patch(
name,
&params,
&Patch::Apply(&Pod {
metadata: object_meta(base_metadata, name),
spec: Some(PodSpec {
service_account_name: Some("restate".into()),
containers: vec![Container {
name: "canary".into(),
image: Some("hello-world:linux".into()),
..Default::default()
}],
restart_policy: Some("Never".into()),
&Patch::Apply(&Job {
metadata,
spec: Some(JobSpec {
// single-use job that we delete on failuire; don't want to wait 10 seconds for retries
backoff_limit: Some(1),
template: PodTemplateSpec {
metadata: None,
spec: Some(PodSpec {
service_account_name: Some("restate".into()),
containers: vec![Container {
name: "canary".into(),
image: Some("busybox:uclibc".into()),
command: Some(vec![
"grep".into(),
"-q".into(),
"AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE".into(),
"/proc/self/environ".into(),
]),
..Default::default()
}],
restart_policy: Some("Never".into()),
..Default::default()
}),
},
..Default::default()
}),
status: None,
}),
)
.await?;

if let Some(spec) = created.spec {
if let Some(volumes) = spec.volumes {
if volumes.iter().any(|v| v.name == "eks-pod-identity-token") {
debug!(
"PodIdentityAssociation canary check succeeded in namespace {}",
namespace
);
// leave pod in place as a signal that we passed the check
return Ok(true);
if let Some(conditions) = created.status.and_then(|s| s.conditions) {
for condition in conditions {
if condition.status != "True" {
continue;
}
match condition.type_.as_str() {
"Complete" => {
debug!(
"PodIdentityAssociation canary check succeeded in namespace {}",
namespace
);
return Ok(());
}
"Failed" => {
error!(
"PodIdentityAssociation canary check failed in namespace {}, deleting Job",
namespace
);

delete_job(namespace, job_api, name).await?;

return Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryFailed".into(),
message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
});
}
_ => {}
}
}
}

// if we are here then the job hasn't succeeded or failed yet; lets try and figure things out a bit quicker
// because it takes times for pods to schedule etc

let pods = pod_api
.list(&ListParams::default().labels(&format!(
"batch.kubernetes.io/job-name={name},batch.kubernetes.io/controller-uid={}",
created.metadata.uid.unwrap()
)))
.await?;

if let Some(pod) = pods.items.first() {
if pod
.spec
.as_ref()
.and_then(|s| s.volumes.as_ref())
.map(|vs| vs.iter().any(|v| v.name == "eks-pod-identity-token"))
.unwrap_or(false)
{
debug!(
"PodIdentityAssociation canary check succeeded via pod lookup in namespace {}",
namespace
);
return Ok(());
}

debug!(
"PodIdentityAssociation canary check failed via pod lookup in namespace {}, deleting Job",
namespace
);
delete_job(namespace, job_api, name).await?;

return Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryFailed".into(),
message: "Canary pod did not receive Pod Identity credentials; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
});
}

// no pods; we generally expect this immediately after creating the job
debug!(
"PodIdentityAssociation canary check failed in namespace {}, deleting canary Pod",
"PodIdentityAssociation canary Job not yet succeeded in namespace {}",
namespace
);

// delete pod to try again next time
pod_api.delete(name, &Default::default()).await?;

Ok(false)
Err(Error::NotReady {
reason: "PodIdentityAssociationCanaryPending".into(),
message: "Canary Job has not yet succeeded; PIA webhook may need to catch up".into(),
// job watch will cover this
requeue_after: None,
})
}

fn is_pod_identity_association_synced(pia: PodIdentityAssociation) -> bool {
Expand Down Expand Up @@ -618,6 +695,18 @@ async fn delete_pod_identity_association(
}
}

async fn delete_job(namespace: &str, job_api: &Api<Job>, name: &str) -> Result<(), Error> {
debug!(
"Ensuring Job {} in namespace {} does not exist",
name, namespace
);
match job_api.delete(name, &DeleteParams::default()).await {
Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) => Ok(()),
Err(err) => Err(err.into()),
Ok(_) => Ok(()),
}
}

async fn apply_security_group_policy(
namespace: &str,
pia_api: &Api<SecurityGroupPolicy>,
Expand Down