-
-
Notifications
You must be signed in to change notification settings - Fork 320
/
configmapgen_controller.rs
123 lines (110 loc) · 4.02 KB
/
configmapgen_controller.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
#![allow(clippy::unnecessary_lazy_evaluations)]
use anyhow::Result;
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta, Patch, PatchParams, Resource},
runtime::{
controller::{Action, Config, Controller},
watcher,
},
Client, CustomResource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, io::BufRead, sync::Arc};
use thiserror::Error;
use tokio::time::Duration;
use tracing::*;
#[derive(Debug, Error)]
enum Error {
#[error("Failed to create ConfigMap: {0}")]
ConfigMapCreationFailed(#[source] kube::Error),
#[error("MissingObjectKey: {0}")]
MissingObjectKey(&'static str),
}
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator")]
#[kube(shortname = "cmg", namespaced)]
struct ConfigMapGeneratorSpec {
content: String,
}
/// Controller triggers this whenever our main object or our children changed
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Arc<Data>) -> Result<Action, Error> {
let client = &ctx.client;
let mut contents = BTreeMap::new();
contents.insert("content".to_string(), generator.spec.content.clone());
let oref = generator.controller_owner_ref(&()).unwrap();
let cm = ConfigMap {
metadata: ObjectMeta {
name: generator.metadata.name.clone(),
owner_references: Some(vec![oref]),
..ObjectMeta::default()
},
data: Some(contents),
..Default::default()
};
let cm_api = Api::<ConfigMap>::namespaced(
client.clone(),
generator
.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingObjectKey(".metadata.namespace"))?,
);
cm_api
.patch(
cm.metadata
.name
.as_ref()
.ok_or_else(|| Error::MissingObjectKey(".metadata.name"))?,
&PatchParams::apply("configmapgenerator.kube-rt.nullable.se"),
&Patch::Apply(&cm),
)
.await
.map_err(Error::ConfigMapCreationFailed)?;
Ok(Action::requeue(Duration::from_secs(300)))
}
/// The controller triggers this on reconcile errors
fn error_policy(_object: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}
// Data we want access to in error/reconcile calls
struct Data {
client: Client,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
let cms = Api::<ConfigMap>::all(client.clone());
info!("starting configmapgen-controller");
info!("press <enter> to force a reconciliation of all objects");
let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
// Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
// and its worker prevents the Tokio runtime from shutting down.
std::thread::spawn(move || {
for _ in std::io::BufReader::new(std::io::stdin()).lines() {
let _ = reload_tx.try_send(());
}
});
// limit the controller to running a maximum of two concurrent reconciliations
let config = Config::default().concurrency(2);
Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
.with_config(config)
.reconcile_all_on(reload_rx.map(|_| ()))
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(Data { client }))
.for_each(|res| async move {
match res {
Ok(o) => info!("reconciled {:?}", o),
Err(e) => warn!("reconcile failed: {}", e),
}
})
.await;
info!("controller terminated");
Ok(())
}