From dcd547d663bc8690dc0261499f831ae4fc84b854 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Fri, 14 Oct 2022 23:05:46 +0100 Subject: [PATCH 1/4] Add scheduler and concurrency control operators --- Bonsai.Core/Bonsai.Core.csproj | 1 + .../Concurrency/CurrentThreadScheduler.cs | 21 ++++++ .../Reactive/Concurrency/DefaultScheduler.cs | 21 ++++++ .../Concurrency/EventLoopScheduler.cs | 27 +++++++ .../Concurrency/ImmediateScheduler.cs | 21 ++++++ .../Concurrency/NewThreadScheduler.cs | 21 ++++++ .../Reactive/Concurrency/SchedulerMapping.cs | 74 +++++++++++++++++++ .../Concurrency/SchedulerMappingConverter.cs | 64 ++++++++++++++++ .../Reactive/Concurrency/TaskPoolScheduler.cs | 21 ++++++ .../Concurrency/ThreadPoolScheduler.cs | 21 ++++++ Bonsai.Core/Reactive/ObserveOn.cs | 58 +++++++++++++++ Bonsai.Core/Reactive/SubscribeOn.cs | 61 +++++++++++++++ Bonsai.Editor/Bonsai.Editor.csproj | 1 - Bonsai.System/Bonsai.System.csproj | 3 - 14 files changed, 411 insertions(+), 4 deletions(-) create mode 100644 Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/EventLoopScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs create mode 100644 Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs create mode 100644 Bonsai.Core/Reactive/ObserveOn.cs create mode 100644 Bonsai.Core/Reactive/SubscribeOn.cs diff --git a/Bonsai.Core/Bonsai.Core.csproj b/Bonsai.Core/Bonsai.Core.csproj index 2012e3ea1..9f7ce220f 100644 --- a/Bonsai.Core/Bonsai.Core.csproj +++ b/Bonsai.Core/Bonsai.Core.csproj @@ -10,6 +10,7 @@ + diff --git a/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs b/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs new file mode 100644 index 000000000..89c26b449 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules units of work + /// on the current thread. + /// + [Description("Returns an object that schedules units of work on the current thread.")] + public sealed class CurrentThreadScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public CurrentThreadScheduler() + : base(Rx.CurrentThreadScheduler.Instance) + { + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs b/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs new file mode 100644 index 000000000..1b2d8549a --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules units of work + /// on the platform's default scheduler. + /// + [Description("Returns an object that schedules units of work on the platform's default scheduler.")] + public sealed class DefaultScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public DefaultScheduler() + : base(Rx.DefaultScheduler.Instance) + { + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/EventLoopScheduler.cs b/Bonsai.Core/Reactive/Concurrency/EventLoopScheduler.cs new file mode 100644 index 000000000..1db9742a7 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/EventLoopScheduler.cs @@ -0,0 +1,27 @@ +using System; +using System.ComponentModel; +using System.Reactive.Linq; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that creates an object that schedules units of work + /// on a single dedicated thread. + /// + [Description("Creates an object that schedules units of work on a single dedicated thread.")] + public sealed class EventLoopScheduler : Source + { + /// + /// Generates an observable sequence that returns a new + /// object. + /// + /// + /// A sequence containing the created object. + /// + public override IObservable Generate() + { + return Observable.Defer(() => Observable.Return(new Rx.EventLoopScheduler())); + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs b/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs new file mode 100644 index 000000000..db91477a7 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules units of work + /// to run immediately on the current thread. + /// + [Description("Returns an object that schedules units of work to run immediately on the current thread.")] + public sealed class ImmediateScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public ImmediateScheduler() + : base(Rx.ImmediateScheduler.Instance) + { + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs b/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs new file mode 100644 index 000000000..d5053d01b --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules each unit of work + /// on a separate thread. + /// + [Description("Returns an object that schedules each unit of work on a separate thread.")] + public sealed class NewThreadScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public NewThreadScheduler() + : base(Rx.NewThreadScheduler.Default) + { + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs new file mode 100644 index 000000000..5b879925e --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs @@ -0,0 +1,74 @@ +using System; +using System.Reactive.Concurrency; +using System.Reactive.Linq; +using System.Xml.Serialization; + +namespace Bonsai.Reactive +{ + /// + /// Represents an object that specifies a scheduler to be used when handling + /// concurrency in a reactive operator. + /// + [XmlType(Namespace = Constants.ReactiveXmlNamespace)] + public class SchedulerMapping + { + internal static readonly SchedulerMapping Default = new DefaultScheduler(); + + internal SchedulerMapping() + { + } + + /// + /// Initializes a new instance of the class + /// using the specified scheduler. + /// + /// The scheduler to be assigned to the mapping. + /// + /// is . + /// + public SchedulerMapping(IScheduler scheduler) + { + Instance = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); + } + + internal IScheduler Instance { get; } + } + + /// + /// Represents an operator that returns a scheduler object. + /// + /// The type of the scheduler object. + [Combinator(MethodName = nameof(Generate))] + [WorkflowElementCategory(ElementCategory.Source)] + [XmlType(Namespace = Constants.ReactiveXmlNamespace)] + public abstract class SchedulerSource : SchedulerMapping where TScheduler : IScheduler + { + internal SchedulerSource(TScheduler defaultScheduler) + : base(defaultScheduler) + { + } + + /// + /// Generates an observable sequence that returns the scheduler instance. + /// + /// + /// A sequence containing the object. + /// + public IObservable Generate() + { + return Observable.Return((TScheduler)Instance); + } + + /// + public override bool Equals(object obj) + { + return obj is SchedulerSource scheduler && scheduler.Instance == Instance; + } + + /// + public override int GetHashCode() + { + return Instance.GetHashCode(); + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs new file mode 100644 index 000000000..53a9019f6 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs @@ -0,0 +1,64 @@ +using System; +using System.ComponentModel; +using System.Globalization; + +namespace Bonsai.Reactive.Concurrency +{ + internal class SchedulerMappingConverter : TypeConverter + { + static readonly SchedulerMapping[] DefaultSchedulers = new SchedulerMapping[] + { + new DefaultScheduler(), + new CurrentThreadScheduler(), + new ImmediateScheduler(), + new NewThreadScheduler(), + new TaskPoolScheduler(), + new ThreadPoolScheduler() + }; + + public override bool CanConvertFrom(ITypeDescriptorContext context, Type sourceType) + { + return sourceType == typeof(string) || base.CanConvertFrom(context, sourceType); + } + + public override object ConvertFrom(ITypeDescriptorContext context, CultureInfo culture, object value) + { + if (value is string name) + { + var scheduler = Array.Find(DefaultSchedulers, x => x?.GetType().Name == (string)value); + return scheduler ?? throw new ArgumentException( + "The specified string does not identify a well-known scheduler type.", + nameof(value)); + } + + return base.ConvertFrom(context, culture, value); + } + + public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType) + { + if (value != null && destinationType == typeof(string)) + { + var scheduler = Array.Find(DefaultSchedulers, x => x != null && x.Equals(value)); + if (scheduler == null) + { + return "(" + nameof(SchedulerMapping) + ")"; + } + + var sourceType = value.GetType(); + return sourceType.Name; + } + + return base.ConvertTo(context, culture, value, destinationType); + } + + public override bool GetStandardValuesSupported(ITypeDescriptorContext context) + { + return true; + } + + public override StandardValuesCollection GetStandardValues(ITypeDescriptorContext context) + { + return new StandardValuesCollection(DefaultSchedulers); + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs b/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs new file mode 100644 index 000000000..673cae297 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules units of work + /// on the Task Parallel Library (TPL) task pool. + /// + [Description("Returns an object that schedules units of work on the Task Parallel Library (TPL) task pool.")] + public sealed class TaskPoolScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public TaskPoolScheduler() + : base(Rx.TaskPoolScheduler.Default) + { + } + } +} diff --git a/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs b/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs new file mode 100644 index 000000000..ecb714a76 --- /dev/null +++ b/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs @@ -0,0 +1,21 @@ +using System.ComponentModel; +using Rx = System.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that returns an object that schedules units of work + /// on the CLR thread pool. + /// + [Description("Returns an object that schedules units of work on the CLR thread pool.")] + public sealed class ThreadPoolScheduler : SchedulerSource + { + /// + /// Initializes a new instance of the class. + /// + public ThreadPoolScheduler() + : base(Rx.ThreadPoolScheduler.Instance) + { + } + } +} diff --git a/Bonsai.Core/Reactive/ObserveOn.cs b/Bonsai.Core/Reactive/ObserveOn.cs new file mode 100644 index 000000000..28024c2ce --- /dev/null +++ b/Bonsai.Core/Reactive/ObserveOn.cs @@ -0,0 +1,58 @@ +using System; +using System.Reactive.Linq; +using System.Xml.Serialization; +using System.ComponentModel; +using Bonsai.Expressions; +using Bonsai.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that sends all notifications in the sequence to + /// the specified scheduler. + /// + [XmlType(Namespace = Constants.XmlNamespace)] + [Description("Sends all notifications in the sequence to the specified scheduler.")] + public class ObserveOn : Combinator, ISerializableElement + { + /// + /// Gets or sets a value specifying the scheduler on which to observe notifications. + /// + [XmlElement(Namespace = Constants.XmlNamespace)] + [TypeConverter(typeof(SchedulerMappingConverter))] + [Description("Specifies the scheduler on which to observe notifications.")] + public SchedulerMapping Scheduler { get; set; } = SchedulerMapping.Default; + + object ISerializableElement.Element => Scheduler; + + /// + /// Gets a value indicating whether the property should be serialized. + /// + [Browsable(false)] + public bool SchedulerSpecified + { + get + { + var scheduler = Scheduler; + return scheduler != SchedulerMapping.Default && + scheduler?.GetType() != typeof(SchedulerMapping); + } + } + + /// + /// Sends all notifications in an observable sequence to the specified scheduler. + /// + /// + /// The type of the elements in the sequence. + /// + /// The source sequence to schedule notifications for. + /// + /// An observable sequence where all notifications are sent to the specified + /// scheduler. + /// + public override IObservable Process(IObservable source) + { + return source.ObserveOn(Scheduler?.Instance); + } + } +} diff --git a/Bonsai.Core/Reactive/SubscribeOn.cs b/Bonsai.Core/Reactive/SubscribeOn.cs new file mode 100644 index 000000000..5f9f97e20 --- /dev/null +++ b/Bonsai.Core/Reactive/SubscribeOn.cs @@ -0,0 +1,61 @@ +using System; +using System.Reactive.Linq; +using System.Xml.Serialization; +using System.ComponentModel; +using Bonsai.Expressions; +using Bonsai.Reactive.Concurrency; + +namespace Bonsai.Reactive +{ + /// + /// Represents an operator that wraps the source sequence in order to run its + /// subscription and unsubscription logic on the specified scheduler. + /// + /// This operator is not commonly used. + /// + [XmlType(Namespace = Constants.ReactiveXmlNamespace)] + [Description("Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.")] + public class SubscribeOn : Combinator, ISerializableElement + { + /// + /// Gets or sets a value specifying the scheduler on which to run subscription and + /// unsubscription actions. + /// + [XmlElement(Namespace = Constants.XmlNamespace)] + [TypeConverter(typeof(SchedulerMappingConverter))] + public SchedulerMapping Scheduler { get; set; } = SchedulerMapping.Default; + + object ISerializableElement.Element => Scheduler; + + /// + /// Gets a value indicating whether the property should be serialized. + /// + [Browsable(false)] + public bool SchedulerSpecified + { + get + { + var scheduler = Scheduler; + return scheduler != SchedulerMapping.Default && + scheduler?.GetType() != typeof(SchedulerMapping); + } + } + + /// + /// Wraps the source sequence in order to run its subscription and + /// unsubscription logic on the specified scheduler. + /// + /// + /// The type of the elements in the sequence. + /// + /// The observable sequence to wrap. + /// + /// An observable sequence where subscription and unsubscription logic + /// run on the specified scheduler. + /// + public override IObservable Process(IObservable source) + { + return source.SubscribeOn(Scheduler?.Instance); + } + } +} diff --git a/Bonsai.Editor/Bonsai.Editor.csproj b/Bonsai.Editor/Bonsai.Editor.csproj index 977b92175..6e309059b 100644 --- a/Bonsai.Editor/Bonsai.Editor.csproj +++ b/Bonsai.Editor/Bonsai.Editor.csproj @@ -13,7 +13,6 @@ - diff --git a/Bonsai.System/Bonsai.System.csproj b/Bonsai.System/Bonsai.System.csproj index 064b1ec43..2eba4d7e5 100644 --- a/Bonsai.System/Bonsai.System.csproj +++ b/Bonsai.System/Bonsai.System.csproj @@ -7,9 +7,6 @@ net462;netstandard2.0 2.7.0 - - - From c1ebf81da21cb413ae7b94a8cb98a7936d4af311 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 15 Oct 2022 12:21:29 +0100 Subject: [PATCH 2/4] Allow serializing scheduler in include workflows --- .../Concurrency/CurrentThreadScheduler.cs | 17 ++-- .../Reactive/Concurrency/DefaultScheduler.cs | 17 ++-- .../Concurrency/ImmediateScheduler.cs | 17 ++-- .../Concurrency/NewThreadScheduler.cs | 17 ++-- .../Reactive/Concurrency/SchedulerMapping.cs | 79 ++++++++++++------- .../Concurrency/SchedulerMappingConverter.cs | 30 +++---- .../Reactive/Concurrency/TaskPoolScheduler.cs | 17 ++-- .../Concurrency/ThreadPoolScheduler.cs | 17 ++-- Bonsai.Core/Reactive/ObserveOn.cs | 24 +----- Bonsai.Core/Reactive/SubscribeOn.cs | 25 +----- 10 files changed, 140 insertions(+), 120 deletions(-) diff --git a/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs b/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs index 89c26b449..d718ddb99 100644 --- a/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/CurrentThreadScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// on the current thread. /// [Description("Returns an object that schedules units of work on the current thread.")] - public sealed class CurrentThreadScheduler : SchedulerSource + public sealed class CurrentThreadScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the singleton + /// object. /// - public CurrentThreadScheduler() - : base(Rx.CurrentThreadScheduler.Instance) + /// + /// A sequence containing the singleton + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.CurrentThreadScheduler.Instance); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs b/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs index 1b2d8549a..6f206d6fe 100644 --- a/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/DefaultScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// on the platform's default scheduler. /// [Description("Returns an object that schedules units of work on the platform's default scheduler.")] - public sealed class DefaultScheduler : SchedulerSource + public sealed class DefaultScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the singleton + /// object. /// - public DefaultScheduler() - : base(Rx.DefaultScheduler.Instance) + /// + /// A sequence containing the singleton + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.DefaultScheduler.Instance); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs b/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs index db91477a7..d51b829b6 100644 --- a/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/ImmediateScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// to run immediately on the current thread. /// [Description("Returns an object that schedules units of work to run immediately on the current thread.")] - public sealed class ImmediateScheduler : SchedulerSource + public sealed class ImmediateScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the singleton + /// object. /// - public ImmediateScheduler() - : base(Rx.ImmediateScheduler.Instance) + /// + /// A sequence containing the singleton + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.ImmediateScheduler.Instance); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs b/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs index d5053d01b..c8eaee03f 100644 --- a/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/NewThreadScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// on a separate thread. /// [Description("Returns an object that schedules each unit of work on a separate thread.")] - public sealed class NewThreadScheduler : SchedulerSource + public sealed class NewThreadScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the default + /// object. /// - public NewThreadScheduler() - : base(Rx.NewThreadScheduler.Default) + /// + /// A sequence containing the default + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.NewThreadScheduler.Default); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs index 5b879925e..78a1a9bd2 100644 --- a/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs @@ -1,28 +1,24 @@ using System; +using System.Collections.Generic; +using System.ComponentModel; using System.Reactive.Concurrency; -using System.Reactive.Linq; using System.Xml.Serialization; +using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive { /// - /// Represents an object that specifies a scheduler to be used when handling + /// Represents a value specifying the scheduler to be used when handling /// concurrency in a reactive operator. /// [XmlType(Namespace = Constants.ReactiveXmlNamespace)] - public class SchedulerMapping + public struct SchedulerMapping : IEquatable { - internal static readonly SchedulerMapping Default = new DefaultScheduler(); - - internal SchedulerMapping() - { - } - /// /// Initializes a new instance of the class /// using the specified scheduler. /// - /// The scheduler to be assigned to the mapping. + /// The scheduler assigned to the mapping. /// /// is . /// @@ -31,44 +27,69 @@ public SchedulerMapping(IScheduler scheduler) Instance = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); } - internal IScheduler Instance { get; } - } + /// + /// Gets or sets the scheduler object assigned to the mapping. + /// + [XmlIgnore] + public IScheduler Instance { get; set; } - /// - /// Represents an operator that returns a scheduler object. - /// - /// The type of the scheduler object. - [Combinator(MethodName = nameof(Generate))] - [WorkflowElementCategory(ElementCategory.Source)] - [XmlType(Namespace = Constants.ReactiveXmlNamespace)] - public abstract class SchedulerSource : SchedulerMapping where TScheduler : IScheduler - { - internal SchedulerSource(TScheduler defaultScheduler) - : base(defaultScheduler) + /// + /// Gets or sets an XML representation of the scheduler instance for serialization. + /// + [XmlText] + [Browsable(false)] + public string InstanceXml { + get + { + var instance = Instance; + if (instance == Rx.DefaultScheduler.Instance) return nameof(DefaultScheduler); + if (instance == Rx.CurrentThreadScheduler.Instance) return nameof(CurrentThreadScheduler); + if (instance == Rx.ImmediateScheduler.Instance) return nameof(ImmediateScheduler); + if (instance == Rx.NewThreadScheduler.Default) return nameof(NewThreadScheduler); + if (instance == Rx.TaskPoolScheduler.Default) return nameof(TaskPoolScheduler); + if (instance == Rx.ThreadPoolScheduler.Instance) return nameof(ThreadPoolScheduler); + return null; + } + set + { + Instance = value switch + { + nameof(DefaultScheduler) => Rx.DefaultScheduler.Instance, + nameof(CurrentThreadScheduler) => Rx.CurrentThreadScheduler.Instance, + nameof(ImmediateScheduler) => Rx.ImmediateScheduler.Instance, + nameof(NewThreadScheduler) => Rx.NewThreadScheduler.Default, + nameof(TaskPoolScheduler) => Rx.TaskPoolScheduler.Default, + nameof(ThreadPoolScheduler) => Rx.ThreadPoolScheduler.Instance, + _ => null, + }; + } } /// - /// Generates an observable sequence that returns the scheduler instance. + /// Returns a value indicating whether this object has the same scheduler + /// instance as a specified value. /// + /// The value to compare to this object. /// - /// A sequence containing the object. + /// if has the same scheduler instance + /// as this object; otherwise, . /// - public IObservable Generate() + public bool Equals(SchedulerMapping other) { - return Observable.Return((TScheduler)Instance); + return Instance == other.Instance; } /// public override bool Equals(object obj) { - return obj is SchedulerSource scheduler && scheduler.Instance == Instance; + return obj is SchedulerMapping mapping && Instance == mapping.Instance; } /// public override int GetHashCode() { - return Instance.GetHashCode(); + return EqualityComparer.Default.GetHashCode(Instance); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs index 53a9019f6..76d7efd47 100644 --- a/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs @@ -1,6 +1,7 @@ using System; using System.ComponentModel; using System.Globalization; +using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive.Concurrency { @@ -8,12 +9,13 @@ internal class SchedulerMappingConverter : TypeConverter { static readonly SchedulerMapping[] DefaultSchedulers = new SchedulerMapping[] { - new DefaultScheduler(), - new CurrentThreadScheduler(), - new ImmediateScheduler(), - new NewThreadScheduler(), - new TaskPoolScheduler(), - new ThreadPoolScheduler() + new SchedulerMapping(), + new SchedulerMapping { Instance = Rx.DefaultScheduler.Instance }, + new SchedulerMapping { Instance = Rx.CurrentThreadScheduler.Instance }, + new SchedulerMapping { Instance = Rx.ImmediateScheduler.Instance }, + new SchedulerMapping { Instance = Rx.ThreadPoolScheduler.Instance }, + new SchedulerMapping { Instance = Rx.NewThreadScheduler.Default }, + new SchedulerMapping { Instance = Rx.TaskPoolScheduler.Default }, }; public override bool CanConvertFrom(ITypeDescriptorContext context, Type sourceType) @@ -25,10 +27,7 @@ public override object ConvertFrom(ITypeDescriptorContext context, CultureInfo c { if (value is string name) { - var scheduler = Array.Find(DefaultSchedulers, x => x?.GetType().Name == (string)value); - return scheduler ?? throw new ArgumentException( - "The specified string does not identify a well-known scheduler type.", - nameof(value)); + return new SchedulerMapping { InstanceXml = name }; } return base.ConvertFrom(context, culture, value); @@ -36,16 +35,9 @@ public override object ConvertFrom(ITypeDescriptorContext context, CultureInfo c public override object ConvertTo(ITypeDescriptorContext context, CultureInfo culture, object value, Type destinationType) { - if (value != null && destinationType == typeof(string)) + if (value is SchedulerMapping mapping && destinationType == typeof(string)) { - var scheduler = Array.Find(DefaultSchedulers, x => x != null && x.Equals(value)); - if (scheduler == null) - { - return "(" + nameof(SchedulerMapping) + ")"; - } - - var sourceType = value.GetType(); - return sourceType.Name; + return mapping.InstanceXml; } return base.ConvertTo(context, culture, value, destinationType); diff --git a/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs b/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs index 673cae297..5443d6598 100644 --- a/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/TaskPoolScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// on the Task Parallel Library (TPL) task pool. /// [Description("Returns an object that schedules units of work on the Task Parallel Library (TPL) task pool.")] - public sealed class TaskPoolScheduler : SchedulerSource + public sealed class TaskPoolScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the default + /// object. /// - public TaskPoolScheduler() - : base(Rx.TaskPoolScheduler.Default) + /// + /// A sequence containing the default + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.TaskPoolScheduler.Default); } } } diff --git a/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs b/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs index ecb714a76..0638c286b 100644 --- a/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs +++ b/Bonsai.Core/Reactive/Concurrency/ThreadPoolScheduler.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reactive.Linq; using Rx = System.Reactive.Concurrency; namespace Bonsai.Reactive @@ -8,14 +10,19 @@ namespace Bonsai.Reactive /// on the CLR thread pool. /// [Description("Returns an object that schedules units of work on the CLR thread pool.")] - public sealed class ThreadPoolScheduler : SchedulerSource + public sealed class ThreadPoolScheduler : Source { /// - /// Initializes a new instance of the class. + /// Generates an observable sequence that returns the singleton + /// object. /// - public ThreadPoolScheduler() - : base(Rx.ThreadPoolScheduler.Instance) + /// + /// A sequence containing the singleton + /// object. + /// + public override IObservable Generate() { + return Observable.Return(Rx.ThreadPoolScheduler.Instance); } } } diff --git a/Bonsai.Core/Reactive/ObserveOn.cs b/Bonsai.Core/Reactive/ObserveOn.cs index 28024c2ce..6e61c5941 100644 --- a/Bonsai.Core/Reactive/ObserveOn.cs +++ b/Bonsai.Core/Reactive/ObserveOn.cs @@ -2,7 +2,6 @@ using System.Reactive.Linq; using System.Xml.Serialization; using System.ComponentModel; -using Bonsai.Expressions; using Bonsai.Reactive.Concurrency; namespace Bonsai.Reactive @@ -13,31 +12,14 @@ namespace Bonsai.Reactive /// [XmlType(Namespace = Constants.XmlNamespace)] [Description("Sends all notifications in the sequence to the specified scheduler.")] - public class ObserveOn : Combinator, ISerializableElement + public class ObserveOn : Combinator { /// /// Gets or sets a value specifying the scheduler on which to observe notifications. /// - [XmlElement(Namespace = Constants.XmlNamespace)] [TypeConverter(typeof(SchedulerMappingConverter))] [Description("Specifies the scheduler on which to observe notifications.")] - public SchedulerMapping Scheduler { get; set; } = SchedulerMapping.Default; - - object ISerializableElement.Element => Scheduler; - - /// - /// Gets a value indicating whether the property should be serialized. - /// - [Browsable(false)] - public bool SchedulerSpecified - { - get - { - var scheduler = Scheduler; - return scheduler != SchedulerMapping.Default && - scheduler?.GetType() != typeof(SchedulerMapping); - } - } + public SchedulerMapping Scheduler { get; set; } /// /// Sends all notifications in an observable sequence to the specified scheduler. @@ -52,7 +34,7 @@ public bool SchedulerSpecified /// public override IObservable Process(IObservable source) { - return source.ObserveOn(Scheduler?.Instance); + return source.ObserveOn(Scheduler.Instance); } } } diff --git a/Bonsai.Core/Reactive/SubscribeOn.cs b/Bonsai.Core/Reactive/SubscribeOn.cs index 5f9f97e20..478ca4ddc 100644 --- a/Bonsai.Core/Reactive/SubscribeOn.cs +++ b/Bonsai.Core/Reactive/SubscribeOn.cs @@ -2,7 +2,6 @@ using System.Reactive.Linq; using System.Xml.Serialization; using System.ComponentModel; -using Bonsai.Expressions; using Bonsai.Reactive.Concurrency; namespace Bonsai.Reactive @@ -15,31 +14,15 @@ namespace Bonsai.Reactive /// [XmlType(Namespace = Constants.ReactiveXmlNamespace)] [Description("Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.")] - public class SubscribeOn : Combinator, ISerializableElement + public class SubscribeOn : Combinator { /// /// Gets or sets a value specifying the scheduler on which to run subscription and /// unsubscription actions. /// - [XmlElement(Namespace = Constants.XmlNamespace)] [TypeConverter(typeof(SchedulerMappingConverter))] - public SchedulerMapping Scheduler { get; set; } = SchedulerMapping.Default; - - object ISerializableElement.Element => Scheduler; - - /// - /// Gets a value indicating whether the property should be serialized. - /// - [Browsable(false)] - public bool SchedulerSpecified - { - get - { - var scheduler = Scheduler; - return scheduler != SchedulerMapping.Default && - scheduler?.GetType() != typeof(SchedulerMapping); - } - } + [Description("Specifies the scheduler on which to run subscription and unsubscription actions.")] + public SchedulerMapping Scheduler { get; set; } /// /// Wraps the source sequence in order to run its subscription and @@ -55,7 +38,7 @@ public bool SchedulerSpecified /// public override IObservable Process(IObservable source) { - return source.SubscribeOn(Scheduler?.Instance); + return source.SubscribeOn(Scheduler.Instance); } } } From b3764a171ef7112d01a0fe0636dfcb71beea4cf4 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 15 Oct 2022 12:48:29 +0100 Subject: [PATCH 3/4] Avoid serializing null or custom scheduler --- Bonsai.Core/Reactive/ObserveOn.cs | 9 +++++++++ Bonsai.Core/Reactive/SubscribeOn.cs | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/Bonsai.Core/Reactive/ObserveOn.cs b/Bonsai.Core/Reactive/ObserveOn.cs index 6e61c5941..3e69b291c 100644 --- a/Bonsai.Core/Reactive/ObserveOn.cs +++ b/Bonsai.Core/Reactive/ObserveOn.cs @@ -21,6 +21,15 @@ public class ObserveOn : Combinator [Description("Specifies the scheduler on which to observe notifications.")] public SchedulerMapping Scheduler { get; set; } + /// + /// Gets a value indicating whether the property should be serialized. + /// + [Browsable(false)] + public bool SchedulerSpecified + { + get { return !string.IsNullOrEmpty(Scheduler.InstanceXml); } + } + /// /// Sends all notifications in an observable sequence to the specified scheduler. /// diff --git a/Bonsai.Core/Reactive/SubscribeOn.cs b/Bonsai.Core/Reactive/SubscribeOn.cs index 478ca4ddc..a51d80e5b 100644 --- a/Bonsai.Core/Reactive/SubscribeOn.cs +++ b/Bonsai.Core/Reactive/SubscribeOn.cs @@ -24,6 +24,15 @@ public class SubscribeOn : Combinator [Description("Specifies the scheduler on which to run subscription and unsubscription actions.")] public SchedulerMapping Scheduler { get; set; } + /// + /// Gets a value indicating whether the property should be serialized. + /// + [Browsable(false)] + public bool SchedulerSpecified + { + get { return !string.IsNullOrEmpty(Scheduler.InstanceXml); } + } + /// /// Wraps the source sequence in order to run its subscription and /// unsubscription logic on the specified scheduler. From cd773a0e38405097dc710644c7ee0d47bb6bbd86 Mon Sep 17 00:00:00 2001 From: glopesdev Date: Sat, 15 Oct 2022 14:55:18 +0100 Subject: [PATCH 4/4] Avoid property initialization --- .../Reactive/Concurrency/SchedulerMapping.cs | 2 +- .../Concurrency/SchedulerMappingConverter.cs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs index 78a1a9bd2..dd5773259 100644 --- a/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMapping.cs @@ -31,7 +31,7 @@ public SchedulerMapping(IScheduler scheduler) /// Gets or sets the scheduler object assigned to the mapping. /// [XmlIgnore] - public IScheduler Instance { get; set; } + public IScheduler Instance { get; private set; } /// /// Gets or sets an XML representation of the scheduler instance for serialization. diff --git a/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs index 76d7efd47..3c14b09fe 100644 --- a/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs +++ b/Bonsai.Core/Reactive/Concurrency/SchedulerMappingConverter.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.ComponentModel; using System.Globalization; using Rx = System.Reactive.Concurrency; @@ -10,12 +10,12 @@ internal class SchedulerMappingConverter : TypeConverter static readonly SchedulerMapping[] DefaultSchedulers = new SchedulerMapping[] { new SchedulerMapping(), - new SchedulerMapping { Instance = Rx.DefaultScheduler.Instance }, - new SchedulerMapping { Instance = Rx.CurrentThreadScheduler.Instance }, - new SchedulerMapping { Instance = Rx.ImmediateScheduler.Instance }, - new SchedulerMapping { Instance = Rx.ThreadPoolScheduler.Instance }, - new SchedulerMapping { Instance = Rx.NewThreadScheduler.Default }, - new SchedulerMapping { Instance = Rx.TaskPoolScheduler.Default }, + new SchedulerMapping(Rx.DefaultScheduler.Instance), + new SchedulerMapping(Rx.CurrentThreadScheduler.Instance), + new SchedulerMapping(Rx.ImmediateScheduler.Instance), + new SchedulerMapping(Rx.ThreadPoolScheduler.Instance), + new SchedulerMapping(Rx.NewThreadScheduler.Default), + new SchedulerMapping(Rx.TaskPoolScheduler.Default), }; public override bool CanConvertFrom(ITypeDescriptorContext context, Type sourceType)