-
Notifications
You must be signed in to change notification settings - Fork 751
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Possible Bug: Retry does not work on an observable created using the Start operator #238
Comments
Could you rework this into a simple unit test and post it here (or even better, create a pull request)? |
@danielcweber Sure, will do at my earliest convenience. Thank you. |
Not a bug. Start returns a hot observable, so operators like Retry won't have an effect on it. It's pretty much the same as turning the synchronous function into a Task, except we use an observable to represent the outcome. |
Thanks, @bartdesmet. Why shouldn't it be possible to retry observing a hot sequence if an attempt to observe fails? Is that because hot sequences are thought of as being "out of the control of the observer" and as having potential side-effects? For e.g. a customer just deposited $100 into his bank account, and we'd created an observable out of this So retrying them simply because an observation (and not the event itself) failed could mean replaying unintended side-effects? In contrast, a cold observable is, by design, meant to be replayed for each subscriber, so retrying them is quite safe? I am now suddenly left wondering, when we retry, what are we actually retrying? Are we retrying:
In both cases, though, you're right, retrying a hot observable makes little sense. It's amazing the clarity you get when you try to articulate your ideas or a question about something. Just the writing it down made me look at the improbability of retrying a hot observable. |
@Sathyaish See if this helps: |
@Sathyaish, your intuitions are more or less correct, but allow me to add some detail nonetheless. Let's start by analyzing the simplest case of When we observe such behavior (or rather don't observe anything, ignoring the fact we'd have to wait forever), what can we conclude about the nature of the source sequence? Not much really... It could be cold or hot. Let's pretend the source is hot for a moment. Now the question becomes whether this source has ever produced a non-empty sequence and we tuned in too late (i.e. after So what about the case of arriving too late to the party and having missed all notifications, including terminal ones? If were to apply Wow, that's concerning, isn't it? Luckily many hot sequences, including Okay, so what if a hot resource does indeed have some memory to cover up the absence of notifications coming out after reaching a terminal state? Are we no longer "asynchronously deadlocked" when applying What this all boils down to is that one should know darn well about the behavior of the sequence one applies So, where's Another case where What happens here really? The source sent an Does the new incoming observer simply get added to the list of current observers? Was that list snapshotted prior to sending out notifications? If it wasn't, we'll receive the same Luckily, in Rx, we use a snapshot of the observers when sending out a notification (see However, this doesn't mean that Also, the hot sequence may exhibit asynchronous subscription behavior relative to the outgoing notifications, so the behavior of a All in all, if you don't know what you're retrying (which can be a long chain of composed operators hidden behind an |
Wow! :-) Thank you so much, @bartdesmet. To parse your reply, I implemented bits and pieces of it. I am pasting the code snippets here so they may serve as a reminder to me and to any other like myself who is trying to learn Rx.
Example 1 using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace BartAnswersWhyCantRetryHot
{
class Program
{
// This program is meant to parse the commentary Bart de Smet
// made in response to my question, "Why doesn't Rx allow retrying of hot observables?"
// Reference: https://github.com/Reactive-Extensions/Rx.NET/issues/238#issuecomment-247894496
static void Main(string[] args)
{
ExternalSourceBasedHotWithNoStateAboutTermination();
Console.ReadKey();
}
static void ExternalSourceBasedHotWithNoStateAboutTermination()
{
var account = new BankAccount();
var i = 0;
Task.Run(() => { while (i++ < 100) account.Deposit(1); });
account.Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
account.Subscribe(v => WriteLine($"late (and unsafe because bad producer): {v}"),
e => WriteLine($"late (and unsafe because bad producer): {e.Message}"),
() => WriteLine("late (and unsafe because bad producer) done!"));
// If we leave the following 3 lines of code
// commented, the late (and unsafe) observer
// lingers on hopelessly while the producer's
// (BankAccount class') thread is long dead and
// has been retired to the thread pool.
// But if we uncomment these lines, we observe
// the bad behavior of our bad producer BankAccount class.
// It violates the Rx contract. It is not storing the termination
// state and reproducing it but is actually attempting to produce
// another value after it was previous terminated. But we don't get to know
// because:
// 1. The observer is unsafe once again because (2) below;
// 2. The producer BankAccount does not ensure safe observer
Thread.Sleep(2000);
Console.WriteLine("\nAnd much later...\n");
account.Deposit(1);
// In summary, I make the following conclusions of Bart's comments thus far:
// Since Rx has no control over producers of hot sequences, hot sequences
// cannot be reliably retried. Therefore, including Retry as a feature on
// hot observables would be something that works sometimes (if the author of the observable
// was mindful of creating safe observers and communicating termination state)
// and doesn't at others (idiot programmer). It would be best, therefore, to not
// have that feature.
}
}
// Hot observable linked to an external source
// and preserving no state about termination
// of the sequence. If we use an ISubject<T> here,
// it will tell late subscribers / observers of previous
// terminal state, so this one doesn't use any subjects.
public class BankAccount : IObservable<decimal>
{
private decimal _balance = 0m;
// Mimic subject, but poorly
private List<IObserver<decimal>> _observers = new List<IObserver<decimal>>();
public void Deposit(decimal amount)
{
try
{
if (_balance >= 5) throw new Exception("Oops!");
Thread.Sleep(1000);
_balance += amount;
Notify(_balance);
}
catch (Exception ex)
{
Notify(ex);
}
}
public void Withdraw(decimal amount) { /* Same as deposit() ... */ }
public IDisposable Subscribe(IObserver<decimal> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
_observers.Add(observer);
return Disposable.Create(() => { });
}
protected virtual void Notify(decimal v)
{
_observers?.ForEach(o => o.OnNext(v));
}
protected virtual void Notify(Exception ex)
{
_observers?.ForEach(o => o.OnError(ex));
}
}
}
Example 2 using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace BartAnswersWhyCantRetryHot
{
class Program
{
// This program is meant to parse the commentary Bart de Smet
// made in response to my question, "Why doesn't Rx allow retrying of hot observables?"
// Reference: https://github.com/Reactive-Extensions/Rx.NET/issues/238#issuecomment-247894496
static void Main(string[] args)
{
RetryExternalSourceBasedHotWithNoStateAboutTermination();
Console.ReadKey();
}
static void RetryExternalSourceBasedHotWithNoStateAboutTermination()
{
var account = new BankAccount();
var i = 0;
Task.Run(() => { while (i++ < 100) account.Deposit(1); });
account.Retry(3).Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
account.Retry(3).Subscribe(v => WriteLine($"late: {v}"),
e => WriteLine($"late: {e.Message}"),
() => WriteLine("late done!"));
}
}
// Hot observable linked to an external source
// and preserving no state about termination
// of the sequence. If we use an ISubject<T> here,
// it will tell late subscribers / observers of previous
// terminal state, so this one doesn't use any subjects.
public class BankAccount : IObservable<decimal>
{
private decimal _balance = 0m;
// Mimic subject, but poorly
private List<IObserver<decimal>> _observers = new List<IObserver<decimal>>();
public void Deposit(decimal amount)
{
try
{
if (_balance >= 5) throw new Exception("Oops!");
Thread.Sleep(1000);
_balance += amount;
Notify(_balance);
}
catch (Exception ex)
{
Notify(ex);
}
}
public void Withdraw(decimal amount) { /* Same as deposit() ... */ }
public IDisposable Subscribe(IObserver<decimal> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
_observers.Add(observer);
return Disposable.Create(() => { });
}
protected virtual void Notify(decimal v)
{
_observers?.ForEach(o => o.OnNext(v));
}
protected virtual void Notify(Exception ex)
{
_observers?.ForEach(o => o.OnError(ex));
}
}
}
From System.Reactive.Subjects.Subject::Subscribe(IObserver) var done = oldObserver as DoneObserver<T>;
if (done != null)
{
observer.OnError(done.Exception);
return Disposable.Empty;
} For a demo, see the example Example 3 below. Example 3 using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using static System.Console;
namespace BartAnswersWhyCantRetryHot
{
class Program
{
// This program is meant to parse the commentary Bart de Smet
// made in response to my question, "Why doesn't Rx allow retrying of hot observables?"
// Reference: https://github.com/Reactive-Extensions/Rx.NET/issues/238#issuecomment-247894496
static void Main(string[] args)
{
Console.ReadKey();
}
static void SubjectBasedHotSoTerminalNotificationPreservedForLateComers()
{
var hot = new Hot();
hot.Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
hot.Subscribe(v => WriteLine($"late: {v}"),
e => WriteLine($"late: {e.Message}"),
() => WriteLine("late done!"));
}
static void RetrySubjectBasedHotSoTerminalNotificationPreservedForLateComers()
{
var hot = new Hot();
hot.Retry(3).Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
hot.Retry(3).Subscribe(v => WriteLine($"late: {v}"),
e => WriteLine($"late: {e.Message}"),
() => WriteLine("late done!"));
}
}
class Hot : IObservable<int>
{
private ISubject<int> _subject = new Subject<int>();
public Hot()
{
var source = Observable.Generate(1,
i => i <= 100,
i => ++i,
i => { if (i == 5) throw new Exception("Oops!"); else return i; },
i => TimeSpan.FromSeconds(1));
source.Subscribe(_subject);
}
public IDisposable Subscribe(IObserver<int> observer)
{
return _subject.Subscribe(observer);
}
}
}
For a demo of the concept outlined in the preceding paragraph, please see Example 1 in this post.
Example 4 using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using static System.Console;
namespace BartAnswersWhyCantRetryHot
{
class Program
{
// This program is meant to parse the commentary Bart de Smet
// made in response to my question, "Why doesn't Rx allow retrying of hot observables?"
// Reference: https://github.com/Reactive-Extensions/Rx.NET/issues/238#issuecomment-247894496
static void Main(string[] args)
{
ExternalSourceHotThatKeepsTerminationState();
Console.ReadKey();
}
static void ExternalSourceHotThatKeepsTerminationState()
{
var school = new School();
int i = 0;
Task.Run(() => { while (++i <= 100) school.Admit(new Student(i)); });
school.Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
school.Subscribe(v => WriteLine($"late: {v}"),
e => WriteLine($"late: {e.Message}"),
() => WriteLine("late done!"));
// This should report back the termination state
// with the last exception without attempting to produce
// another value. Let's see.
// And it does the right thing.
Thread.Sleep(2000);
Console.WriteLine("And much later...");
school.Admit(new Student(100));
}
static void RetryExternalSourceHotThatKeepsTerminationState()
{
var school = new School();
int i = 0;
// I am guessing that here again, you can't retry producing
// values out of this hot observable School because the code
// that actually produces these values is external to the producer.
// Or, one may say that the producer itself is outside of the control
// of Rx, as is evident with the line of code below.
// There is no way for the Retry operator to know how to re-generate
// the values for the School object previously generated or to
// even generate new ones.
// But theoretically, if it was possible, or if such a producer
// were to internally cache old values and spit them out or were
// to use one of the subjects that cached values (ReplaySubject)
// then retrying this operation would result, as Bart correctly
// suggests, in an infinite loop of "recieving an exception and
// retrying the operation only to receive the same exception again."
Task.Run(() => { while (++i <= 100) school.Admit(new Student(i)); });
school.Retry(3).Subscribe(
v => WriteLine($"early: {v}"),
e => WriteLine($"early: {e.Message}"),
() => WriteLine("early done!"));
Thread.Sleep(6000);
school.Retry(3).Subscribe(v => WriteLine($"late: {v}"),
e => WriteLine($"late: {e.Message}"),
() => WriteLine("late done!"));
}
}
public class Student
{
public Student(int rollNumber)
{
RollNumber = rollNumber;
}
public int RollNumber { get; set; }
public override string ToString()
{
return RollNumber.ToString();
}
}
public class School : IObservable<Student>
{
// Mimic subject that communicates termination state
// and therefore ensures safe observers
private List<IObserver<Student>> _observers = new List<IObserver<Student>>();
private List<Student> _students = new List<Student>();
private bool _erred = false;
private Exception _lastException = null;
public void Admit(Student student)
{
try
{
if (_erred)
{
Notify(_lastException);
return;
}
if (student.RollNumber >= 5) throw new Exception("Oops!");
Thread.Sleep(1000);
_students.Add(student);
Notify(student);
}
catch (Exception ex)
{
_lastException = ex;
Notify(ex);
}
}
public IDisposable Subscribe(IObserver<Student> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
_observers.Add(observer);
return Disposable.Create(() => { });
}
protected virtual void Notify(Student v)
{
_observers?.ForEach(o => o.OnNext(v));
}
protected virtual void Notify(Exception ex)
{
_observers?.ForEach(o => o.OnError(ex));
}
}
}
Thanks much much, @RxDave. I'll be reading the article you linked to soon and following-up on this thread if I have more related questions. |
@bartdesmet I've rummaged through the rest of your reply a few times but haven't paid it its full consideration. I will do that later and post any follow-up questions or a summary then. Many thanks once again. :-) |
I think I am having trouble that is similar and am not sure how to work around it. _displayOnPage = Observable.Defer(() => CreateSequenceWithSideEffectsThatChangeUI())
.Publish()
.RefCount();
...
public IObservable<Unit> Display()
{
return _displayOnPage
}
.. else where
Display().Subscribe()
....
await Display(); This runs just fine the first time. But then on subsequenct subscribes the subscription immediately receives the OnComplete before the defer action is even executed. What I would like is to share the published stream until everyone unsubscribes(so the side effects aren't duplicated) and then if someone subscribes to it again it should reevaluate the original sequence. Is that supposed to happen? If not how can I accomplish that? I've found similar issues for RXJS and RXPy which is even linked. |
Maybe the termination of the main source is racing with the refCount operator so that the observer gets in there only to receive the terminal event. |
I think it is because publish just uses a subject without any extra reuse logic. And Subjects aren't valid once they issue an onerror or oncompleted. Unless I misunderstand where publish is actually implemented. |
Running the following code snippet only tries the
DownloadString
operation once and gives up on the first failure where it should have retried 3 times before reporting failure.Please note, however, that it works just fine without using the
Start
operator, as shown below.Also note that it works just fine when not using the
Start
operator even if I were to schedule the work on the default pool scheduler, as the code snippet below shows:The text was updated successfully, but these errors were encountered: