Skip to content

Commit

Permalink
Refactor: use .map instead of .forEarch + .push to populate parentObs…
Browse files Browse the repository at this point in the history
…ervers in CombineObservable
  • Loading branch information
raquo committed Nov 25, 2023
1 parent 3a7a58e commit 2c63e72
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ trait CombineObservable[A] extends SyncObservable[A] { this: WritableObservable[
protected[this] def combinedValue: Try[A]

/** Parent observers are not immediately active. onStart/onStop regulates that. */
protected[this] val parentObservers: JsArray[InternalParentObserver[_]] = JsArray()
protected[this] val parentObservers: JsArray[InternalParentObserver[_]]

// @TODO[Elegance] Not a fan of how inputsReady couples this to its subclasses
/** Check whether inputs (parent observables' values) are all available to be combined. */
Expand Down
17 changes: 8 additions & 9 deletions src/main/scala/com/raquo/airstream/combine/CombineSignalN.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@ class CombineSignalN[A, Out](

override protected val topoRank: Int = Protected.maxTopoRank(0, parents) + 1

override protected[this] val parentObservers: JsArray[InternalParentObserver[_]] = {
parents.map { parent =>
InternalParentObserver.fromTry[A](parent, (_, trx) => {
onInputsReady(trx)
})
}
}

override protected[this] def inputsReady: Boolean = true

override protected[this] def combinedValue: Try[Out] = {
CombineObservable.jsArrayCombinator(parents.map(_.tryNow()), combinator)
}

override protected def currentValueFromParent(): Try[Out] = combinedValue

parents.forEach { parent =>
parentObservers.push(
InternalParentObserver.fromTry[A](parent, (_, transaction) => {
onInputsReady(transaction)
})
)
}

}
29 changes: 14 additions & 15 deletions src/main/scala/com/raquo/airstream/combine/CombineStreamN.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ class CombineStreamN[A, Out](

private[this] val maybeLastParentValues: JsArray[js.UndefOr[Try[A]]] = parents.map(_ => js.undefined)

override protected[this] val parentObservers: JsArray[InternalParentObserver[_]] = {
parents.mapWithIndex { (parent, ix) =>
InternalParentObserver.fromTry[A](
parent,
(nextParentValue, trx) => {
maybeLastParentValues.update(ix, nextParentValue)
if (inputsReady) {
onInputsReady(trx)
}
}
)
}
}

override protected[this] def inputsReady: Boolean = {
var allReady: Boolean = true
maybeLastParentValues.forEach { lastValue =>
Expand All @@ -42,19 +56,4 @@ class CombineStreamN[A, Out](
// inputs are ready, otherwise this asInstanceOf will not be safe.
CombineObservable.jsArrayCombinator(maybeLastParentValues.asInstanceOf[JsArray[Try[A]]], combinator)
}

parents.forEachWithIndex { (parent, ix) =>
parentObservers.push(
InternalParentObserver.fromTry[A](
parent,
(nextParentValue, transaction) => {
maybeLastParentValues.update(ix, nextParentValue)
if (inputsReady) {
onInputsReady(transaction)
}
}
)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import scala.util.Try
*
* Works similar to Rx's "withLatestFrom", except without glitches (see a diamond case test for this in GlitchSpec).
*
* @param sampledSignals - Never update this array - this signal owns it.
*
* @param combinator Note: Must not throw! Must be pure.
* @param sampledSignals Never update this array - this signal owns it.
* @param combinator Note: Must not throw! Must be pure.
*/
class SampleCombineSignalN[A, Out](
samplingSignal: Signal[A],
Expand All @@ -27,38 +26,37 @@ class SampleCombineSignalN[A, Out](

override protected[this] def inputsReady: Boolean = true

override protected[this] val parents: JsArray[Signal[A]] = combineWithArray(
samplingSignal,
sampledSignals
)

override protected[this] def combinedValue: Try[Out] = {
val values = combineWithArray(
samplingSignal.tryNow(),
sampledSignals.map(_.tryNow())
)
CombineObservable.jsArrayCombinator(values, combinator)
override protected[this] val parents: JsArray[Signal[A]] = {
val arr = JsArray(samplingSignal)
sampledSignals.forEach { sampledSignal =>
arr.push(sampledSignal)
}
arr
}

override protected def currentValueFromParent(): Try[Out] = combinedValue

parentObservers.push(
InternalParentObserver.fromTry[A](samplingSignal, (_, transaction) => {
onInputsReady(transaction)
})
)

sampledSignals.forEach { sampledSignal =>
parentObservers.push(
InternalParentObserver.fromTry[A](sampledSignal, (_, _) => {
// Do nothing, we just want to ensure that sampledSignal is started.
override protected[this] val parentObservers: JsArray[InternalParentObserver[_]] = {
val arr = JsArray[InternalParentObserver[_]](
InternalParentObserver.fromTry[A](samplingSignal, (_, trx) => {
onInputsReady(trx)
})
)
sampledSignals.forEach { sampledSignal =>
arr.push(
InternalParentObserver.fromTry[A](sampledSignal, (_, _) => {
// Do nothing, we just want to ensure that sampledSignal is started.
})
)
}
arr
}

private[this] def combineWithArray[V](sampling: V, sampled: JsArray[V]): JsArray[V] = {
val values = sampled.concat() // There's also JsArray.from, but it does not work in IE11
values.unshift(sampling)
values
override protected[this] def combinedValue: Try[Out] = {
val values = JsArray(samplingSignal.tryNow())
sampledSignals.forEach { sampledSignal =>
values.push(sampledSignal.tryNow())
}
CombineObservable.jsArrayCombinator(values, combinator)
}

override protected def currentValueFromParent(): Try[Out] = combinedValue
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,45 +29,44 @@ class SampleCombineStreamN[A, Out](

override protected[this] def inputsReady: Boolean = maybeLastSamplingValue.nonEmpty

override protected[this] val parents: JsArray[Observable[A]] = combineWithArray(
samplingStream,
sampledSignals
)

override protected[this] def combinedValue: Try[Out] = {
val values = combineWithArray(
maybeLastSamplingValue.get,
sampledSignals.map(_.tryNow())
)
CombineObservable.jsArrayCombinator(values, combinator)
override protected[this] val parents: JsArray[Observable[A]] = {
val arr = JsArray[Observable[A]](samplingStream)
sampledSignals.forEach { sampledSignal =>
arr.push(sampledSignal)
}
arr
}

parentObservers.push(
InternalParentObserver.fromTry[A](
samplingStream,
(nextSamplingValue, transaction) => {
maybeLastSamplingValue = nextSamplingValue
onInputsReady(transaction)
}
override protected[this] val parentObservers: JsArray[InternalParentObserver[_]] = {
val arr = JsArray[InternalParentObserver[_]](
InternalParentObserver.fromTry[A](
samplingStream,
(nextSamplingValue, trx) => {
maybeLastSamplingValue = nextSamplingValue
onInputsReady(trx)
}
)
)
)
sampledSignals.forEach { sampledSignal =>
arr.push(
InternalParentObserver.fromTry[A](sampledSignal, (_, _) => {
// Do nothing, we just want to ensure that sampledSignal is started.
})
)
}
arr
}

sampledSignals.forEach { sampledSignal =>
parentObservers.push(
InternalParentObserver.fromTry[A](sampledSignal, (_, _) => {
// Do nothing, we just want to ensure that sampledSignal is started.
})
)
override protected[this] def combinedValue: Try[Out] = {
val values = JsArray(maybeLastSamplingValue.get)
sampledSignals.forEach { sampledSignal =>
values.push(sampledSignal.tryNow())
}
CombineObservable.jsArrayCombinator(values, combinator)
}

override private[airstream] def syncFire(transaction: Transaction): Unit = {
super.syncFire(transaction)
maybeLastSamplingValue = js.undefined // Clean up memory, as we don't need this reference anymore
}

private[this] def combineWithArray[BaseV >: V, V](sampling: BaseV, sampled: JsArray[V]): JsArray[BaseV] = {
val values = sampled.concat[BaseV]() // There's also JsArray.from, but it does not work in IE11
values.unshift(sampling)
values
}
}

0 comments on commit 2c63e72

Please sign in to comment.