Skip to content

Commit

Permalink
operators: Add ref_count(other) operator overload.
Browse files Browse the repository at this point in the history
The existing `connectable_observable.ref_count()` operator calls
connect on the source when it's subscribed to.

Generalize this by allowing an optional parameter `other`, i.e.
`observable.ref_count(connectable_observable other)` to be used as the
connect target.

Useful for implementing diamond graphs while retaining composability:

```
     A
   /   \
  B     C
   \   /
     D
     |
     E

auto A = ... | publish();
auto B = A | ...;
auto C = A | ...;
auto D = B | merge(C) | ref_count(A);
auto E = D | ...;

E | subscribe(...);
```

Resolves: ReactiveX#484
  • Loading branch information
iam committed Feb 15, 2019
1 parent aac2fc9 commit 77a5b21
Show file tree
Hide file tree
Showing 7 changed files with 393 additions and 15 deletions.
95 changes: 95 additions & 0 deletions Rx/v2/examples/doxygen/publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

#include <atomic>
#include <array>

SCENARIO("publish_synchronized sample"){
printf("//! [publish_synchronized sample]\n");
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
Expand Down Expand Up @@ -95,3 +98,95 @@ SCENARIO("publish behavior sample"){
values.as_blocking().subscribe();
printf("//! [publish behavior sample]\n");
}

SCENARIO("publish diamond bgthread sample"){
printf("//! [publish diamond bgthread sample]\n");

/*
* Implements the following diamond graph chain with publish+connect on a background thread.
*
* Values
* / \
* *2 *100
* \ /
* Merge
*/
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread()).
take(5).
publish();

// Left side multiplies by 2.
auto left = values.map(
[](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );

// Right side multiplies by 100.
auto right = values.map(
[](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });

// Merge the left,right sides together.
// The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
auto merged = left.merge(right);

std::atomic<bool> completed{false};

// Add subscription to see results
merged.subscribe(
[](long v) { printf("[3] OnNext: %ld\n", v); },
[&]() { printf("[3] OnCompleted:\n"); completed = true; });

// Start emitting
values.connect();

// Block until subscription terminates.
while (!completed) {}

// Note: consider using ref_count(other) in real code, it's more composable.

printf("//! [publish diamond bgthread sample]\n");
}

SCENARIO("publish diamond samethread sample"){
printf("//! [publish diamond samethread sample]\n");

/*
* Implements the following diamond graph chain with publish+connect diamond without using threads.
*
* Values
* / \
* *2 *100
* \ /
* Merge
*/

std::array<int, 5> a={{1, 2, 3, 4, 5}};
auto values = rxcpp::observable<>::iterate(a).
publish();

// Left side multiplies by 2.
auto left = values.map(
[](long v){printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2;} );

// Right side multiplies by 100.
auto right = values.map(
[](long v){printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100; });

// Merge the left,right sides together.
// The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
auto merged = left.merge(right);

// Add subscription to see results
merged.subscribe(
[](long v) { printf("[3] OnNext: %ld\n", v); },
[&]() { printf("[3] OnCompleted:\n"); });

// Start emitting
// - because there are no other threads here, the connect call blocks until the source
// calls on_completed.
values.connect();

// Note: consider using ref_count(other) in real code, it's more composable.

printf("//! [publish diamond samethread sample]\n");
}

// see also examples/doxygen/ref_count.cpp for more diamond examples
55 changes: 55 additions & 0 deletions Rx/v2/examples/doxygen/ref_count.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#include "rxcpp/rx.hpp"

#include "rxcpp/rx-test.hpp"
#include "catch.hpp"

#include <array>

SCENARIO("ref_count other diamond sample"){
printf("//! [ref_count other diamond sample]\n");

/*
* Implements the following diamond graph chain with publish+ref_count without using threads.
* This version is composable because it does not use connect explicitly.
*
* Values
* / \
* *2 *100
* \ /
* Merge
* |
* RefCount
*/

std::array<double, 5> a={{1.0, 2.0, 3.0, 4.0, 5.0}};
auto values = rxcpp::observable<>::iterate(a)
// The root of the chain is only subscribed to once.
.tap([](double v) { printf("[0] OnNext: %lf\n", v); })
.publish();

auto values_to_long = values.map([](double v) { return (long) v; });

// Left side multiplies by 2.
auto left = values_to_long.map(
[](long v) -> long {printf("[1] OnNext: %ld -> %ld\n", v, v*2); return v * 2L;} );

// Right side multiplies by 100.
auto right = values_to_long.map(
[](long v) -> long {printf("[2] OnNext: %ld -> %ld\n", v, v*100); return v * 100L; });

// Merge the left,right sides together.
// The items are emitted interleaved ... [left1, right1, left2, right2, left3, right3, ...].
auto merged = left.merge(right);

// When this value is subscribed to, it calls connect on values.
auto connect_on_subscribe = merged.ref_count(values);

// This immediately starts emitting all values and blocks until they are completed.
connect_on_subscribe.subscribe(
[](long v) { printf("[3] OnNext: %ld\n", v); },
[&]() { printf("[3] OnCompleted:\n"); });

printf("//! [ref_count other diamond sample]\n");
}

// see also examples/doxygen/publish.cpp for non-ref_count diamonds
12 changes: 12 additions & 0 deletions Rx/v2/src/rxcpp/operators/rx-publish.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
\sample
\snippet publish.cpp publish behavior sample
\snippet output.txt publish behavior sample
\sample
\snippet publish.cpp publish diamond samethread sample
\snippet output.txt publish diamond samethread sample
\sample
\snippet publish.cpp publish diamond bgthread sample
\snippet output.txt publish diamond bgthread sample
\sample
\snippet ref_count.cpp ref_count other diamond sample
\snippet output.txt ref_count other diamond sample
*/

#if !defined(RXCPP_OPERATORS_RX_PUBLISH_HPP)
Expand Down
134 changes: 119 additions & 15 deletions Rx/v2/src/rxcpp/operators/rx-ref_count.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,26 @@

/*! \file rx-ref_count.hpp
\brief takes a connectable_observable source and uses a ref_count of the subscribers to control the connection to the published source.
The first subscription will cause a call to connect() and the last unsubscribe will unsubscribe the connection.
\brief Make some \c connectable_observable behave like an ordinary \c observable.
Uses a reference count of the subscribers to control the connection to the published observable.
\return An observable that emitting the items from its source.
The first subscription will cause a call to \c connect(), and the last \c unsubscribe will unsubscribe the connection.
There are 2 variants of the operator:
\li \c ref_count(): calls \c connect on the \c source \c connectable_observable.
\li \c ref_count(other): calls \c connect on the \c other \c connectable_observable.
\tparam ConnectableObservable the type of the \c other \c connectable_observable (optional)
\param other \c connectable_observable to call \c connect on (optional)
If \c other is omitted, then \c source is used instead (which must be a \c connectable_observable).
Otherwise, \c source can be a regular \c observable.
\return An \c observable that emits the items from its \c source.
\sample
\snippet ref_count.cpp ref_count other diamond sample
\snippet output.txt ref_count other diamond sample
*/

#if !defined(RXCPP_OPERATORS_RX_REF_COUNT_HPP)
Expand All @@ -30,29 +46,100 @@ struct ref_count_invalid : public rxo::operator_base<ref_count_invalid_arguments
};
template<class... AN>
using ref_count_invalid_t = typename ref_count_invalid<AN...>::type;

