Skip to content

Commit

Permalink
Use predicates to dedupe changes to cm, svc, ss, job (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman authored Dec 11, 2024
1 parent 6c20dfa commit d0b9aba
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 8 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ kube = { version = "0.88.1", features = [
] }
schemars = { version = "0.8.12", features = ["chrono"] }
serde = { version = "1.0.185", features = ["derive"] }
serde-hashkey = "0.4.5"
serde_json = "1.0.105"
serde_yaml = "0.9.25"
prometheus = "0.13.3"
Expand Down
101 changes: 93 additions & 8 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::StreamExt;
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::apps::v1::{StatefulSet, StatefulSetStatus};
use k8s_openapi::api::batch::v1::Job;
use k8s_openapi::api::core::v1::{
ConfigMap, EnvVar, Namespace, PersistentVolumeClaim, PodDNSConfig, ResourceRequirements,
Service, ServiceAccount, Toleration,
Service, ServiceAccount, ServiceSpec, Toleration,
};
use k8s_openapi::api::networking::v1;
use k8s_openapi::api::networking::v1::{NetworkPolicy, NetworkPolicyPeer, NetworkPolicyPort};
Expand Down Expand Up @@ -750,7 +750,6 @@ pub async fn run(state: State) {
let cm_api = Api::<ConfigMap>::all(client.clone());
let np_api = Api::<NetworkPolicy>::all(client.clone());
let pia_api = Api::<PodIdentityAssociation>::all(client.clone());
let job_api = Api::<Job>::all(client.clone());
let sgp_api = Api::<SecurityGroupPolicy>::all(client.clone());
let spc_api = Api::<SecretProviderClass>::all(client.clone());

Expand All @@ -777,7 +776,8 @@ pub async fn run(state: State) {
let (ss_store, ss_writer) = reflector::store();
let ss_reflector = reflector(ss_writer, watcher(ss_api, cfg.clone()))
.touched_objects()
.default_backoff();
.default_backoff()
.predicate_filter(changed_predicate.combine(status_predicate_serde));

let np_watcher = metadata_watcher(np_api, cfg.clone())
.touched_objects()
Expand All @@ -791,10 +791,20 @@ pub async fn run(state: State) {
.touched_objects()
.predicate_filter(changed_predicate);

let svc_watcher = watcher(svc_api, cfg.clone())
.touched_objects()
// svc has no generation so we hash the spec to check for changes
.predicate_filter(changed_predicate.combine(spec_predicate_serde));

let cm_watcher = watcher(cm_api, cfg.clone())
.touched_objects()
// cm has no generation so we hash the data to check for changes
.predicate_filter(changed_predicate.combine(spec_predicate));

let controller = Controller::new(rc_api, rc_cfg.clone())
.shutdown_on_signal()
.owns(svc_api, cfg.clone())
.owns(cm_api, cfg.clone())
.owns_stream(svc_watcher)
.owns_stream(cm_watcher)
.owns_stream(ns_watcher)
.owns_stream(svcacc_watcher)
.owns_stream(np_watcher)
Expand All @@ -819,10 +829,16 @@ pub async fn run(state: State) {
// avoid apply loops that seem to happen with crds
.predicate_filter(changed_predicate.combine(status_predicate));

controller.owns_stream(pia_watcher).owns(
let job_api = Api::<Job>::all(client.clone());

let job_watcher = metadata_watcher(
job_api,
Config::default().labels("app.kubernetes.io/name=restate-pia-canary"),
)
.touched_objects()
.predicate_filter(changed_predicate);

controller.owns_stream(pia_watcher).owns_stream(job_watcher)
} else {
controller
};
Expand Down Expand Up @@ -871,7 +887,7 @@ fn changed_predicate<K: Resource>(obj: &K) -> Option<u64> {
}
obj.labels().hash(&mut hasher);
obj.annotations().hash(&mut hasher);
// we don't care about status (and don't currently watch anything but metadata)
// ignore status
Some(hasher.finish())
}

Expand All @@ -885,3 +901,72 @@ where
}
Some(hasher.finish())
}

trait MyHasStatus {
type Status;

fn status(&self) -> Option<&Self::Status>;
}

impl MyHasStatus for StatefulSet {
type Status = StatefulSetStatus;

fn status(&self) -> Option<&Self::Status> {
self.status.as_ref()
}
}

fn status_predicate_serde<K: Resource + MyHasStatus>(obj: &K) -> Option<u64>
where
K::Status: Serialize,
{
let mut hasher = DefaultHasher::new();
if let Some(s) = obj.status() {
serde_hashkey::to_key(s)
.expect("serde_hashkey never to return an error")
.hash(&mut hasher);
}
Some(hasher.finish())
}

pub trait MyHasSpec {
type Spec;

fn spec(&self) -> &Self::Spec;
}

impl MyHasSpec for Service {
type Spec = Option<ServiceSpec>;

fn spec(&self) -> &Self::Spec {
&self.spec
}
}

impl MyHasSpec for ConfigMap {
type Spec = Option<std::collections::BTreeMap<String, String>>;

fn spec(&self) -> &Self::Spec {
&self.data
}
}

fn spec_predicate<K: Resource + MyHasSpec>(obj: &K) -> Option<u64>
where
K::Spec: Hash,
{
let mut hasher = DefaultHasher::new();
obj.spec().hash(&mut hasher);
Some(hasher.finish())
}

fn spec_predicate_serde<K: Resource + MyHasSpec>(obj: &K) -> Option<u64>
where
K::Spec: Serialize,
{
let mut hasher = DefaultHasher::new();
serde_hashkey::to_key(obj.spec())
.expect("serde_hashkey never to return an error")
.hash(&mut hasher);
Some(hasher.finish())
}

0 comments on commit d0b9aba

Please sign in to comment.