Add dedicated class based implementations for Append and Prepend. #600

Merged (13 commits) on Jun 26, 2018


quinmars
Contributor

Here is my attempt to optimize the Append and Prepend operators. Please review it carefully; I'm not sure whether I got the handling of the IDisposables right.


protected override void Run(_ sink) => sink.Run(_source);

internal sealed class _ : IdentitySink<TSource>
Collaborator

It would be great if we abandoned these underscore class names.

Contributor Author

I agree with you, but that name is used in most - if not all - operators I have seen.

Collaborator

Definitely desirable, maybe in a later PR.

private static IDisposable ForwardValue(IScheduler scheduler, _ sink)
{
sink.ForwardOnNext(sink._value);
return scheduler.Schedule(sink, ForwardOnCompleted);
Collaborator

Is it necessary to schedule the OnCompleted signal separately?

Contributor Author

Well, that was something I tried to ask in #567. The conclusion was to do it like ToObservable/StartWith do it. And ToObservable does schedule the OnCompleted separately.

Collaborator

I don't think we need the separate schedule here.

private static IDisposable ForwardOnCompleted(IScheduler _unused, _ sink)
{
sink.ForwardOnCompleted();
return BooleanDisposable.True;
Collaborator

This should not really be returned by operators and is supposed to be confined to Disposable methods. Return Disposable.Empty instead.

protected override void Dispose(bool val)
{
base.Dispose(val);
Disposable.TryDispose(ref _schedulerDisposable);
Collaborator

Usually we do this as if (disposing) { dispose custom resources } base.Dispose(disposing).
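
The ordering described in this comment can be sketched as follows. This is a minimal illustration, not the PR's code: `SinkBase`, `FlagDisposable`, and `AppendSink` are hypothetical stand-ins, and the real sink base class in Rx is not reproduced here.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the sink base class; not the actual Rx type.
abstract class SinkBase : IDisposable
{
    public bool BaseDisposed { get; private set; }

    public void Dispose() => Dispose(true);

    protected virtual void Dispose(bool disposing) => BaseDisposed = true;
}

// Hypothetical helper that records whether it has been disposed.
sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => IsDisposed = true;
}

sealed class AppendSink : SinkBase
{
    private IDisposable _schedulerDisposable;

    public AppendSink(IDisposable schedulerDisposable) => _schedulerDisposable = schedulerDisposable;

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Release the resources owned by this class first ...
            Interlocked.Exchange(ref _schedulerDisposable, null)?.Dispose();
        }

        // ... then hand over to the base class.
        base.Dispose(disposing);
    }
}
```

Swapping the field out with `Interlocked.Exchange` keeps a second `Dispose` call harmless in this sketch, because the field is already null by then.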

@akarnokd
Collaborator

One thing to consider is that Append invites the following pattern:

IObservable<int> someSource = Observable.Return(0);
for (int i = 1; i < 1000; i++) {
   someSource = someSource.Append(i);
}

someSource.Subscribe(Console.WriteLine);

Does this still work with your modifications?

@quinmars
Contributor Author

It is tested for three consecutive Appends in Append_Many_Scheduler, but I haven't added any optimizations like the ones in CoreFX for Enumerable.Append, i.e., merging consecutive operators into one instead of using the naive O(n²) implementation. I can't even say whether it becomes quadratic for observables or not.


internal sealed class _ : IdentitySink<TSource>
{
private readonly IObservable<TSource> _source;
Collaborator

This field is unused.

@akarnokd
Collaborator

I'm worried about the stacktrace depth I get:

[Two screenshots of the stack trace; images not preserved]

@akarnokd
Collaborator

TailRecursiveSink in Concat flattens out the deep structure, so it is not a problem there. When this PR is merged, I can add the changes so that a chain of Appends is coalesced into one Concat(source, items).

@quinmars
Contributor Author

We could go the same route as the corefx team did for the enumerable counterpart [1] to optimize consecutive Appends and Prepends. That'd be:

  1. Collect all appends and prepends in an AppendPrependObservable class
  2. For the observing part: a sink that consists roughly of an IdentitySink with a ToObservable.Sink starting part and a ToObservable.Sink ending part.

If we do that or anything else in that direction, we should probably skip the extra scheduling of the OnCompleted, at least between two Appends/Prepends.

I can do that, but that'd take some time.

[1] https://github.com/dotnet/corefx/blob/master/src/System.Linq/src/System/Linq/AppendPrepend.cs

@akarnokd
Collaborator

It is certainly more interesting and challenging doing this type of "operator fusion". I leave it up to you if you want to tackle it.

@quinmars
Contributor Author

I'd give it a try then, if that's OK with @danielcweber as well.

@danielcweber
Collaborator

Sure, go ahead.

@quinmars quinmars changed the title Add dedicated class based implementations for Append and Prepend. [WIP] Add dedicated class based implementations for Append and Prepend. Jun 18, 2018
@danielcweber
Collaborator

Looks good. Is there still something missing to make the WIP go away?

@quinmars quinmars changed the title [WIP] Add dedicated class based implementations for Append and Prepend. Add dedicated class based implementations for Append and Prepend. Jun 21, 2018
@quinmars
Contributor Author

Not really. At first I thought I would put the optimizations I'm working on into this PR, but I think this is a good point to merge. I'll put the next commits in a new PR.

@quinmars
Contributor Author

@akarnokd now the stacktrace should be much smaller, hopefully. 😄

@quinmars
Contributor Author

@danielcweber, I changed my mind and put the changes into this PR. Let me know if I can help you during the review.

@danielcweber
Collaborator

danielcweber commented Jun 22, 2018

Unfortunately, I get 4 failing tests with NullReferenceExceptions locally:

Append_Many, Append_Many_Scheduler, Append_Many_Take, Append_Many_Take_Scheduler.

Result StackTrace:	
   at System.Reactive.Linq.ObservableImpl.AppendPrepend.AppendPrependMultiple`1._.LoopRec(State state, Action`1 recurse)
   at System.Reactive.Concurrency.Scheduler.InvokeRec1State`1.InvokeFirst(TState state)
   at System.Reactive.Concurrency.Scheduler.InvokeRec1[TState](IScheduler scheduler, ValueTuple`2 tuple)
   at System.Reactive.Concurrency.Scheduler.<>c__48`1.<Schedule>b__48_0(IScheduler s, ValueTuple`2 p)
   at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule[TState](IScheduler scheduler, TState state, Action`2 action)
   at System.Reactive.Linq.ObservableImpl.AppendPrepend.AppendPrependMultiple`1._.Schedule(TSource[] array, Action`1 continueWith)
   at System.Reactive.Linq.ObservableImpl.AppendPrepend.AppendPrependMultiple`1._.Run()
   at System.Reactive.Linq.ObservableImpl.AppendPrepend.AppendPrependMultiple`1.Run(_ sink)
   at System.Reactive.Producer`2.<>c.<SubscribeRaw>b__1_0(ValueTuple`2 tuple)
   at System.Reactive.Concurrency.Scheduler.<>c__74`1.<ScheduleAction>b__74_0(IScheduler _, ValueTuple`2 tuple)
   at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.ScheduleAction[TState](IScheduler scheduler, TState state, Action`1 action)
   at System.Reactive.Producer`2.SubscribeRaw(IObserver`1 observer, Boolean enableSafeguard)
   at System.Reactive.Producer`2.Subscribe(IObserver`1 observer)
   at Microsoft.Reactive.Testing.TestScheduler.<>c__DisplayClass4_0`1.<Start>b__1(IScheduler scheduler, Object state)
   at System.Reactive.Concurrency.VirtualTimeScheduler`2.<>c__DisplayClass4_0`1.<ScheduleAbsolute>b__0(IScheduler scheduler, TState state1)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.VirtualTimeSchedulerBase`2.Start()
   at Microsoft.Reactive.Testing.TestScheduler.Start[T](Func`1 create, Int64 created, Int64 subscribed, Int64 disposed)
   at Microsoft.Reactive.Testing.TestScheduler.Start[T](Func`1 create)
   at ReactiveTests.Tests.AppendTest.Append_Many()
Result Message:	System.NullReferenceException : Object reference not set to an instance of an object.

}
}

private sealed class State
Collaborator

Please replace this class with C# 7 ValueTuples to have fewer allocations.

@quinmars
Contributor Author

Sorry, I was pretty sure that I had run all tests the day before. I'll fix that.

@quinmars
Contributor Author

Man, nearly everything that could go wrong went wrong in that commit, sorry for that. I hope it's OK now. At least all tests pass. I changed the class to be a struct like you did in #644.

if (_prepends != null)
{
var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s)));
Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
Collaborator

That could indeed lead to trouble: if the subscription (one line above) completes synchronously, you will effectively be overwriting the scheduling from OnCompleted, thus cancelling that work. TrySetSingle should be fine.

Contributor Author

I changed it to TrySetSingle. Should I then use two different backing fields for the two schedulings? Currently, they share one field.

Collaborator

No, the one field should be fine; it's just that TrySetSingle should only be used for the first scheduling. Now you're using it for both, which is not what I meant. In fact, if you change both to SetSingle instead of TrySetSingle, you will probably notice one of the tests throwing. The second TrySetSingle will fail in the case that we have both prepends and appends, so we're currently leaking the second scheduling.

Collaborator

The second time you schedule something, you want it to overwrite the previous scheduling, which is why you want to use TrySetSerial. The first time you schedule, however, you want to either set the disposable for the very first time or not set it at all. Because, if the scheduled work executes synchronously, the second scheduling will take place (successfully through TrySetSerial), and by the time Schedule returns, we definitely don't want to overwrite that second scheduling, which is why we want to use TrySetSingle.
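
The difference between the two assignments can be illustrated with a simplified sketch. The helpers `SetOnce` and `Replace` below are hypothetical and only model the "set if empty" and "overwrite and dispose the previous value" behaviour described above; they are not the internal Rx helpers, which presumably also have to deal with an already-disposed field, a case ignored here.

```csharp
using System;
using System.Threading;

// Hypothetical helper that records whether it has been disposed.
sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }

    public void Dispose() => IsDisposed = true;
}

static class DisposableSketch
{
    // "Single" semantics: store the value only if the field is still empty.
    public static bool SetOnce(ref IDisposable field, IDisposable value)
        => Interlocked.CompareExchange(ref field, value, null) == null;

    // "Serial" semantics: store the value and dispose whatever was there before.
    public static void Replace(ref IDisposable field, IDisposable value)
        => Interlocked.Exchange(ref field, value)?.Dispose();
}

static class Demo
{
    public static (bool FirstStored, bool SecondDisposed) Run()
    {
        IDisposable field = null;
        var first = new FlagDisposable();   // handle of the first scheduling
        var second = new FlagDisposable();  // handle of the second scheduling

        // The first scheduled work runs synchronously and already
        // registers the second scheduling, overwriting the field.
        DisposableSketch.Replace(ref field, second);

        // Only now does the first Schedule call return its handle.
        // SetOnce leaves the second scheduling untouched.
        var stored = DisposableSketch.SetOnce(ref field, first);

        return (stored, second.IsDisposed);
    }
}
```

In this sketch, using `Replace` for the first handle as well would dispose `second`, which corresponds to cancelling the work scheduled from OnCompleted.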

_scheduler = parent.Scheduler;

if (parent._prepends != null)
_prepends = parent._prepends.ToArray();
Collaborator

Could we avoid the creation of this array by simply walking the node structure?

Contributor Author

Yes, we could. I haven't done it because we then have to duplicate the two loops. It's a tradeoff between code reuse and one allocation.

_prepends = parent._prepends.ToArray();

if (parent._appends != null)
_appends = parent._appends.ToReverseArray();
Collaborator

This array creation could be avoided as well if the appends were in a doubly-linked node chain. In that case, it would be enough to traverse the chain once to find the very first value and then walk backwards.

Collaborator

Beware: by doubly linking the nodes, we can't pass around the original observable once we have appended/prepended to it, or even append/prepend something to the original observable. The instances we get from applying operators should be, in a way, immutable. We would get subtle side effects here, if I got your idea correctly.

Collaborator

I was thinking about getting the immutable collections back in (#222), then it's just a stack and a queue.

Collaborator

The instances we get from applying operators should be, in a way, immutable.

Indeed, the _appends should be an immutable list. I was somehow thinking of it as a copy-on-write list (i.e., ImmutableList), which requires way more allocation and copying. My mistake.

Collaborator

If we manage to mitigate the performance degradation that has been reported in #222, immutable collections could be helpful in a lot of places. It's used internally in the Roslyn compiler since it's highly concurrent.

Contributor Author

The current implementation has a time complexity of O(n) and needs n + 1 allocations. I doubt the immutable collections offer a datatype that can beat that.

Collaborator

danielcweber commented Jun 26, 2018

Adding a node to the lists as they are currently implemented is even O(1). Yes, even immutable collections won't beat that. Maybe they'll find their way into the project eventually, since they would be useful in the Subjects, but until then this solution is perfectly fine.
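
For readers following the complexity argument, here is a minimal sketch of an immutable singly-linked node list with O(1) add and a single array allocation on read-out. The `Node<T>` type and the element ordering of `ToArray` and `ToReverseArray` are assumptions made for illustration; they are not taken from the PR's source.

```csharp
using System;

// Hypothetical immutable node; names are illustrative, not the PR's actual types.
sealed class Node<T>
{
    public readonly Node<T> Parent;
    public readonly T Value;
    public readonly int Count;

    public Node(Node<T> parent, T value)
    {
        Parent = parent;
        Value = value;
        Count = parent == null ? 1 : parent.Count + 1;
    }

    // O(1): allocates one node and shares the existing chain unchanged.
    public Node<T> Add(T value) => new Node<T>(this, value);

    // Newest value first, one array allocation.
    public T[] ToArray()
    {
        var result = new T[Count];
        var i = 0;
        for (var n = this; n != null; n = n.Parent)
        {
            result[i++] = n.Value;
        }
        return result;
    }

    // Oldest value first, one array allocation.
    public T[] ToReverseArray()
    {
        var result = new T[Count];
        var i = Count - 1;
        for (var n = this; n != null; n = n.Parent)
        {
            result[i--] = n.Value;
        }
        return result;
    }
}
```

Because a node only points at its parent, two chains can extend the same parent without affecting each other, which is the immutability property asked for earlier in the thread. Building n nodes and reading them out once costs n + 1 allocations in this sketch.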

@danielcweber danielcweber merged commit 738e3c9 into dotnet:master Jun 26, 2018