Skip to content

Commit

Permalink
Fix: split child signal does not sync with parent signal on restart. F…
Browse files Browse the repository at this point in the history
…ixes #120
  • Loading branch information
raquo committed Feb 26, 2024
1 parent dc6cb03 commit 57b0938
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait SingleParentSignal[I, O] extends WritableSignal[O] with InternalTryObserve

/** Note: this is overriden in:
* - [[com.raquo.airstream.misc.SignalFromStream]] because parent can be stream, and it has cacheInitialValue logic
* - [[com.raquo.airstream.split.SplitChildSignal]] because its parent is a special timing stream, not the real parent
*/
override protected def onWillStart(): Unit = {
// println(s"${this} > onWillStart")
Expand All @@ -37,6 +38,9 @@ trait SingleParentSignal[I, O] extends WritableSignal[O] with InternalTryObserve
setCurrentValue(nextValue)
}

/** Note: this is overridden in:
* - [[com.raquo.airstream.split.SplitChildSignal]] because its parent is a special timing stream, not the real parent
*/
override protected def onTry(nextParentValue: Try[I], transaction: Transaction): Unit = {
if (parentIsSignal) {
_parentLastUpdateId = Protected.lastUpdateId(parent.asInstanceOf[Signal[_]])
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/raquo/airstream/core/WritableSignal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ trait WritableSignal[A] extends Signal[A] with WritableObservable[A] {
override protected[airstream] def tryNow(): Try[A] = {
// println(s"${this} > tryNow (maybeLastSeenCurrentValue = ${maybeLastSeenCurrentValue})")
maybeLastSeenCurrentValue.getOrElse {
// #TODO[Integrity] I'm not sure if updating `_lastUpdateId` here is right.
// - I expected `0` to indicate "initial value" (no events emitted yet),
// as mentioned in Signal.nextUpdateId, but it seems that it's only `0`
// until the signal's initial value is evaluated, then we increment it here.
// - I'm not sure if this is done on purpose or not, it's possible that the
// comment is incorrect. Either way, need to figure this out some time.
// - Currently, tests pass either way, but it's hard to tell definitely
// if this change would really be ok.
_lastUpdateId = Signal.nextUpdateId()
val nextValue = currentValueFromParent()
setCurrentValue(nextValue)
Expand Down
45 changes: 41 additions & 4 deletions src/main/scala/com/raquo/airstream/split/SplitChildSignal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ import scala.util.{Success, Try}
* still propagating, it would ALSO cause this signal to fire that event – which is the same as its initial value. This
* duplicity is undesirable of course, so we take care of it by dropping the first event after initialization IF
* it happens during the transaction in which this signal was initialized.
*
* @param getMemoizedValue get the latest memoized value and its corresponding parentLastUpdateId.
*/
private[airstream] class SplitChildSignal[M[_], A](
override protected[this] val parent: SyncDelayStream[M[A]],
initial: A,
getMemoizedValue: () => Option[A]
initialParentLastUpdateId: Int,
getMemoizedValue: () => Option[(A, Int)]
) extends SingleParentSignal[M[A], A] with InternalTryObserver[M[A]] {

/** Note: initial value is not an "event" and is not "emitted",
* so its propagation when starting the signal does not count here.
*/
private var hasEmittedEvents = false

private var maybeInitialTransaction: js.UndefOr[Transaction] = Transaction.currentTransaction()
Expand All @@ -31,20 +37,51 @@ private[airstream] class SplitChildSignal[M[_], A](

override protected val topoRank: Int = Protected.topoRank(parent) + 1

// onWillStart & currentValueFromParent:
// If this child signal is started immediately after it's first initialized,
// for example if we add an observer to it in the split operator's render callback,
// then while it's being started, its initial value was not memoized yet,
// so we pull it via the special channel (`initial` and `initialLastParentUpdateId`)

override protected def onWillStart(): Unit = {
// Sync to parent signal. This is similar to standard SingleParentSignal logic,
// except `val parent` is a special timing stream, not the real parent signal,
// so we need to ge the parent's value and lastUpdateId in a special manner.
// dom.console.log(s"${this} > onWillStart")
Protected.maybeWillStart(parent)
// dom.console.log(s" getMemoizedValue() = ${getMemoizedValue()} ")
val newParentLastUpdateId = getMemoizedValue().map(_._2).getOrElse(initialParentLastUpdateId)
if (newParentLastUpdateId != _parentLastUpdateId) {
// Note: We only update the value and the parent update id on re-start if
// the parent has updated while this signal was stopped.
// Note that there is no deduplication at this stage. The typical distinctCompose
// filtering is applied LATER, on top of this child signal's output.
updateCurrentValueFromParent()
_parentLastUpdateId = newParentLastUpdateId
}
}

override protected def currentValueFromParent(): Try[A] = {
// dom.console.log(s"$this -> currentValueFromParent")
// #TODO[Sync] is this right, or is this right only in the context of Laminar usage of split?
// #Note See also SignalFromStream for similar logic
// #Note This can be called from inside tryNow(), so make sure to avoid an infinite loop
if (maybeLastSeenCurrentValue.nonEmpty && hasEmittedEvents) {
tryNow()
val m = getMemoizedValue()
// dom.console.log(s" = $m (memoized)")
// #Note memoized value should always be available at this point. It's only unavailable
// under very specific conditions (see comment above), and we don't call this method
// in those conditions.
Success(getMemoizedValue().get._1)
} else {
// dom.console.log(s" = $initial (initial)")
Success(initial)
}
}

override protected def onTry(nextParentValue: Try[M[A]], transaction: Transaction): Unit = {
super.onTry(nextParentValue, transaction)
getMemoizedValue().foreach { freshMemoizedInput =>
getMemoizedValue().foreach { case (freshMemoizedInput, lastParentUpdateId) =>
_parentLastUpdateId = lastParentUpdateId
// #Note I do think we want to compare both `None` and `Some` cases of maybeTransaction.
// I'm not sure if None is possible, but if it is, this is probably the right thing to do.
// I think None might be possible when evaluating this signal's initial value when starting it
Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/com/raquo/airstream/split/SplitSignal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ class SplitSignal[M[_], Input, Output, Key](

override protected def currentValueFromParent(): Try[M[Output]] = parent.tryNow().map(memoizedProject)

private[this] val memoized: mutable.Map[Key, (Input, Output)] = mutable.Map.empty
/** key -> (inputValue, outputValue, lastParentUpdateId) */ // #nc
private[this] val memoized: mutable.Map[Key, (Input, Output, Int)] = mutable.Map.empty

override protected def onTry(nextParentValue: Try[M[Input]], transaction: Transaction): Unit = {
super.onTry(nextParentValue, transaction)
Expand Down Expand Up @@ -101,7 +102,8 @@ class SplitSignal[M[_], Input, Output, Key](
new SplitChildSignal[M, Input](
sharedDelayedParent,
initialInput,
() => memoized.get(memoizedKey).map(_._1)
initialParentLastUpdateId = Protected.lastUpdateId(parent),
() => memoized.get(memoizedKey).map(t => (t._1, t._3))
)
)

Expand All @@ -111,7 +113,7 @@ class SplitSignal[M[_], Input, Output, Key](
}

// Cache this key for the first time, or update the input so that inputSignal can fetch it
memoized.update(memoizedKey, (nextInput, nextOutput))
memoized.update(memoizedKey, (nextInput, nextOutput, Protected.lastUpdateId(parent)))

nextOutput
})
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/raquo/airstream/core/GlitchSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ class GlitchSpec extends UnitSpec {
// once for a given observable before it finally starts. This was because maybeWillStart
// used to check (!isStarted) condition, which only becomes true AFTER onWillStart of the
// observable that triggered the willStart chain has finished. However, apparently it was
// possible for willStart to trigger addition of external observer (seee inner-signal),
// possible for willStart to trigger addition of external observer (see inner-signal),
// which would again call up the onWillStart chain on its parents, and if the two observables
// share parents, that means the same observables would have onWillStart executed on them
// again, which they don't expect.
Expand Down
170 changes: 169 additions & 1 deletion src/test/scala/com/raquo/airstream/misc/SplitSignalSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.raquo.airstream.UnitSpec
import com.raquo.airstream.core.{Observer, Signal, Transaction}
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.fixtures.{Effect, TestableOwner}
import com.raquo.airstream.ownership.{DynamicOwner, DynamicSubscription, Subscription}
import com.raquo.airstream.ownership.{DynamicOwner, DynamicSubscription, ManualOwner, Subscription}
import com.raquo.airstream.split.DuplicateKeysConfig
import com.raquo.airstream.state.Var
import org.scalatest.{Assertion, BeforeAndAfter}
Expand Down Expand Up @@ -1174,4 +1174,172 @@ class SplitSignalSpec extends UnitSpec with BeforeAndAfter {

val _ = bus.events.split(_ => ())((_, _, _) => 100).map(_.getOrElse(0))
}

it("child split signal re-syncs with parent signal") {
// https://github.com/raquo/Airstream/issues/120

val owner = new ManualOwner

val foosVar = Var[List[Foo]](Nil)

var ownersById = Map[String, ManualOwner]()
var fooSById = Map[String, Signal[Foo]]()
var mapFooSById = Map[String, Signal[Foo]]()

val splitSignal = foosVar.signal.split(_.id)((id, _, fooS) => {
ownersById.get(id).foreach(_.killSubscriptions())

val newOwner = new ManualOwner
ownersById = ownersById.updated(id, newOwner)

fooSById = fooSById.updated(id, fooS)

val mapFooS = foosVar.signal.map(_.find(_.id == id).get)
mapFooSById = mapFooSById.updated(id, mapFooS)
})

// --

val splitSub = splitSignal.addObserver(Observer.empty)(owner)

assert(ownersById.isEmpty)
assert(fooSById.isEmpty)
assert(mapFooSById.isEmpty)

// --

foosVar.set(Foo("a", 1) :: Nil)

val owner_A = ownersById("a")
val fooS_A = fooSById("a")
val mapFooS_A = mapFooSById("a")

val fooS_A_observed_1 = fooS_A.observe(owner)
val mapFooS_A_observed_1 = mapFooS_A.observe(owner)

foosVar.set(Foo("a", 2) :: Foo("b", 1) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

assert(fooS_A_observed_1.now() == Foo("a", 2))
assert(mapFooS_A_observed_1.now() == Foo("a", 2))

// --

fooS_A_observed_1.killOriginalSubscription()
mapFooS_A_observed_1.killOriginalSubscription()

foosVar.set(Foo("b", 1) :: Foo("a", 3) :: Nil)

foosVar.set(Foo("a", 4) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

// --

val fooS_A_observed_2 = fooS_A.observe(owner)
val mapFooS_A_observed_2 = mapFooS_A.observe(owner)

assert(fooS_A_observed_2.now() == Foo("a", 4))
assert(mapFooS_A_observed_2.now() == Foo("a", 4))

// --

foosVar.set(Foo("a", 5) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

assert(fooS_A_observed_2.now() == Foo("a", 5))
assert(mapFooS_A_observed_2.now() == Foo("a", 5))
}

it("child split signal re-syncs with parent stream") {
// https://github.com/raquo/Airstream/issues/120

val owner = new ManualOwner

val foosVar = new EventBus[List[Foo]]

var ownersById = Map[String, ManualOwner]()
var fooSById = Map[String, Signal[Foo]]()
var mapFooSById = Map[String, Signal[Foo]]()

val splitSignal = foosVar.stream.split(_.id)((id, _, fooS) => {
ownersById.get(id).foreach(_.killSubscriptions())

val newOwner = new ManualOwner
ownersById = ownersById.updated(id, newOwner)

fooSById = fooSById.updated(id, fooS)

val mapFooS = foosVar.stream.startWith(Nil).map(_.find(_.id == id).get)
mapFooSById = mapFooSById.updated(id, mapFooS)
})

// --

val splitSub = splitSignal.addObserver(Observer.empty)(owner)

assert(ownersById.isEmpty)
assert(fooSById.isEmpty)
assert(mapFooSById.isEmpty)

// --

foosVar.emit(Foo("a", 1) :: Nil)

val owner_A = ownersById("a")
val fooS_A = fooSById("a")
val mapFooS_A = mapFooSById("a")

val fooS_A_observed_1 = fooS_A.observe(owner)
val mapFooS_A_observed_1 = mapFooS_A.observe(owner)

foosVar.emit(Foo("a", 2) :: Foo("b", 1) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

assert(fooS_A_observed_1.now() == Foo("a", 2))
assert(mapFooS_A_observed_1.now() == Foo("a", 2))

// --

fooS_A_observed_1.killOriginalSubscription()
mapFooS_A_observed_1.killOriginalSubscription()

foosVar.emit(Foo("b", 1) :: Foo("a", 3) :: Nil)

foosVar.emit(Foo("a", 4) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

// --

val fooS_A_observed_2 = fooS_A.observe(owner)
val mapFooS_A_observed_2 = mapFooS_A.observe(owner)

assert(fooS_A_observed_2.now() == Foo("a", 4))
assert(mapFooS_A_observed_2.now() == Foo("a", 2)) // this is based on stream so it can't actually re-sync

// --

foosVar.emit(Foo("a", 5) :: Nil)

assert(ownersById("a") eq owner_A)
assert(fooSById("a") eq fooS_A)
assert(mapFooSById("a") eq mapFooS_A)

assert(fooS_A_observed_2.now() == Foo("a", 5))
assert(mapFooS_A_observed_2.now() == Foo("a", 5))
}
}

0 comments on commit 57b0938

Please sign in to comment.