diff --git a/Bonsai.Core.Tests/SubjectTests.cs b/Bonsai.Core.Tests/SubjectTests.cs index f590dc6c..3a4d0ae4 100644 --- a/Bonsai.Core.Tests/SubjectTests.cs +++ b/Bonsai.Core.Tests/SubjectTests.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Generic; +using System.Linq.Expressions; using System.Reactive.Linq; +using System.Threading.Tasks; using Bonsai.Expressions; using Bonsai.Reactive; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -18,30 +21,87 @@ public override IObservable Process(IObservable source) } } + class ConstantExpressionBuilder : ZeroArgumentExpressionBuilder + { + public Expression Expression { get; set; } + + public override Expression Build(IEnumerable arguments) + { + return Expression; + } + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void Build_MulticastSubjectMissingBuildContext_ThrowsBuildException() + { + var source = new UnitBuilder().Build(); + var builder = new MulticastSubject { Name = nameof(BehaviorSubject) }; + builder.Build(source); + Assert.Fail(); + } + + [TestMethod] + public void Build_MulticastSubjectMissingName_ReturnsSameSequence() + { + var source = Expression.Constant(Observable.Return(0)); + var builder = new TestWorkflow() + .Append(new ConstantExpressionBuilder { Expression = source }) + .Append(new MulticastSubject()) + .AppendOutput(); + var expression = builder.Workflow.Build(); + Assert.AreSame(source, expression); + } + [TestMethod] [ExpectedException(typeof(WorkflowBuildException))] public void Build_MulticastInterfaceToSubjectOfDifferentInterface_ThrowsBuildException() { - var builder = new WorkflowBuilder(); - builder.Workflow.Add(new BehaviorSubject { Name = nameof(BehaviorSubject) }); - var source = builder.Workflow.Add(new CombinatorBuilder { Combinator = new DoubleProperty { Value = 5.5 } }); - var convert1 = builder.Workflow.Add(new CombinatorBuilder { Combinator = new TypeCombinatorMock() }); - var convert2 = builder.Workflow.Add(new MulticastSubject { Name = nameof(BehaviorSubject) }); - builder.Workflow.AddEdge(source, convert1, new ExpressionBuilderArgument()); - builder.Workflow.AddEdge(convert1, convert2, new ExpressionBuilderArgument()); + var builder = new TestWorkflow() + .Append(new BehaviorSubject { Name = nameof(BehaviorSubject) }) + .ResetCursor() + .AppendCombinator(new DoubleProperty { Value = 5.5 }) + .AppendCombinator(new TypeCombinatorMock()) + .Append(new MulticastSubject { Name = nameof(BehaviorSubject) }); var expression = builder.Workflow.Build(); Assert.IsNotNull(expression); } + [TestMethod] + public async Task Build_MulticastSourceToSubject_ReturnsSameValue() + { + var value = 32; + var workflow = new TestWorkflow() + .Append(new BehaviorSubject { Name = nameof(BehaviorSubject) }) + .ResetCursor() + .AppendCombinator(new IntProperty { Value = value }) + .Append(new MulticastSubject { Name = nameof(BehaviorSubject) }) + .AppendOutput(); + var observable = workflow.BuildObservable(); + Assert.AreEqual(value, await observable.Take(1)); + } + + [TestMethod] + public async Task Build_MulticastSourceToObjectSubject_PreservesTypeOfSourceSequence() + { + // related to https://github.com/bonsai-rx/bonsai/issues/1914 + var workflow = new TestWorkflow() + .Append(new BehaviorSubject { Name = nameof(BehaviorSubject) }) + .ResetCursor() + .AppendCombinator(new IntProperty()) + .Append(new MulticastSubject { Name = nameof(BehaviorSubject) }) + .AppendOutput(); + var observable = workflow.BuildObservable(); + Assert.AreEqual(0, await observable.Take(1)); + } + [TestMethod] public void ResourceSubject_SourceTerminatesExceptionally_ShouldNotTryToDispose() { - var workflowBuilder = new WorkflowBuilder(); - var source = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new ThrowSource() }); - var subject = workflowBuilder.Workflow.Add(new ResourceSubject { Name = nameof(ResourceSubject) }); - var sink = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new CatchSink() }); - workflowBuilder.Workflow.AddEdge(source, subject, new ExpressionBuilderArgument()); - workflowBuilder.Workflow.AddEdge(subject, sink, new ExpressionBuilderArgument()); + var workflowBuilder = new TestWorkflow() + .AppendCombinator(new ThrowSource()) + .Append(new ResourceSubject { Name = nameof(ResourceSubject) }) + .AppendCombinator(new CatchSink()); var observable = workflowBuilder.Workflow.BuildObservable(); observable.FirstOrDefaultAsync().Wait(); } @@ -58,7 +118,7 @@ class CatchSink : Sink { public override IObservable Process(IObservable source) { - return source.Catch(Observable.Empty()); + return source.Catch(Observable.Empty()); } } diff --git a/Bonsai.Core.Tests/TestWorkflow.cs b/Bonsai.Core.Tests/TestWorkflow.cs index 27a5dac1..a81ec258 100644 --- a/Bonsai.Core.Tests/TestWorkflow.cs +++ b/Bonsai.Core.Tests/TestWorkflow.cs @@ -1,4 +1,5 @@ using System; +using System.Linq.Expressions; using Bonsai.Dag; using Bonsai.Expressions; @@ -97,5 +98,12 @@ public ExpressionBuilderGraph ToInspectableGraph() { return Workflow.ToInspectableGraph(); } + + public IObservable BuildObservable() + { + var expression = Workflow.Build(); + var observableFactory = Expression.Lambda>>(expression).Compile(); + return observableFactory(); + } } } diff --git a/Bonsai.Core/Expressions/MulticastSubject.cs b/Bonsai.Core/Expressions/MulticastSubject.cs index 44a9fe08..0b984732 100644 --- a/Bonsai.Core/Expressions/MulticastSubject.cs +++ b/Bonsai.Core/Expressions/MulticastSubject.cs @@ -67,8 +67,16 @@ public override Expression Build(IEnumerable arguments) ); } - source = CoerceMethodArgument(typeof(IObservable<>).MakeGenericType(subjectType), source); - observableType = subjectType; + var conversionParameter = Expression.Parameter(observableType); + var conversionBody = Expression.Convert(conversionParameter, subjectType); + var conversion = Expression.Lambda(conversionBody, conversionParameter); + return Expression.Call( + typeof(MulticastSubject), + nameof(Process), + new[] { observableType, subjectType }, + source, + subjectExpression, + conversion); } return Expression.Call(typeof(MulticastSubject), nameof(Process), new[] { observableType }, source, subjectExpression); @@ -79,6 +87,17 @@ static IObservable Process(IObservable source, IObser return source.Do(subject); } + static IObservable Process( + IObservable source, + IObserver subject, + Func conversion) + { + return source.Do( + value => subject.OnNext(conversion(value)), + subject.OnError, + subject.OnCompleted); + } + class MulticastSubjectTypeDescriptionProvider : TypeDescriptionProvider { readonly MulticastSubjectTypeDescriptor typeDescriptor = new MulticastSubjectTypeDescriptor(null);