Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler and concurrency control operators #1082

Merged
merged 4 commits into from
Oct 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Bonsai.Core/Bonsai.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
</PropertyGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net462'">
<PackageReference Include="Rx-Linq" Version="2.2.5" />
<PackageReference Include="Rx-PlatformServices" Version="2.2.5" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'net6.0'">
<PackageReference Include="System.Reactive" Version="5.0.0" />
Expand Down
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules units of work
/// on the current thread.
/// </summary>
[Description("Returns an object that schedules units of work on the current thread.")]
public sealed class CurrentThreadScheduler : Source<Rx.CurrentThreadScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the singleton
/// <see cref="Rx.CurrentThreadScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the singleton <see cref="Rx.CurrentThreadScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.CurrentThreadScheduler> Generate()
{
return Observable.Return(Rx.CurrentThreadScheduler.Instance);
}
}
}
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules units of work
/// on the platform's default scheduler.
/// </summary>
[Description("Returns an object that schedules units of work on the platform's default scheduler.")]
public sealed class DefaultScheduler : Source<Rx.DefaultScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the singleton
/// <see cref="Rx.DefaultScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the singleton <see cref="Rx.DefaultScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.DefaultScheduler> Generate()
{
return Observable.Return(Rx.DefaultScheduler.Instance);
}
}
}
27 changes: 27 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/EventLoopScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that creates an object that schedules units of work
/// on a single dedicated thread.
/// </summary>
[Description("Creates an object that schedules units of work on a single dedicated thread.")]
public sealed class EventLoopScheduler : Source<Rx.EventLoopScheduler>
{
/// <summary>
/// Generates an observable sequence that returns a new
/// <see cref="Rx.EventLoopScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the created <see cref="Rx.EventLoopScheduler"/> object.
/// </returns>
public override IObservable<Rx.EventLoopScheduler> Generate()
{
return Observable.Defer(() => Observable.Return(new Rx.EventLoopScheduler()));
}
}
}
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules units of work
/// to run immediately on the current thread.
/// </summary>
[Description("Returns an object that schedules units of work to run immediately on the current thread.")]
public sealed class ImmediateScheduler : Source<Rx.ImmediateScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the singleton
/// <see cref="Rx.ImmediateScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the singleton <see cref="Rx.ImmediateScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.ImmediateScheduler> Generate()
{
return Observable.Return(Rx.ImmediateScheduler.Instance);
}
}
}
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules each unit of work
/// on a separate thread.
/// </summary>
[Description("Returns an object that schedules each unit of work on a separate thread.")]
public sealed class NewThreadScheduler : Source<Rx.NewThreadScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the default
/// <see cref="Rx.NewThreadScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the default <see cref="Rx.NewThreadScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.NewThreadScheduler> Generate()
{
return Observable.Return(Rx.NewThreadScheduler.Default);
}
}
}
95 changes: 95 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reactive.Concurrency;
using System.Xml.Serialization;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents a value specifying the scheduler to be used when handling
/// concurrency in a reactive operator.
/// </summary>
[XmlType(Namespace = Constants.ReactiveXmlNamespace)]
public struct SchedulerMapping : IEquatable<SchedulerMapping>
{
/// <summary>
/// Initializes a new instance of the <see cref="SchedulerMapping"/> class
/// using the specified scheduler.
/// </summary>
/// <param name="scheduler">The scheduler assigned to the mapping.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="scheduler"/> is <see langword="null"/>.
/// </exception>
public SchedulerMapping(IScheduler scheduler)
{
Instance = scheduler ?? throw new ArgumentNullException(nameof(scheduler));
}

/// <summary>
/// Gets or sets the scheduler object assigned to the mapping.
/// </summary>
[XmlIgnore]
public IScheduler Instance { get; private set; }

/// <summary>
/// Gets or sets an XML representation of the scheduler instance for serialization.
/// </summary>
[XmlText]
[Browsable(false)]
public string InstanceXml
{
get
{
var instance = Instance;
if (instance == Rx.DefaultScheduler.Instance) return nameof(DefaultScheduler);
if (instance == Rx.CurrentThreadScheduler.Instance) return nameof(CurrentThreadScheduler);
if (instance == Rx.ImmediateScheduler.Instance) return nameof(ImmediateScheduler);
if (instance == Rx.NewThreadScheduler.Default) return nameof(NewThreadScheduler);
if (instance == Rx.TaskPoolScheduler.Default) return nameof(TaskPoolScheduler);
if (instance == Rx.ThreadPoolScheduler.Instance) return nameof(ThreadPoolScheduler);
return null;
}
set
{
Instance = value switch
{
nameof(DefaultScheduler) => Rx.DefaultScheduler.Instance,
nameof(CurrentThreadScheduler) => Rx.CurrentThreadScheduler.Instance,
nameof(ImmediateScheduler) => Rx.ImmediateScheduler.Instance,
nameof(NewThreadScheduler) => Rx.NewThreadScheduler.Default,
nameof(TaskPoolScheduler) => Rx.TaskPoolScheduler.Default,
nameof(ThreadPoolScheduler) => Rx.ThreadPoolScheduler.Instance,
_ => null,
};
}
}

/// <summary>
/// Returns a value indicating whether this object has the same scheduler
/// instance as a specified <see cref="SchedulerMapping"/> value.
/// </summary>
/// <param name="other">The <see cref="SchedulerMapping"/> value to compare to this object.</param>
/// <returns>
/// <see langword="true"/> if <paramref name="other"/> has the same scheduler instance
/// as this object; otherwise, <see langword="false"/>.
/// </returns>
public bool Equals(SchedulerMapping other)
{
return Instance == other.Instance;
}

/// <inheritdoc/>
public override bool Equals(object obj)
{
return obj is SchedulerMapping mapping && Instance == mapping.Instance;
}

/// <inheritdoc/>
public override int GetHashCode()
{
return EqualityComparer<IScheduler>.Default.GetHashCode(Instance);
}
}
}
56 changes: 56 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.ComponentModel;
using System.Globalization;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive.Concurrency
{
internal class SchedulerMappingConverter : TypeConverter
{
static readonly SchedulerMapping[] DefaultSchedulers = new SchedulerMapping[]
{
new SchedulerMapping(),
new SchedulerMapping(Rx.DefaultScheduler.Instance),
new SchedulerMapping(Rx.CurrentThreadScheduler.Instance),
new SchedulerMapping(Rx.ImmediateScheduler.Instance),
new SchedulerMapping(Rx.ThreadPoolScheduler.Instance),
new SchedulerMapping(Rx.NewThreadScheduler.Default),
new SchedulerMapping(Rx.TaskPoolScheduler.Default),
};

public override bool CanConvertFrom(ITypeDescriptorContext context, Type sourceType)
{
return sourceType == typeof(string) || base.CanConvertFrom(context, sourceType);
}

public override object ConvertFrom(ITypeDescriptorContext context, CultureInfo culture, object value)
{
if (value is string name)
{
return new SchedulerMapping { InstanceXml = name };
}

return base.ConvertFrom(context, culture, value);
}

public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType)
{
if (value is SchedulerMapping mapping && destinationType == typeof(string))
{
return mapping.InstanceXml;
}

return base.ConvertTo(context, culture, value, destinationType);
}

public override bool GetStandardValuesSupported(ITypeDescriptorContext context)
{
return true;
}

public override StandardValuesCollection GetStandardValues(ITypeDescriptorContext context)
{
return new StandardValuesCollection(DefaultSchedulers);
}
}
}
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules units of work
/// on the Task Parallel Library (TPL) task pool.
/// </summary>
[Description("Returns an object that schedules units of work on the Task Parallel Library (TPL) task pool.")]
public sealed class TaskPoolScheduler : Source<Rx.TaskPoolScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the default
/// <see cref="Rx.TaskPoolScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the default <see cref="Rx.TaskPoolScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.TaskPoolScheduler> Generate()
{
return Observable.Return(Rx.TaskPoolScheduler.Default);
}
}
}
28 changes: 28 additions & 0 deletions Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using Rx = System.Reactive.Concurrency;

namespace Bonsai.Reactive
{
/// <summary>
/// Represents an operator that returns an object that schedules units of work
/// on the CLR thread pool.
/// </summary>
[Description("Returns an object that schedules units of work on the CLR thread pool.")]
public sealed class ThreadPoolScheduler : Source<Rx.ThreadPoolScheduler>
{
/// <summary>
/// Generates an observable sequence that returns the singleton
/// <see cref="Rx.ThreadPoolScheduler"/> object.
/// </summary>
/// <returns>
/// A sequence containing the singleton <see cref="Rx.ThreadPoolScheduler"/>
/// object.
/// </returns>
public override IObservable<Rx.ThreadPoolScheduler> Generate()
{
return Observable.Return(Rx.ThreadPoolScheduler.Instance);
}
}
}
Loading