template<class T, class ConnectableObservable>

// ref_count(other) takes a regular observable source, not a connectable_observable.
// use template specialization to avoid instantiating 'subscribe' for two different types
// which would cause a compilation error.
template <typename connectable_type, typename observable_type>
struct ref_count_state_base {
ref_count_state_base(connectable_type other, observable_type source)
: connectable(std::move(other))
, subscribable(std::move(source)) {}

connectable_type connectable; // connects to this. subscribes to this if subscribable empty.
observable_type subscribable; // subscribes to this if non-empty.

template <typename Subscriber>
void subscribe(Subscriber&& o) {
subscribable.subscribe(std::forward<Subscriber>(o));
}
};

// Note: explicit specializations have to be at namespace scope prior to C++17.
template <typename connectable_type>
struct ref_count_state_base<connectable_type, void> {
explicit ref_count_state_base(connectable_type c)
: connectable(std::move(c)) {}

connectable_type connectable; // connects to this. subscribes to this if subscribable empty.

template <typename Subscriber>
void subscribe(Subscriber&& o) {
connectable.subscribe(std::forward<Subscriber>(o));
}
};

template<class T,
class ConnectableObservable,
class Observable = void> // note: type order flipped versus the operator.
struct ref_count : public operator_base<T>
{
typedef rxu::decay_t<ConnectableObservable> source_type;
typedef rxu::decay_t<Observable> observable_type;
typedef rxu::decay_t<ConnectableObservable> connectable_type;

struct ref_count_state : public std::enable_shared_from_this<ref_count_state>
// ref_count() == false
// ref_count(other) == true
using has_observable_t = rxu::negation<std::is_same<void, Observable>>;
static constexpr bool has_observable_v = has_observable_t::value;

struct ref_count_state : public std::enable_shared_from_this<ref_count_state>,
public ref_count_state_base<ConnectableObservable, Observable>
{
explicit ref_count_state(source_type o)
: source(std::move(o))
template <class HasObservable = has_observable_t,
class Enabled = rxu::enable_if_all_true_type_t<
rxu::negation<HasObservable>>>
explicit ref_count_state(connectable_type source)
: ref_count_state_base<ConnectableObservable, Observable>(std::move(source))
, subscribers(0)
{
}

template <bool HasObservableV = has_observable_v>
ref_count_state(connectable_type other,
typename std::enable_if<HasObservableV, observable_type>::type source)
: ref_count_state_base<ConnectableObservable, Observable>(std::move(other),
std::move(source))
, subscribers(0)
{
}

source_type source;
std::mutex lock;
long subscribers;
composite_subscription connection;
};
std::shared_ptr<ref_count_state> state;

explicit ref_count(source_type o)
: state(std::make_shared<ref_count_state>(std::move(o)))
// connectable_observable<T> source = ...;
// source.ref_count();
//
// calls connect on source after the subscribe on source.
template <class HasObservable = has_observable_t,
class Enabled = rxu::enable_if_all_true_type_t<
rxu::negation<HasObservable>>>
explicit ref_count(connectable_type source)
: state(std::make_shared<ref_count_state>(std::move(source)))
{
}

// connectable_observable<?> other = ...;
// observable<T> source = ...;
// source.ref_count(other);
//
// calls connect on 'other' after the subscribe on 'source'.
template <bool HasObservableV = has_observable_v>
ref_count(connectable_type other,
typename std::enable_if<HasObservableV, observable_type>::type source)
: state(std::make_shared<ref_count_state>(std::move(other), std::move(source)))
{
}

Expand All @@ -70,9 +157,9 @@ struct ref_count : public operator_base<T>
keepAlive->connection = composite_subscription();
}
});
keepAlive->source.subscribe(std::forward<Subscriber>(o));
keepAlive->subscribe(std::forward<Subscriber>(o));
if (needConnect) {
keepAlive->source.connect(keepAlive->connection);
keepAlive->connectable.connect(keepAlive->connection);
}
}
};
Expand Down Expand Up @@ -104,11 +191,28 @@ struct member_overload<ref_count_tag>
return Result(RefCount(std::forward<ConnectableObservable>(o)));
}

