Skip to content

Commit

Permalink
Remove abandoned StreamSubscribe implementation (#1470)
Browse files Browse the repository at this point in the history
* Remove abandoned `StreamSubscribe` implementation

This was never properly usable without further integration, and as such
was never stabilised. It is replaced via #1449

Signed-off-by: clux <[email protected]>

* forgot two imports

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored May 1, 2024
1 parent 9eb5048 commit 6d0c9b1
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 303 deletions.
3 changes: 0 additions & 3 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod reflect;
mod stream_backoff;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
Expand All @@ -17,8 +16,6 @@ pub use event_modify::EventModify;
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use reflect::Reflect;
pub use stream_backoff::StreamBackoff;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use stream_subscribe::StreamSubscribe;
pub use watch_ext::WatchStreamExt;

use futures::{
Expand Down
232 changes: 0 additions & 232 deletions kube-runtime/src/utils/stream_subscribe.rs

This file was deleted.

68 changes: 0 additions & 68 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff},
watcher,
Expand Down Expand Up @@ -128,72 +126,6 @@ pub trait WatchStreamExt: Stream {
PredicateFilter::new(self, predicate)
}

/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
/// of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`](crate::utils::stream_subscribe::Error::Lagged)
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
///
/// # Usage
///
/// ```
/// use futures::{Stream, StreamExt};
/// use std::{fmt::Debug, sync::Arc};
/// use kube_runtime::{watcher, WatchStreamExt};
///
/// fn explain_events<K, S>(
/// stream: S,
/// ) -> (
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
/// impl Stream<Item = String> + Send + Sized + 'static,
/// )
/// where
/// K: Clone + Debug + Send + Sync + 'static,
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
/// {
/// // Create a stream that can be subscribed to
/// let stream_subscribe = stream.stream_subscribe();
/// // Create a subscription to that stream
/// let subscription = stream_subscribe.subscribe();
///
/// // Create a stream of descriptions of the events
/// let explain_stream = subscription.filter_map(|event| async move {
/// // We don't care about lagged events so we can throw that error away
/// match event.ok()?.as_ref() {
/// Ok(watcher::Event::Applied(event)) => {
/// Some(format!("An object was added or modified: {event:?}"))
/// }
/// Ok(_) => todo!("explain other events"),
/// // We don't care about watcher errors either
/// Err(_) => None,
/// }
/// });
///
/// // We now still have the original stream, and a secondary stream of explanations
/// (stream_subscribe, explain_stream)
/// }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
K: Clone,
{
StreamSubscribe::new(self)
}

/// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
///
/// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
Expand Down

0 comments on commit 6d0c9b1

Please sign in to comment.