diff --git a/README.md b/README.md index 29fc28bf..e11543bc 100644 --- a/README.md +++ b/README.md @@ -609,7 +609,7 @@ println(s"After set: ${myVar.now()}") myVar.update(_ + 1) println(s"After update: ${myVar.now()}") -new Transaction { _ => +Transaction { println(s"After trx: ${myVar.now()}") } diff --git a/src/main/scala/com/raquo/airstream/combine/MergeStream.scala b/src/main/scala/com/raquo/airstream/combine/MergeStream.scala index 76dce65e..24388837 100644 --- a/src/main/scala/com/raquo/airstream/combine/MergeStream.scala +++ b/src/main/scala/com/raquo/airstream/combine/MergeStream.scala @@ -49,15 +49,18 @@ class MergeStream[A]( * produce unexpected results. */ override private[airstream] def syncFire(transaction: Transaction): Unit = { + //println(s"${this} syncFire in $transaction") while (pendingParentValues.nonEmpty) { val nextValue = pendingParentValues.dequeue().value if (lastFiredInTrx.contains(transaction)) { + //println("- syncFire in new trx") nextValue.fold( nextError => new Transaction(fireError(nextError, _)), nextEvent => new Transaction(fireValue(nextEvent, _)) ) } else { lastFiredInTrx = transaction + //println("- syncFire in same trx") nextValue.fold( fireError(_, transaction), fireValue(_, transaction) diff --git a/src/main/scala/com/raquo/airstream/core/Transaction.scala b/src/main/scala/com/raquo/airstream/core/Transaction.scala index 7d826827..9a844b4a 100644 --- a/src/main/scala/com/raquo/airstream/core/Transaction.scala +++ b/src/main/scala/com/raquo/airstream/core/Transaction.scala @@ -9,9 +9,11 @@ import scala.scalajs.js /** @param code Note: Must not throw! */ class Transaction(private[Transaction] var code: Transaction => Any) { - // val id: Int = Transaction.nextId() + // val id = Transaction.nextId() - //println(s" - create trx $id") + //println(s"--CREATE Trx@${id}") + + //override def toString: String = s"Trx@${id}" /** Priority queue of pending observables: sorted by their topoRank. * @@ -19,9 +21,18 @@ class Transaction(private[Transaction] var code: Transaction => Any) { */ private[this] var maybePendingObservables: js.UndefOr[JsPriorityQueue[SyncObservable[_]]] = js.undefined - Transaction.pendingTransactions.add(this) + if (Transaction.onStart.isSharedStart) { + // This delays scheduling transactions until the end of + // the shared start transaction + //println(s">>> onStart.postStartTransactions.push($this)") + Transaction.onStart.postStartTransactions.push(this) + } else { + //println(s">>> Transaction.pendingTransactions.add($this)") + Transaction.pendingTransactions.add(this) + } @inline private[Transaction] def resolvePendingObservables(): Unit = { + //println(s"$this resolvePendingObservables (n=${maybePendingObservables.map(_.size).getOrElse(0)})") maybePendingObservables.foreach { pendingObservables => while (pendingObservables.nonEmpty) { //dom.console.log("RANKS: ", pendingObservables.debugQueue.map(_.topoRank)) @@ -48,6 +59,16 @@ class Transaction(private[Transaction] var code: Transaction => Any) { object Transaction { + /** Create new transaction (typically used in internal observable code) */ + def apply(code: Transaction => Unit): Unit = new Transaction(code) + + /** Create new transaction (typically used in end user code). + * + * Warning: It is rare that you need to manually create transactions. + * Example of legitimate use case: [[https://github.com/raquo/Airstream/#var-transaction-delay Var transaction delay]] + */ + def apply(code: => Unit): Unit = new Transaction(_ => code) + /** This object holds a queue of callbacks that should be executed * when all observables finish starting. This lets `signal.changes` * streams emit the updated signal's value when restarting, in such @@ -64,10 +85,19 @@ object Transaction { */ object onStart { - private var level: Int = 0 + private[Transaction] var isSharedStart: Boolean = false private val pendingCallbacks: JsArray[Transaction => Unit] = JsArray() + private[Transaction] val postStartTransactions: JsArray[Transaction] = JsArray() + + private[Transaction] var isResolving = false + + // #nc just add a default value to `when` param. Keeping this method for binary compat for now. + def shared[A](code: => A): A = { + shared(code, when = true) + } + /* Put the code that (potentially) adds more than one observer inside. * If that code causes `signal.changes` to restart (and emit the signal's * updated value), this event will be delayed until the rest of your code @@ -95,17 +125,25 @@ object Transaction { * * See https://github.com/raquo/Airstream/#restarting-streams-that-depend-on-signals--signalchanges- */ - def shared[A](code: => A): A = { - level += 1 - val result = try { + def shared[A](code: => A, when: Boolean): A = { + if (isSharedStart || !when) { + // - We are already executing inside the `code` argument passed + // to another onStart.shared block, so adding another try-catch + // block is not necessary: that other block will take care of it. + // - Or, the caller explicitly doesn't want a shared block now (!when) code - } finally { - level -= 1 - if (level == 0) { + } else { + //println("> START SHARED") + isSharedStart = true + val result = try { + code + } finally { + isSharedStart = false resolve() } + //println("< END SHARED") + result } - result } /** Add a callback to execute once the new shared transaction gets executed. @@ -113,17 +151,31 @@ object Transaction { * @param callback - Must not throw! */ def add(callback: Transaction => Unit): Unit = { + //println(s"// add callback ${callback.hashCode()}") pendingCallbacks.push(callback) } private def resolve(): Unit = { - if (pendingCallbacks.length > 0) { - new Transaction(trx => { + if (pendingCallbacks.length == 0) { + //println("- no pending callbacks") + if (postStartTransactions.length > 0) { + //println(s"> CREATE ALT RESOLVE TRX. Num trx-s = ${postStartTransactions.length}") + Transaction { + while (postStartTransactions.length > 0) { + pendingTransactions.add(postStartTransactions.shift()) + } + } + } + } else { + //println(s"> CREATE RESOLVE TRX. Num callbacks = ${pendingCallbacks.length}") + Transaction { trx => // #TODO[Integrity] What if calling callback(trx) calls onStart.add? // Is it ok to put it into the same list, or should it go into a new list, // to be executed in a separate transaction? + isResolving = true while (pendingCallbacks.length > 0) { - val callback = pendingCallbacks.pop() + val callback = pendingCallbacks.shift() + // println(s"// resolve callback ${callback.hashCode()}") try { callback(trx) } catch { @@ -132,14 +184,29 @@ object Transaction { AirstreamError.sendUnhandledError(err) } } - }) + isResolving = false + // println("// resolved any callbacks") + // Any transactions created during the shared start + // that weren't converted to callbacks, will now be + // scheduled, and will be executed after this shared + // transaction finishes (they are marked as its + // children), in the same order as they were created. + // println(s"postStartTransactions.length = ${postStartTransactions.length}") + while (postStartTransactions.length > 0) { + val _trx = postStartTransactions.shift() + // println(s"- pendingTransactions.add(${t})") + pendingTransactions.add(_trx) + } + } } } } private object pendingTransactions { - /** first transaction is the top of the stack, currently running */ + /** First transaction is the top of the stack, currently running. + * That transaction's parent transaction is the second item, and so on. + */ private val stack: JsArray[Transaction] = JsArray() private val children: JsMap[Transaction, JsArray[Transaction]] = new JsMap() @@ -159,7 +226,7 @@ object Transaction { // Consider this later when I have moer comprehensive benchmarks. pushToStack(newTransaction) run(newTransaction) - }{ currentTransaction => + } { currentTransaction => enqueueChild(parent = currentTransaction, newChild = newTransaction) } } @@ -178,7 +245,7 @@ object Transaction { putNextTransactionOnStack(doneTransaction = transaction) - transaction.code = throwDeadTrxError // stop holding up `trx` contents in memory + transaction.code = throwDeadTrxError // stop holding up `trx` contents in memory peekStack().fold { if (children.size > 0) { @@ -187,7 +254,7 @@ object Transaction { children.forEach((transactions, _) => numChildren += transactions.length) throw new Exception(s"Transaction queue error: Stack cleared, but a total of ${numChildren} children for ${children.size} transactions remain. This is a bug in Airstream.") } - }{ nextTransaction => + } { nextTransaction => run(nextTransaction) } } @@ -204,11 +271,9 @@ object Transaction { peekStack().foreach { parentTransaction => putNextTransactionOnStack(doneTransaction = parentTransaction) } - }{ nextChild => + } { nextChild => // Found a child transaction, so put it on the stack, so that it wil run next. // Once that child is all done, it will be popped from the stack, and we will - // - // pushToStack(nextChild) } } @@ -263,10 +328,13 @@ object Transaction { private[core] def isClearState: Boolean = pendingTransactions.isClearState + //private var maybeCurrentTransaction: js.UndefOr[Transaction] = js.undefined + private[airstream] def currentTransaction(): js.UndefOr[Transaction] = pendingTransactions.peekStack() private def run(transaction: Transaction): Unit = { - //println(s"--start trx ${transaction.id}") + //println(s"--START ${transaction}") + //maybeCurrentTransaction = transaction try { transaction.code(transaction) transaction.resolvePendingObservables() @@ -278,11 +346,12 @@ object Transaction { } finally { // @TODO[API,Integrity] // This block is executed regardless of whether an exception is thrown in `code` or not, - // but it doesn't actually catch the exception, so `new Transaction(code)` actually throws + // but it doesn't actually catch the exception, so `Transaction(code)` actually throws // iff `code` throws AND the transaction was created while no other transaction is running // This is not very predictable, so we should fix it. - //println(s"--end trx ${transaction.id}") + //println(s"--END ${transaction}") pendingTransactions.done(transaction) + //maybeCurrentTransaction = js.undefined } } diff --git a/src/main/scala/com/raquo/airstream/core/WritableObservable.scala b/src/main/scala/com/raquo/airstream/core/WritableObservable.scala index cd3d7819..acc9f14e 100644 --- a/src/main/scala/com/raquo/airstream/core/WritableObservable.scala +++ b/src/main/scala/com/raquo/airstream/core/WritableObservable.scala @@ -52,15 +52,14 @@ trait WritableObservable[A] extends Observable[A] { protected val internalObservers: ObserverList[InternalObserver[A]] = new ObserverList(JsArray()) override def addObserver(observer: Observer[A])(implicit owner: Owner): Subscription = { - // #nc[doc] - document this onstart.shared mechanism, both in code and in the real docs. - // - also for extenders: this must be called by your observable also if it can get started without external observers downstream (basically, it shouldn't). - Transaction.onStart.shared { + // println(s"// ${this} addObserver ${observer}") + Transaction.onStart.shared({ maybeWillStart() val subscription = addExternalObserver(observer, owner) onAddedExternalObserver(observer) maybeStart() subscription - } + }, when = !isStarted) } /** Subscribe an external observer to this observable */ @@ -76,12 +75,14 @@ trait WritableObservable[A] extends Observable[A] { */ override protected[airstream] def addInternalObserver(observer: InternalObserver[A], shouldCallMaybeWillStart: Boolean): Unit = { //println(s"$this > aio shouldCallMaybeWillStart=$shouldCallMaybeWillStart") - if (!isStarted && shouldCallMaybeWillStart) { - maybeWillStart() - } - //println(s"$this < aio") - internalObservers.push(observer) - maybeStart() + Transaction.onStart.shared({ + if (!isStarted && shouldCallMaybeWillStart) { + maybeWillStart() + } + //println(s"$this < aio") + internalObservers.push(observer) + maybeStart() + }, when = !isStarted) } /** Child observable should call parent.removeInternalObserver(childInternalObserver) when it is stopped. diff --git a/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala b/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala index bf91791f..7ad212b8 100644 --- a/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala +++ b/src/main/scala/com/raquo/airstream/flatten/ConcurrentStream.scala @@ -33,7 +33,8 @@ class ConcurrentStream[A]( case signal: Signal[EventStream[A @unchecked] @unchecked] => signal.tryNow() match { case Success(stream) => - // We add internal observer later, in `onStart`. onWillStart should not start any observables. // #nc[doc] this pattern + // We add internal observer later, in `onStart`. onWillStart should not start any observables. + // #TODO[Doc] Document this pattern ^^^. maybeAddStream(stream, addInternalObserver = false) case _ => () } diff --git a/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala b/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala index 92617148..be8dbbe1 100644 --- a/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala +++ b/src/main/scala/com/raquo/airstream/flatten/SwitchSignal.scala @@ -28,7 +28,7 @@ class SwitchSignal[A]( private[this] val internalEventObserver: InternalObserver[A] = InternalObserver.fromTry[A]( onTry = (nextTry, _) => { - //println(s"> init trx from SwitchSignal.onValue($nextTry)") + //println(s"> init trx from $this SwitchSignal.onValue($nextTry)") innerSignalLastSeenUpdateId = Protected.lastUpdateId(currentSignalTry.get) new Transaction(fireTry(nextTry, _)) } diff --git a/src/main/scala/com/raquo/airstream/ownership/DynamicSubscription.scala b/src/main/scala/com/raquo/airstream/ownership/DynamicSubscription.scala index 96263cda..f532b0f3 100644 --- a/src/main/scala/com/raquo/airstream/ownership/DynamicSubscription.scala +++ b/src/main/scala/com/raquo/airstream/ownership/DynamicSubscription.scala @@ -43,8 +43,6 @@ class DynamicSubscription private ( private[ownership] def onActivate(owner: Owner): Unit = { //println(s" - activating $this") - // I don't think Laminar itself needs onStart.shared here, this is for users' custom dynamic subscriptions. - // Honestly this might be overkill, but I think this is cheap, and diagnosing these kinds of bugs is expensive. Transaction.onStart.shared { maybeCurrentSubscription = activate(owner) } diff --git a/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala b/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala index fbc74e05..f1d0f82c 100644 --- a/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala +++ b/src/main/scala/com/raquo/airstream/timing/JsPromiseSignal.scala @@ -38,7 +38,7 @@ class JsPromiseSignal[A](promise: js.Promise[A]) extends WritableSignal[Option[A } private def onPromiseResolved(nextPromiseValue: Try[A]): Unit = { - // #nc doc this about onWillStart + // #TODO[Doc] Document this about onWillStart // #Note Normally onWillStart must not create transactions / emit values, but this is ok here // because this callback is always called asynchronously, so any value will be emitted from here // long after the onWillStart / onStart chain has finished. diff --git a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala index 2b6e0999..5ad56f06 100644 --- a/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/core/PullResetSignalSpec.scala @@ -280,8 +280,8 @@ class PullResetSignalSpec extends UnitSpec { $combined.addObserver(Observer.empty) log.toList shouldBe List( - "-4 isEven = true", "-4 isPositive = false", + "-4 isEven = true", "-4 isPositive = false, isEven = true" ) log.clear() @@ -343,13 +343,13 @@ class PullResetSignalSpec extends UnitSpec { v.set(-2) - // #Warning This is a known glitch: of all the new observers, only the first one - // manages to receive the sync event from `changes`. This is because the whole - // onWillStart / onStart loop completes before the other observers are added, - // and even though the event is emitted in a new Transaction, in this case, the - // transaction payload is executed immediately without delay, because there is + // #Warning This is a known glitch with a known workaround: of all the new observers, + // only the first one manages to receive the sync event from `changes`. This is + // because the whole onWillStart / onStart loop completes before the other observers + // are added, and even though the event is emitted in a Transaction, in this case, + // the transaction payload is executed immediately without delay, because there is // no current transaction that we need to wait for. - // - This glitch can be avoided by wrapping subs creation in `new Transaction`, + // - This glitch can be avoided by wrapping subs creation in `Transaction`, // `DynamicSubscription`, or `Transaction.onStart.shared` – see the tests below val subs2 = List( diff --git a/src/test/scala/com/raquo/airstream/core/SharedStartStreamSpec.scala b/src/test/scala/com/raquo/airstream/core/SharedStartStreamSpec.scala new file mode 100644 index 00000000..c2a2f593 --- /dev/null +++ b/src/test/scala/com/raquo/airstream/core/SharedStartStreamSpec.scala @@ -0,0 +1,64 @@ +package com.raquo.airstream.core + +import com.raquo.airstream.UnitSpec +import com.raquo.airstream.fixtures.Effect +import com.raquo.airstream.ownership.{DynamicOwner, DynamicSubscription, Subscription} + +import scala.collection.mutable + +class SharedStartStreamSpec extends UnitSpec { + + it("EventStream.fromValue() / CustomStreamSource") { + + val dynOwner = new DynamicOwner(() => throw new Exception("Accessing dynamic owner after it is killed")) + + val stream = EventStream.fromValue(1) + + val effects = mutable.Buffer[Effect[Int]]() + + val obs1 = Observer[Int](effects += Effect("obs1", _)) + val obs2 = Observer[Int](effects += Effect("obs2", _)) + + // Wrapping in DynamicSubscription put us inside a Transaction.shared block. + // This is (kind of) similar to how Laminar activates multiple subscriptions. + DynamicSubscription.unsafe( + dynOwner, + activate = { o => + val sub1 = stream.addObserver(obs1)(o) + val sub2 = stream.addObserver(obs2)(o) + new Subscription(o, cleanup = () => { + sub1.kill() + sub2.kill() + }) + } + ) + + assert(effects.isEmpty) + + // -- + + dynOwner.activate() + + assert(effects.toList == List( + Effect("obs1", 1), + Effect("obs2", 1), + )) + effects.clear() + + // -- + + dynOwner.deactivate() + assert(effects.isEmpty) + + // -- + + dynOwner.activate() + + assert(effects.toList == List( + Effect("obs1", 1), + Effect("obs2", 1), + )) + effects.clear() + + } +} diff --git a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala index ad0e619c..84ab05f5 100644 --- a/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/EventStreamFlattenSpec.scala @@ -20,18 +20,50 @@ class EventStreamFlattenSpec extends AsyncUnitSpec { val range = 0 to 3 val stream = EventStream.fromSeq(range, emitOnce = true) + val flatStream = + stream.setDisplayName("SRC-FS") + .map { v => + EventStream.fromSeq(Seq(v * 3), emitOnce = true).setDisplayName(s"INT-FS-$v") + }.setDisplayName("META") + .flatten.setDisplayName("FLAT") + + val effects = mutable.Buffer[Effect[_]]() + val obs0 = Observer[Int](newValue => effects += Effect("obs0", newValue)).setDisplayName("obs0") + val subscription0 = flatStream.addObserver(obs0) + + subscription0.kill() + effects.toList shouldBe range.map(i => Effect("obs0", i * 3)) + } + + it("sync map-flatten without fromSeq") { + + implicit val owner: Owner = new TestableOwner + + val bus = new EventBus[Int] + val stream = bus.events val flatStream = stream .map { v => - EventStream.fromSeq(Seq(v * 3), emitOnce = true) + EventStream.fromValue(v * 3, emitOnce = true).setDisplayName(s"S-${v}") } - .flatten + .setDisplayName("MO") + .flatten.setDisplayName("FS") val effects = mutable.Buffer[Effect[_]]() - val subscription0 = flatStream.foreach(newValue => effects += Effect("obs0", newValue)) + val obs = Observer[Int](v => effects += Effect("obs0", v)).setDisplayName("obs") + val subscription0 = flatStream.addObserver(obs) + + bus.emit(0) + bus.emit(1) + bus.emit(2) subscription0.kill() - effects.toList shouldBe range.map(i => Effect("obs0", i * 3)) + + effects.toList shouldBe List( + Effect("obs0", 0 * 3), + Effect("obs0", 1 * 3), + Effect("obs0", 2 * 3), + ) } it("sync three-level map-flatten") { diff --git a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala index 8a51e775..4088706f 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SwitchSignalSpec.scala @@ -209,30 +209,32 @@ class SwitchSignalSpec extends UnitSpec { // - fromSeq streams are used to ensure that onStart isn't called extraneously // - bus.events streams are used to ensure that onStop isn't called extraneously - val outerBus = new EventBus[Int] + val outerBus = new EventBus[Int].setDisplayName("outerBus") - val smallBus = new EventBus[String] + val smallBus = new EventBus[String].setDisplayName("smallBus") - val bigBus = new EventBus[String] + val bigBus = new EventBus[String].setDisplayName("bigBus") val smallSignal = EventStream.merge( - smallBus.events, - EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true) - ).startWith("small-0") + smallBus.events.setDisplayName("smallBus.events"), + EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true).setDisplayName("smallSeq") + ).setDisplayName("smallMerged").startWith("small-0").setDisplayName("smallSignal") val bigSignal = EventStream.merge( - bigBus.events, - EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) - ).startWith("big-0") + bigBus.events.setDisplayName("bigBus.events"), + EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true).setDisplayName("bigSeq") + ).setDisplayName("bigMerged").startWith("big-0").setDisplayName("bigSignal") - val flatSignal = outerBus.events.startWith(0).flatMap { + val flatSignal = outerBus.events.setDisplayName("outerBus.events").startWith(0).setDisplayName("outerBus.signal").map { case i if i >= 10 => bigSignal case _ => smallSignal - }.map(Calculation.log("flat", calculations)) + }.setDisplayName("outerBus.meta").flatten.setDisplayName("flatSignal").map(Calculation.log("flat", calculations)) // -- - flatSignal.addObserver(Observer.empty) + val emptyObs = Observer.empty.setDisplayName("emptyObs") + + flatSignal.addObserver(emptyObs) assert(calculations.toList == List( Calculation("flat", "small-0"), diff --git a/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala b/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala index 5d651394..44318b00 100644 --- a/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala +++ b/src/test/scala/com/raquo/airstream/flatten/SwitchStreamSpec.scala @@ -431,7 +431,7 @@ class SwitchStreamSpec extends UnitSpec { implicit val owner: TestableOwner = new TestableOwner - val outerBus = new EventBus[Int] + val outerBus = new EventBus[Int].setDisplayName("outer-bus") val calculations = mutable.Buffer[Calculation[String]]() @@ -439,28 +439,32 @@ class SwitchStreamSpec extends UnitSpec { // - fromSeq streams are used to ensure that onStart isn't called extraneously // - bus.events streams are used to ensure that onStop isn't called extraneously - val smallBus = new EventBus[String] + val smallBus = new EventBus[String].setDisplayName("small-bus") val smallStream = EventStream.merge( - smallBus.events, - EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true) - ) + smallBus.events.setDisplayName("small-bus-events"), + EventStream.fromSeq("small-1" :: "small-2" :: Nil, emitOnce = true).setDisplayName("small-fromSeq") + ).setDisplayName("small-M") - val bigBus = new EventBus[String] + val bigBus = new EventBus[String].setDisplayName("big-bus") val bigStream = EventStream.merge( - bigBus.events, - EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true) - ) + bigBus.events.setDisplayName("big-bus-events"), + EventStream.fromSeq("big-1" :: "big-2" :: Nil, emitOnce = true).setDisplayName("big-fromSeq") + ).setDisplayName("big-M") - val flatStream = outerBus.events.startWith(0).flatMap { + val flatStream = outerBus.events.setDisplayName("outer-bus-events").startWith(0).setDisplayName("outer-signal").map { case i if i >= 10 => bigStream case _ => smallStream - }.map(Calculation.log("flat", calculations)) + }.setDisplayName("outer-meta") + .flatten.setDisplayName("outer-flat") + .map(Calculation.log("flat", calculations)).setDisplayName("outer-flat-map") // -- - flatStream.addObserver(Observer.empty) + val emptyObserver = Observer.empty.setDisplayName("emptyObserver") + + flatStream.addObserver(emptyObserver) assert(calculations.toList == List( Calculation("flat", "small-1"),