template<class Observable,
class ConnectableObservable,
class Enabled = rxu::enable_if_all_true_type_t<
is_observable<Observable>,
is_connectable_observable<ConnectableObservable>>,
class SourceValue = rxu::value_type_t<Observable>,
class RefCount = rxo::detail::ref_count<SourceValue,
rxu::decay_t<ConnectableObservable>,
rxu::decay_t<Observable>>,
class Value = rxu::value_type_t<RefCount>,
class Result = observable<Value, RefCount>
>
static Result member(Observable&& o, ConnectableObservable&& other) {
return Result(RefCount(std::forward<ConnectableObservable>(other),
std::forward<Observable>(o)));
}

template<class... AN>
static operators::detail::ref_count_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
static_assert(sizeof...(AN) == 10000, "ref_count takes no arguments");
static_assert(sizeof...(AN) == 10000, "ref_count takes (optional ConnectableObservable)");
}
};

Expand Down
11 changes: 11 additions & 0 deletions Rx/v2/src/rxcpp/rx-observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,17 @@ class observable
static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
}

/*! @copydoc rx-ref_count.hpp
*/
template<class... AN>
auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
/// \cond SHOW_SERVICE_MEMBERS
-> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
/// \endcond
{
return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
}

/*! @copydoc rxcpp::operators::as_blocking
*/
template<class... AN>
Expand Down
Loading

0 comments on commit 77a5b21

Please sign in to comment.