Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Deanonymize operators #549

Merged
merged 3 commits into from
May 30, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,22 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));

return new AnonymousObservable<TSource>(observer =>
return new SubscribeOnObservable<TSource>(source, scheduler);
}

sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
{
readonly IObservable<TSource> source;

readonly IScheduler scheduler;

public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
{
this.source = source;
this.scheduler = scheduler;
}

protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
var m = new SingleAssignmentDisposable();
var d = new SerialDisposable();
Expand All @@ -47,7 +62,7 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
});

return d;
});
}
}

/// <summary>
Expand All @@ -69,7 +84,22 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
if (context == null)
throw new ArgumentNullException(nameof(context));

return new AnonymousObservable<TSource>(observer =>
return new SubscribeOnCtxObservable<TSource>(source, context);
}

sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
{
readonly IObservable<TSource> source;

readonly SynchronizationContext context;

public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
{
this.source = source;
this.context = context;
}

protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
var subscription = new SingleAssignmentDisposable();
context.PostWithStartComplete(() =>
Expand All @@ -80,7 +110,7 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
}
});
return subscription;
});
}
}

#endregion
Expand Down
28 changes: 18 additions & 10 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -725,14 +725,29 @@ private IObservable<TSource> StartAsyncImpl<TSource>(Func<CancellationToken, Tas
result = task.ToObservable();
}

return new AnonymousObservable<TSource>(observer =>
return new StartAsyncObservable<TSource>(cancellable, result);
}

sealed class StartAsyncObservable<TSource> : ObservableBase<TSource>
{
readonly CancellationDisposable cancellable;

readonly IObservable<TSource> result;

public StartAsyncObservable(CancellationDisposable cancellable, IObservable<TSource> result)
{
this.cancellable = cancellable;
this.result = result;
}

protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(cancellable, subscription);
});
}
}

#endregion
Expand Down Expand Up @@ -816,14 +831,7 @@ private IObservable<Unit> StartAsyncImpl(Func<CancellationToken, Task> actionAsy
result = task.ToObservable();
}

return new AnonymousObservable<Unit>(observer =>
{
//
// [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
//
var subscription = result.Subscribe/*Unsafe*/(observer);
return StableCompositeDisposable.Create(cancellable, subscription);
});
return new StartAsyncObservable<Unit>(cancellable, result);
}

#endregion
Expand Down
196 changes: 178 additions & 18 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,43 @@ internal partial class QueryLanguage

public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
return new AnonymousObservable<TSource>(subscribe);
return new CreateWithDisposableObservable<TSource>(subscribe);
}

sealed class CreateWithDisposableObservable<TSource> : ObservableBase<TSource>
{
readonly Func<IObserver<TSource>, IDisposable> subscribe;

public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe)
{
this.subscribe = subscribe;
}

protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
return subscribe(observer) ?? Disposable.Empty;
}
}

public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
{
return new AnonymousObservable<TSource>(o =>
return new CreateWithActionDisposable<TSource>(subscribe);
}

sealed class CreateWithActionDisposable<TSource> : ObservableBase<TSource>
{
readonly Func<IObserver<TSource>, Action> subscribe;

public CreateWithActionDisposable(Func<IObserver<TSource>, Action> subscribe)
{
var a = subscribe(o);
this.subscribe = subscribe;
}

protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
var a = subscribe(observer);
return a != null ? Disposable.Create(a) : Disposable.Empty;
});
}
}

#endregion
Expand All @@ -38,16 +65,53 @@ public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Act

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
return new CreateWithTaskTokenObservable<TResult>(subscribeAsync);
}

sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
{
readonly Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync;

public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
{
this.subscribeAsync = subscribeAsync;
}

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
var taskCompletionObserver = new TaskCompletionObserver(observer);
var subscription = taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
}

sealed class TaskCompletionObserver : IObserver<Unit>
{
readonly IObserver<TResult> observer;

public TaskCompletionObserver(IObserver<TResult> observer)
{
this.observer = observer;
}

public void OnCompleted()
{
observer.OnCompleted();
}

public void OnError(Exception error)
{
observer.OnError(error);
}

public void OnNext(Unit value)
{
// deliberately ignored
}
}
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
Expand All @@ -57,22 +121,69 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Tas

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
return new CreateWithTaskDisposable<TResult>(subscribeAsync);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code reuse or inheritance or something similar?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? I've mentioned the benefits of these changes in the PR text.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean those IObserver-implementors that are duplicated. There is a good chance for code reuse I guess.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which other implementation are you refering to?

Copy link
Collaborator Author

@akarnokd akarnokd May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync
Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync

Action != IDisposable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A common base class might be possible, but nevermind, it's good as it is.

}

sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
{
readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync;

public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
{
this.subscribeAsync = subscribeAsync;
}

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);

var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
}

sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
{
readonly IObserver<TResult> observer;

IDisposable disposable;

public TaskDisposeCompletionObserver(IObserver<TResult> observer)
{
this.observer = observer;
}

public void Dispose()
{
Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose();
}

public void OnCompleted()
{
observer.OnCompleted();
}

public void OnError(Exception error)
{
observer.OnError(error);
}

public void OnNext(IDisposable value)
{
if (Interlocked.CompareExchange(ref disposable, value, null) != null)
{
value?.Dispose();
}
}
}
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
Expand All @@ -82,22 +193,71 @@ public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Tas

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
{
return new AnonymousObservable<TResult>(observer =>
return new CreateWithTaskActionObservable<TResult>(subscribeAsync);
}

sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
{
readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync;

public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
{
this.subscribeAsync = subscribeAsync;
}

protected override IDisposable SubscribeCore(IObserver<TResult> observer)
{
var subscription = new SingleAssignmentDisposable();
var cancellable = new CancellationDisposable();

var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);

var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);

//
// We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
// We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
// Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
//
taskObservable.Subscribe(taskCompletionObserver);

return StableCompositeDisposable.Create(cancellable, subscription);
});
return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
}

sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
{
readonly IObserver<TResult> observer;

Action disposable;

static readonly Action DisposedAction = () => { };

public TaskDisposeCompletionObserver(IObserver<TResult> observer)
{
this.observer = observer;
}

public void Dispose()
{
Interlocked.Exchange(ref disposable, DisposedAction)?.Invoke();
}

public void OnCompleted()
{
observer.OnCompleted();
}

public void OnError(Exception error)
{
observer.OnError(error);
}

public void OnNext(Action value)
{
if (Interlocked.CompareExchange(ref disposable, value, null) != null)
{
value?.Invoke();
}
}
}
}

public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
Expand Down
Loading