Skip to content

Commit

Permalink
Introduce a common base class for the single values, and resolve some…
Browse files Browse the repository at this point in the history
… rebase conflicts.
  • Loading branch information
quinmars committed Jul 6, 2018
1 parent 321fe1a commit 1c43c5c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 48 deletions.
70 changes: 24 additions & 46 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ internal interface IAppendPrepend : IObservable<TSource>
IScheduler Scheduler { get; }
}

internal sealed class SingleValue : Producer<TSource, SingleValue._>, IAppendPrepend
internal abstract class SingleBase<TSink> : Producer<TSource, TSink>, IAppendPrepend
where TSink : IDisposable
{
private readonly IObservable<TSource> _source;
private readonly TSource _value;
private readonly bool _append;
protected readonly IObservable<TSource> _source;
protected readonly TSource _value;
protected readonly bool _append;

public IScheduler Scheduler { get; }
public abstract IScheduler Scheduler { get; }

public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
public SingleBase(IObservable<TSource> source, TSource value, bool append)
{
_source = source;
_value = value;
_append = append;
Scheduler = scheduler;
}

public IAppendPrepend Append(TSource value)
Expand Down Expand Up @@ -79,6 +79,18 @@ private IAppendPrepend CreateAppendPrepend(Node<TSource> prepend, Node<TSource>

return new Recursive(_source, prepend, append, Scheduler);
}
}


internal sealed class SingleValue : SingleBase<SingleValue._>
{
public override IScheduler Scheduler { get; }

public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
: base (source, value, append)
{
Scheduler = scheduler;
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);

Expand Down Expand Up @@ -479,47 +491,13 @@ public T[] ToReverseArray()
}
}

internal sealed class AppendPrependSingleImmediate<TSource> : Producer<TSource, AppendPrependSingleImmediate<TSource>._>, IAppendPrepend<TSource>
internal sealed class SingleImmediate : SingleBase<SingleImmediate._>
{
private readonly IObservable<TSource> _source;
private readonly TSource _value;
private readonly bool _append;

public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } }

public AppendPrependSingleImmediate(IObservable<TSource> source, TSource value, bool append)
{
_source = source;
_value = value;
_append = append;
}

public IAppendPrepend<TSource> Append(TSource value)
{
var prev = new Node<TSource>(_value);

if (_append)
{
return new AppendPrependMultiple<TSource>(_source,
null, new Node<TSource>(prev, value), Scheduler);
}

return new AppendPrependMultiple<TSource>(_source,
prev, new Node<TSource>(value), Scheduler);
}
public override IScheduler Scheduler => ImmediateScheduler.Instance;

public IAppendPrepend<TSource> Prepend(TSource value)
public SingleImmediate(IObservable<TSource> source, TSource value, bool append)
: base(source, value, append)
{
var prev = new Node<TSource>(_value);

if (_append)
{
return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(value), prev, Scheduler);
}

return new AppendPrependMultiple<TSource>(_source,
new Node<TSource>(prev, value), null, Scheduler);
}

protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
Expand All @@ -532,7 +510,7 @@ internal sealed class _ : IdentitySink<TSource>
private readonly TSource _value;
private readonly bool _append;

public _(AppendPrependSingleImmediate<TSource> parent, IObserver<TSource> observer)
public _(SingleImmediate parent, IObserver<TSource> observer)
: base(observer)
{
_source = parent._source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private static IObservable<TSource> Append_<TSource>(IObservable<TSource> source
}
if (scheduler == ImmediateScheduler.Instance)
{
return new AppendPrepend<TSource>.SingleImmediate<TSource>(source, value, true);
return new AppendPrepend<TSource>.SingleImmediate(source, value, append: true);
}
return new AppendPrepend<TSource>.SingleValue(source, value, scheduler, append: true);
}
Expand Down Expand Up @@ -213,7 +213,7 @@ private static IObservable<TSource> Prepend_<TSource>(IObservable<TSource> sourc

if (scheduler == ImmediateScheduler.Instance)
{
return new AppendPrepend.AppendPrependSingleImmediate<TSource>(source, value, false);
return new AppendPrepend<TSource>.SingleImmediate(source, value, append: false);
}

return new AppendPrepend<TSource>.SingleValue(source, value, scheduler, append: false);
Expand Down

0 comments on commit 1c43c5c

Please sign in to comment.