diff --git a/Bonsai.Core/Reactive/Sink.cs b/Bonsai.Core/Reactive/Sink.cs index 3e1bc9f1..0f517be1 100644 --- a/Bonsai.Core/Reactive/Sink.cs +++ b/Bonsai.Core/Reactive/Sink.cs @@ -6,6 +6,7 @@ using System.ComponentModel; using System.Xml.Serialization; using Bonsai.Expressions; +using System.Reflection; namespace Bonsai.Reactive { @@ -62,13 +63,17 @@ public override Expression Build(IEnumerable arguments) var selector = Expression.Lambda(selectorBody, selectorParameter); var selectorObservableType = selector.ReturnType.GetGenericArguments()[0]; return Expression.Call( - typeof(Sink), nameof(Process), - new [] { source.Type.GetGenericArguments()[0], selectorObservableType }, + GetProcessMethod(source.Type.GetGenericArguments()[0], selectorObservableType), source, selector); }); } - static IObservable Process(IObservable source, Func, IObservable> sink) + internal virtual MethodInfo GetProcessMethod(params Type[] typeArguments) + { + return typeof(Sink).GetMethod(nameof(Process), BindingFlags.Static | BindingFlags.NonPublic).MakeGenericMethod(typeArguments); + } + + internal static IObservable Process(IObservable source, Func, IObservable> sink) { return source.Publish(ps => MergeDependencies(ps, sink(ps).IgnoreElements().Select(xs => default(TSource)))); } diff --git a/Bonsai.Core/Reactive/Visualizer.cs b/Bonsai.Core/Reactive/Visualizer.cs index 368f4fe7..c5f83553 100644 --- a/Bonsai.Core/Reactive/Visualizer.cs +++ b/Bonsai.Core/Reactive/Visualizer.cs @@ -1,4 +1,6 @@ -using System.ComponentModel; +using System; +using System.ComponentModel; +using System.Reflection; using System.Xml.Serialization; using Bonsai.Expressions; @@ -33,5 +35,15 @@ public Visualizer(ExpressionBuilderGraph workflow) : base(workflow) { } + + internal override MethodInfo GetProcessMethod(params Type[] typeArguments) + { + return typeof(Visualizer).GetMethod(nameof(Process), BindingFlags.Static | BindingFlags.NonPublic).MakeGenericMethod(typeArguments); + } + + static new IObservable Process(IObservable source, Func, IObservable> sink) + { + return Sink.Process(source, sink); + } } }