
Add shared stream interfaces #1449

Merged 40 commits on Apr 18, 2024 (changes shown are from the first 31 commits)

Commits
15ba09d
Add a simple controller example
mateiidavid Jan 31, 2024
fc2fc42
Merge branch 'main' of github.com:kube-rs/kube into matei/arc-watcher
mateiidavid Jan 31, 2024
1657ddc
Add shared stream controller example
mateiidavid Feb 23, 2024
98255dc
Try to get something working
mateiidavid Feb 23, 2024
3685b9e
Rm my notes
mateiidavid Feb 23, 2024
683e77d
Results or objectefs
mateiidavid Feb 29, 2024
af7a309
Working shared stream
mateiidavid Feb 29, 2024
8d4d694
Different way of doing it
mateiidavid Feb 29, 2024
8534770
Switch to async_broadcast
mateiidavid Mar 2, 2024
9bbe8e1
Remove old, unused code
mateiidavid Mar 2, 2024
3f874ce
Remove unused examples
mateiidavid Mar 2, 2024
1e1e347
Gotta state machine this stuff
mateiidavid Mar 7, 2024
15f6e1d
Take 1 with try_recv
mateiidavid Mar 8, 2024
49eaf12
try_recv take 2
mateiidavid Mar 8, 2024
e7aad76
Working on names next
mateiidavid Mar 11, 2024
b6ff97f
Ok surprising this worked
mateiidavid Mar 13, 2024
7a570fd
Write tests and rename file to reflect dispatch
mateiidavid Mar 25, 2024
0256cb0
WIP
mateiidavid Mar 25, 2024
74f09f7
WIP 2
mateiidavid Mar 26, 2024
2d5a3b0
Start working on store side
mateiidavid Mar 26, 2024
0cb816b
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Mar 26, 2024
9bf111c
Tests are green
mateiidavid Mar 26, 2024
04a53d1
rm redundant trait bounds
mateiidavid Mar 26, 2024
6b5bd31
Update example with new interfaces
mateiidavid Mar 26, 2024
def0011
Add comments and a small todo
mateiidavid Mar 27, 2024
d69213a
Remove dispatch mod from utils
mateiidavid Mar 27, 2024
21dbbae
@clux's feedback
mateiidavid Apr 3, 2024
c7fc333
@clux's feedback
mateiidavid Apr 3, 2024
1b81f4c
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 3, 2024
c6d1027
Fix tests & clippy warns
mateiidavid Apr 3, 2024
a30f2e6
Run fmt
mateiidavid Apr 3, 2024
fef5d83
Update examples/shared_stream_controllers.rs
mateiidavid Apr 8, 2024
44c441e
@clux's feedback on examples
mateiidavid Apr 11, 2024
8347103
Fix name in ns
mateiidavid Apr 15, 2024
9f7edd1
Add comments and feature flags
mateiidavid Apr 15, 2024
e2399f1
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 16, 2024
a14d6b4
Fix CI checks
mateiidavid Apr 16, 2024
de2eda1
Run rustfmt
mateiidavid Apr 16, 2024
276b75e
@clux's feedback
mateiidavid Apr 17, 2024
eca6be1
Run fmt
mateiidavid Apr 17, 2024
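
At a glance, the new interfaces wire together as in the following minimal sketch, condensed from examples/shared_stream_controllers.rs in the diff below. The reconcilers are stubbed out, the buffer size and namespace are illustrative, and it assumes any feature flag gating these APIs (added later in the commit series) is enabled.

use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
    runtime::{controller::Action, reflector, watcher, Controller, WatchStreamExt},
    Api, Client,
};

// Stub reconciler; the real logic lives in reconcile_metadata / reconcile_status below.
async fn reconcile(_pod: Arc<Pod>, _ctx: Arc<()>) -> Result<Action, kube::Error> {
    Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(_pod: Arc<Pod>, _err: &kube::Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(10))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let pods = Api::<Pod>::namespaced(client.clone(), "default");

    // One writer backed by a broadcast buffer; any number of subscribers can be created from it.
    let (reader, writer) = reflector::store_shared(256);
    let subscriber = writer
        .subscribe()
        .expect("subscribers can only be created from shared stores");

    // The root watch is reflected into the shared store and fans events out to subscribers.
    let pod_watch = watcher(pods, Default::default())
        .default_backoff()
        .reflect_shared(writer)
        .boxed();

    // A controller driven by a subscriber instead of its own watch.
    let controller = Controller::for_shared_stream(subscriber, reader)
        .run(reconcile, error_policy, Arc::new(()))
        .boxed();

    // Both streams must be polled: the watch feeds the store and its subscribers, while the
    // controller consumes the shared objects. The full example drives two controllers plus
    // the watch in a tokio::select! loop.
    tokio::join!(
        pod_watch.for_each(|_| futures::future::ready(())),
        controller.for_each(|_| futures::future::ready(())),
    );
    Ok(())
}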
4 changes: 4 additions & 0 deletions examples/Cargo.toml
@@ -61,6 +61,10 @@ crossterm = "0.27.0"
name = "configmapgen_controller"
path = "configmapgen_controller.rs"

[[example]]
name = "shared_stream_controllers"
path = "shared_stream_controllers.rs"

[[example]]
name = "crd_api"
path = "crd_api.rs"
234 changes: 234 additions & 0 deletions examples/shared_stream_controllers.rs
@@ -0,0 +1,234 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::{Pod, PodCondition};
use kube::{
api::{Patch, PatchParams},
runtime::{controller::Action, reflector, watcher, Config, Controller, WatchStreamExt},
Api, Client, ResourceExt,
};
use tokio::sync::mpsc;
use tracing::{info, warn};

use thiserror::Error;

// Helper module that namespaces two constants describing a Kubernetes status condition
pub mod condition {
pub static UNDOCUMENTED_TYPE: &str = "UndocumentedPort";
pub static STATUS_TRUE: &str = "True";
}

const SUBSCRIBE_BUFFER_SIZE: usize = 256;

#[derive(Debug, Error)]
enum Error {
#[error("Failed to patch pod: {0}")]
WriteFailed(#[source] kube::Error),

#[error("Missing po field: {0}")]
MissingField(&'static str),
}

#[derive(Clone)]
struct Data {
client: Client,
}

/// A simple reconciliation function that will copy a pod's labels into the annotations.
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
if pod.name_any() == "kube-system" {
return Ok(Action::await_change());
}

let labels = pod.labels();
if labels.is_empty() {
return Ok(Action::await_change());
}

let mut annotations = pod.annotations().clone();
for (key, value) in labels {
annotations.insert(key.to_owned(), value.to_owned());
}

let mut pod = (*pod).clone();
pod.metadata.annotations = Some(annotations);
pod.metadata.managed_fields = None;

let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
);

pod_api
.patch(
&pod.name_any(),
&PatchParams::apply("controller-1"),
&Patch::Apply(&pod),
)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}

/// Another reconciliation function that will add an 'UndocumentedPort' condition to pods that
/// do not have any ports declared across all containers.
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
for container in pod.spec.clone().unwrap_or_default().containers.iter() {
if container.ports.clone().unwrap_or_default().len() != 0 {
tracing::debug!(name = %pod.name_any(), "Skipped updating pod with documented ports");
return Ok(Action::await_change());
}
}

let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
);

let undocumented_condition = PodCondition {
type_: condition::UNDOCUMENTED_TYPE.into(),
status: condition::STATUS_TRUE.into(),
..Default::default()
};
let value = serde_json::json!({
"status": {
"name": pod.name_any(),
"kind": "Pod",
"conditions": vec![undocumented_condition]
}
});
pod_api
.patch_status(
&pod.name_any(),
&PatchParams::apply("controller-2"),
&Patch::Strategic(value),
)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let client = Client::try_default().await?;
let pods = Api::<Pod>::namespaced(client.clone(), "default");
let config = Config::default().concurrency(2);
let ctx = Arc::new(Data { client });

// Create a shared store with a predefined buffer that will be shared between subscribers.
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
// Before threading an object watch through the store, create a subscriber.
// Any number of subscribers can be created from one writer.
let subscriber = writer
.subscribe()
.expect("subscribers can only be created from shared stores");

// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
// be able to consume updates, the reflector must be shared.
let mut pod_watch = watcher(pods.clone(), Default::default())
.default_backoff()
.reflect_shared(writer)
.boxed();

// Create the first controller using the reconcile_metadata function. Controllers accept
// subscribers through a dedicated interface.
let mut metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
.with_config(config.clone())
.run(
reconcile_metadata,
|pod, error, _| {
tracing::error!(%error, name = %pod.name_any(), "Failed to reconcile metadata");
Action::requeue(Duration::from_secs(10))
},
ctx.clone(),
)
.boxed();

// Subscribers can be used to get a read handle on the store, if the initial handle has been
// moved or dropped.
let reader = subscriber.reader();
// Create the second controller using the reconcile_status function.
let mut status_controller = Controller::for_shared_stream(subscriber, reader)
.with_config(config)
.run(
reconcile_status,
|pod, error, _| {
tracing::error!(%error, name = %pod.name_any(), "Failed to reconcile status");
Action::requeue(Duration::from_secs(10))
},
ctx,
)
.boxed();

// A simple handler to shut down on CTRL-C or SIGTERM.
let mut shutdown_rx = shutdown_handler();

// Drive streams to readiness. The initial watch (that is reflected) needs to be driven to
// consume events from the API Server and forward them to subscribers.
//
// Both controllers will operate on shared objects.
loop {
tokio::select! {
Some(res) = metadata_controller.next() => {
match res {
Ok(v) => info!("Reconciled metadata {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
}
},

Some(res) = status_controller.next() => {
match res {
Ok(v) => info!("Reconciled status {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile object"),
}
},

Some(item) = pod_watch.next() => {
match item {
Err(error) => tracing::error!(%error, "Received error from main watcher stream"),
_ => {}
}
},

_ = shutdown_rx.recv() => {
tracing::info!("Received shutdown signal; terminating...");
break;
}
}
}

Ok(())
}

// Create a channel that will hold at most one item. Whenever a signal is received it is sent
// through the channel.
// We do not use a oneshot because its receiver is consumed when awaited, so it could not be
// polled again on each loop iteration.
fn shutdown_handler() -> mpsc::Receiver<()> {
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let mut terminate = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("should not fail to register sighandler");
let ctrlc = tokio::signal::ctrl_c();
tokio::spawn(async move {
tokio::select! {
_ = terminate.recv() => {
shutdown_tx.send(()).await
},

_ = ctrlc => {
shutdown_tx.send(()).await
}
}
});

shutdown_rx
}
2 changes: 2 additions & 0 deletions kube-runtime/Cargo.toml
@@ -48,6 +48,8 @@ backoff.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
async-broadcast = "0.7.0"
async-stream = "0.3.5"

[dev-dependencies]
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
53 changes: 53 additions & 0 deletions kube-runtime/src/controller/mod.rs
@@ -122,6 +122,23 @@
})
}

pub fn trigger_self_shared<K, S>(
stream: S,
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = Arc<K>>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,
})
})
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
stream: S,
@@ -701,6 +718,42 @@
}
}

// TODO: do an entrypoint for shared streams of owned objects
//
// Is it better to use a concrete type (i.e. a SubscribeHandle as a trigger)
// or to pass in the reader out-of-band?
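/// Create a Controller for a shared stream of pre-decoded objects (e.g. a store subscriber),
/// using `reader` as the read handle on the backing store.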
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
K::DynamicType: Default,
{
Self::for_shared_stream_with(trigger, reader, Default::default())
}

pub fn for_shared_stream_with(
trigger: impl Stream<Item = Arc<K>> + Send + 'static,
reader: Store<K>,
dyntype: K::DynamicType,
) -> Self {
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
forceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
dyntype,
reader,
config: Default::default(),
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {