Skip to content

Commit

Permalink
Fix: When starting observers inside Transaction.onStart.shared block,…
Browse files Browse the repository at this point in the history
… delay transactions until after the block has run. In practice, this makes sure all observers added in that block will see events emitted by observables onStart. Fixes #111.
  • Loading branch information
raquo committed Nov 20, 2023
1 parent 08c134b commit 9acbea5
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ println(s"After set: ${myVar.now()}")
myVar.update(_ + 1)
println(s"After update: ${myVar.now()}")

new Transaction { _ =>
Transaction {
println(s"After trx: ${myVar.now()}")
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/com/raquo/airstream/combine/MergeStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ class MergeStream[A](
* produce unexpected results.
*/
override private[airstream] def syncFire(transaction: Transaction): Unit = {
//println(s"${this} syncFire in $transaction")
while (pendingParentValues.nonEmpty) {
val nextValue = pendingParentValues.dequeue().value
if (lastFiredInTrx.contains(transaction)) {
//println("- syncFire in new trx")
nextValue.fold(
nextError => new Transaction(fireError(nextError, _)),
nextEvent => new Transaction(fireValue(nextEvent, _))
)
} else {
lastFiredInTrx = transaction
//println("- syncFire in same trx")
nextValue.fold(
fireError(_, transaction),
fireValue(_, transaction)
Expand Down
119 changes: 94 additions & 25 deletions src/main/scala/com/raquo/airstream/core/Transaction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,30 @@ import scala.scalajs.js
/** @param code Note: Must not throw! */
class Transaction(private[Transaction] var code: Transaction => Any) {

// val id: Int = Transaction.nextId()
// val id = Transaction.nextId()

//println(s" - create trx $id")
//println(s"--CREATE Trx@${id}")

//override def toString: String = s"Trx@${id}"

/** Priority queue of pending observables: sorted by their topoRank.
*
* Corollary: An Observable that is dequeue-d from here does not synchronously depend on any other pending observables
*/
private[this] var maybePendingObservables: js.UndefOr[JsPriorityQueue[SyncObservable[_]]] = js.undefined

Transaction.pendingTransactions.add(this)
if (Transaction.onStart.isSharedStart) {
// This delays scheduling transactions until the end of
// the shared start transaction
//println(s">>> onStart.postStartTransactions.push($this)")
Transaction.onStart.postStartTransactions.push(this)
} else {
//println(s">>> Transaction.pendingTransactions.add($this)")
Transaction.pendingTransactions.add(this)
}

@inline private[Transaction] def resolvePendingObservables(): Unit = {
//println(s"$this resolvePendingObservables (n=${maybePendingObservables.map(_.size).getOrElse(0)})")
maybePendingObservables.foreach { pendingObservables =>
while (pendingObservables.nonEmpty) {
//dom.console.log("RANKS: ", pendingObservables.debugQueue.map(_.topoRank))
Expand All @@ -48,6 +59,16 @@ class Transaction(private[Transaction] var code: Transaction => Any) {

object Transaction {

/** Create new transaction (typically used in internal observable code) */
def apply(code: Transaction => Unit): Unit = new Transaction(code)

/** Create new transaction (typically used in end user code).
*
* Warning: It is rare that you need to manually create transactions.
* Example of legitimate use case: [[https://github.com/raquo/Airstream/#var-transaction-delay Var transaction delay]]
*/
def apply(code: => Unit): Unit = new Transaction(_ => code)

/** This object holds a queue of callbacks that should be executed
* when all observables finish starting. This lets `signal.changes`
* streams emit the updated signal's value when restarting, in such
Expand All @@ -64,10 +85,19 @@ object Transaction {
*/
object onStart {

private var level: Int = 0
private[Transaction] var isSharedStart: Boolean = false

private val pendingCallbacks: JsArray[Transaction => Unit] = JsArray()

private[Transaction] val postStartTransactions: JsArray[Transaction] = JsArray()

private[Transaction] var isResolving = false

// #nc just add a default value to `when` param. Keeping this method for binary compat for now.
def shared[A](code: => A): A = {
shared(code, when = true)
}

/* Put the code that (potentially) adds more than one observer inside.
* If that code causes `signal.changes` to restart (and emit the signal's
* updated value), this event will be delayed until the rest of your code
Expand Down Expand Up @@ -95,35 +125,57 @@ object Transaction {
*
* See https://github.com/raquo/Airstream/#restarting-streams-that-depend-on-signals--signalchanges-
*/
def shared[A](code: => A): A = {
level += 1
val result = try {
def shared[A](code: => A, when: Boolean): A = {
if (isSharedStart || !when) {
// - We are already executing inside the `code` argument passed
// to another onStart.shared block, so adding another try-catch
// block is not necessary: that other block will take care of it.
// - Or, the caller explicitly doesn't want a shared block now (!when)
code
} finally {
level -= 1
if (level == 0) {
} else {
//println("> START SHARED")
isSharedStart = true
val result = try {
code
} finally {
isSharedStart = false
resolve()
}
//println("< END SHARED")
result
}
result
}

/** Add a callback to execute once the new shared transaction gets executed.
*
* @param callback - Must not throw!
*/
def add(callback: Transaction => Unit): Unit = {
//println(s"// add callback ${callback.hashCode()}")
pendingCallbacks.push(callback)
}

private def resolve(): Unit = {
if (pendingCallbacks.length > 0) {
new Transaction(trx => {
if (pendingCallbacks.length == 0) {
//println("- no pending callbacks")
if (postStartTransactions.length > 0) {
//println(s"> CREATE ALT RESOLVE TRX. Num trx-s = ${postStartTransactions.length}")
Transaction {
while (postStartTransactions.length > 0) {
pendingTransactions.add(postStartTransactions.shift())
}
}
}
} else {
//println(s"> CREATE RESOLVE TRX. Num callbacks = ${pendingCallbacks.length}")
Transaction { trx =>
// #TODO[Integrity] What if calling callback(trx) calls onStart.add?
// Is it ok to put it into the same list, or should it go into a new list,
// to be executed in a separate transaction?
isResolving = true
while (pendingCallbacks.length > 0) {
val callback = pendingCallbacks.pop()
val callback = pendingCallbacks.shift()
// println(s"// resolve callback ${callback.hashCode()}")
try {
callback(trx)
} catch {
Expand All @@ -132,14 +184,29 @@ object Transaction {
AirstreamError.sendUnhandledError(err)
}
}
})
isResolving = false
// println("// resolved any callbacks")
// Any transactions created during the shared start
// that weren't converted to callbacks, will now be
// scheduled, and will be executed after this shared
// transaction finishes (they are marked as its
// children), in the same order as they were created.
// println(s"postStartTransactions.length = ${postStartTransactions.length}")
while (postStartTransactions.length > 0) {
val _trx = postStartTransactions.shift()
// println(s"- pendingTransactions.add(${t})")
pendingTransactions.add(_trx)
}
}
}
}
}

private object pendingTransactions {

/** first transaction is the top of the stack, currently running */
/** First transaction is the top of the stack, currently running.
* That transaction's parent transaction is the second item, and so on.
*/
private val stack: JsArray[Transaction] = JsArray()

private val children: JsMap[Transaction, JsArray[Transaction]] = new JsMap()
Expand All @@ -159,7 +226,7 @@ object Transaction {
// Consider this later when I have moer comprehensive benchmarks.
pushToStack(newTransaction)
run(newTransaction)
}{ currentTransaction =>
} { currentTransaction =>
enqueueChild(parent = currentTransaction, newChild = newTransaction)
}
}
Expand All @@ -178,7 +245,7 @@ object Transaction {

putNextTransactionOnStack(doneTransaction = transaction)

transaction.code = throwDeadTrxError // stop holding up `trx` contents in memory
transaction.code = throwDeadTrxError // stop holding up `trx` contents in memory

peekStack().fold {
if (children.size > 0) {
Expand All @@ -187,7 +254,7 @@ object Transaction {
children.forEach((transactions, _) => numChildren += transactions.length)
throw new Exception(s"Transaction queue error: Stack cleared, but a total of ${numChildren} children for ${children.size} transactions remain. This is a bug in Airstream.")
}
}{ nextTransaction =>
} { nextTransaction =>
run(nextTransaction)
}
}
Expand All @@ -204,11 +271,9 @@ object Transaction {
peekStack().foreach { parentTransaction =>
putNextTransactionOnStack(doneTransaction = parentTransaction)
}
}{ nextChild =>
} { nextChild =>
// Found a child transaction, so put it on the stack, so that it wil run next.
// Once that child is all done, it will be popped from the stack, and we will
//
//
pushToStack(nextChild)
}
}
Expand Down Expand Up @@ -263,10 +328,13 @@ object Transaction {

private[core] def isClearState: Boolean = pendingTransactions.isClearState

//private var maybeCurrentTransaction: js.UndefOr[Transaction] = js.undefined

private[airstream] def currentTransaction(): js.UndefOr[Transaction] = pendingTransactions.peekStack()

private def run(transaction: Transaction): Unit = {
//println(s"--start trx ${transaction.id}")
//println(s"--START ${transaction}")
//maybeCurrentTransaction = transaction
try {
transaction.code(transaction)
transaction.resolvePendingObservables()
Expand All @@ -278,11 +346,12 @@ object Transaction {
} finally {
// @TODO[API,Integrity]
// This block is executed regardless of whether an exception is thrown in `code` or not,
// but it doesn't actually catch the exception, so `new Transaction(code)` actually throws
// but it doesn't actually catch the exception, so `Transaction(code)` actually throws
// iff `code` throws AND the transaction was created while no other transaction is running
// This is not very predictable, so we should fix it.
//println(s"--end trx ${transaction.id}")
//println(s"--END ${transaction}")
pendingTransactions.done(transaction)
//maybeCurrentTransaction = js.undefined
}
}

Expand Down
21 changes: 11 additions & 10 deletions src/main/scala/com/raquo/airstream/core/WritableObservable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ trait WritableObservable[A] extends Observable[A] {
protected val internalObservers: ObserverList[InternalObserver[A]] = new ObserverList(JsArray())

override def addObserver(observer: Observer[A])(implicit owner: Owner): Subscription = {
// #nc[doc] - document this onstart.shared mechanism, both in code and in the real docs.
// - also for extenders: this must be called by your observable also if it can get started without external observers downstream (basically, it shouldn't).
Transaction.onStart.shared {
// println(s"// ${this} addObserver ${observer}")
Transaction.onStart.shared({
maybeWillStart()
val subscription = addExternalObserver(observer, owner)
onAddedExternalObserver(observer)
maybeStart()
subscription
}
}, when = !isStarted)
}

/** Subscribe an external observer to this observable */
Expand All @@ -76,12 +75,14 @@ trait WritableObservable[A] extends Observable[A] {
*/
override protected[airstream] def addInternalObserver(observer: InternalObserver[A], shouldCallMaybeWillStart: Boolean): Unit = {
//println(s"$this > aio shouldCallMaybeWillStart=$shouldCallMaybeWillStart")
if (!isStarted && shouldCallMaybeWillStart) {
maybeWillStart()
}
//println(s"$this < aio")
internalObservers.push(observer)
maybeStart()
Transaction.onStart.shared({
if (!isStarted && shouldCallMaybeWillStart) {
maybeWillStart()
}
//println(s"$this < aio")
internalObservers.push(observer)
maybeStart()
}, when = !isStarted)
}

/** Child observable should call parent.removeInternalObserver(childInternalObserver) when it is stopped.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class ConcurrentStream[A](
case signal: Signal[EventStream[A @unchecked] @unchecked] =>
signal.tryNow() match {
case Success(stream) =>
// We add internal observer later, in `onStart`. onWillStart should not start any observables. // #nc[doc] this pattern
// We add internal observer later, in `onStart`. onWillStart should not start any observables.
// #TODO[Doc] Document this pattern ^^^.
maybeAddStream(stream, addInternalObserver = false)
case _ => ()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SwitchSignal[A](

private[this] val internalEventObserver: InternalObserver[A] = InternalObserver.fromTry[A](
onTry = (nextTry, _) => {
//println(s"> init trx from SwitchSignal.onValue($nextTry)")
//println(s"> init trx from $this SwitchSignal.onValue($nextTry)")
innerSignalLastSeenUpdateId = Protected.lastUpdateId(currentSignalTry.get)
new Transaction(fireTry(nextTry, _))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ class DynamicSubscription private (

private[ownership] def onActivate(owner: Owner): Unit = {
//println(s" - activating $this")
// I don't think Laminar itself needs onStart.shared here, this is for users' custom dynamic subscriptions.
// Honestly this might be overkill, but I think this is cheap, and diagnosing these kinds of bugs is expensive.
Transaction.onStart.shared {
maybeCurrentSubscription = activate(owner)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class JsPromiseSignal[A](promise: js.Promise[A]) extends WritableSignal[Option[A
}

private def onPromiseResolved(nextPromiseValue: Try[A]): Unit = {
// #nc doc this about onWillStart
// #TODO[Doc] Document this about onWillStart
// #Note Normally onWillStart must not create transactions / emit values, but this is ok here
// because this callback is always called asynchronously, so any value will be emitted from here
// long after the onWillStart / onStart chain has finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ class PullResetSignalSpec extends UnitSpec {
$combined.addObserver(Observer.empty)

log.toList shouldBe List(
"-4 isEven = true",
"-4 isPositive = false",
"-4 isEven = true",
"-4 isPositive = false, isEven = true"
)
log.clear()
Expand Down Expand Up @@ -343,13 +343,13 @@ class PullResetSignalSpec extends UnitSpec {

v.set(-2)

// #Warning This is a known glitch: of all the new observers, only the first one
// manages to receive the sync event from `changes`. This is because the whole
// onWillStart / onStart loop completes before the other observers are added,
// and even though the event is emitted in a new Transaction, in this case, the
// transaction payload is executed immediately without delay, because there is
// #Warning This is a known glitch with a known workaround: of all the new observers,
// only the first one manages to receive the sync event from `changes`. This is
// because the whole onWillStart / onStart loop completes before the other observers
// are added, and even though the event is emitted in a Transaction, in this case,
// the transaction payload is executed immediately without delay, because there is
// no current transaction that we need to wait for.
// - This glitch can be avoided by wrapping subs creation in `new Transaction`,
// - This glitch can be avoided by wrapping subs creation in `Transaction`,
// `DynamicSubscription`, or `Transaction.onStart.shared` – see the tests below

val subs2 = List(
Expand Down
Loading

0 comments on commit 9acbea5

Please sign in to comment.