Skip to content

Commit

Permalink
Remove abandoned StreamSubscribe implementation
Browse files Browse the repository at this point in the history
This was never properly usable without further integration, and as such
was never stabilised. It is replaced via #1449

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Apr 18, 2024
1 parent c89a419 commit 158ad28
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 300 deletions.
232 changes: 0 additions & 232 deletions kube-runtime/src/utils/stream_subscribe.rs

This file was deleted.

68 changes: 0 additions & 68 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff},
watcher,
Expand Down Expand Up @@ -128,72 +126,6 @@ pub trait WatchStreamExt: Stream {
PredicateFilter::new(self, predicate)
}

/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
/// of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`](crate::utils::stream_subscribe::Error::Lagged)
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
///
/// # Usage
///
/// ```
/// use futures::{Stream, StreamExt};
/// use std::{fmt::Debug, sync::Arc};
/// use kube_runtime::{watcher, WatchStreamExt};
///
/// fn explain_events<K, S>(
/// stream: S,
/// ) -> (
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
/// impl Stream<Item = String> + Send + Sized + 'static,
/// )
/// where
/// K: Clone + Debug + Send + Sync + 'static,
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
/// {
/// // Create a stream that can be subscribed to
/// let stream_subscribe = stream.stream_subscribe();
/// // Create a subscription to that stream
/// let subscription = stream_subscribe.subscribe();
///
/// // Create a stream of descriptions of the events
/// let explain_stream = subscription.filter_map(|event| async move {
/// // We don't care about lagged events so we can throw that error away
/// match event.ok()?.as_ref() {
/// Ok(watcher::Event::Applied(event)) => {
/// Some(format!("An object was added or modified: {event:?}"))
/// }
/// Ok(_) => todo!("explain other events"),
/// // We don't care about watcher errors either
/// Err(_) => None,
/// }
/// });
///
/// // We now still have the original stream, and a secondary stream of explanations
/// (stream_subscribe, explain_stream)
/// }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
K: Clone,
{
StreamSubscribe::new(self)
}

/// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
///
/// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
Expand Down

0 comments on commit 158ad28

Please sign in to comment.