flatMapLatest – add next observable before removing previous #235

Open
32bitkid opened this issue Feb 15, 2017 · 5 comments · May be fixed by #320

Comments

@32bitkid
Contributor

32bitkid commented Feb 15, 2017

In AbstractPool there is a drop option that flatMapLatest() uses. When drop is set to 'old', the old source is unplugged from the pool before the next source is added:

      /* snip */
      } else if (this._drop === 'old') {
        this._removeOldest();
        this._add(obj, toObs);
      }
      /* snip */

https://github.com/rpominov/kefir/blob/master/src/many-sources/abstract-pool.js#L37

Is there a strong opinion against either (a) adding another option, perhaps overlapping, to softly drop the old stream (add the next stream before removing the oldest), or (b) changing the default behavior for old so that the next stream is added before the old stream is removed?

Option (b) seems a little risky, and it would be a breaking change. But having control over this would be beneficial in cases where you are switching between streams and don't want a source to deactivate unless it is unused in the next stream's activation chain.

Example:

const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);  
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never())
  .log('result');

Console:

> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect a
> connect a
> connect b
> result <value> [0, 0]
> result <value> [1, 0]
> result <value> [1, 1]
> result <value> [2, 1]
> result <value> [2, 2]
> disconnect a
> disconnect b
> connect b
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect b
> result <end>

Notice how both a and b disconnect and immediately reconnect on the swap. What I think I would like in some situations:

> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> connect b
> result <value> [3, 0]
> result <value> [3, 1]
> result <value> [4, 1]
> result <value> [4, 2]
> disconnect a
> result <value> 3
> result <value> 4
> result <value> 5
> disconnect b
> result <end>

Is there a different way to achieve this behavior without having to alter the behavior of AbstractPool? In my real use case, these Kefir.stream() observables wrap a socket.io connection, so it's less than ideal to tear down the connection between sibling projections, but at the same time it's less than ideal to keep the socket going for unused projections.
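
To make the ordering issue concrete, here is a minimal sketch that models it without Kefir. Everything in it (makeSource, makeCombined, runSwitch) is a hypothetical stand-in, not Kefir's implementation: a source "connects" on its first subscriber and "disconnects" on its last, and the switch either removes the old source first or adds the next one first.

```javascript
// Toy model (not Kefir code): a ref-counted source that "connects" on
// its first subscriber and "disconnects" when the last one leaves.
function makeSource(name, log) {
  let subscribers = 0;
  return {
    subscribe() {
      if (subscribers++ === 0) log.push(`connect ${name}`);
    },
    unsubscribe() {
      if (--subscribers === 0) log.push(`disconnect ${name}`);
    },
  };
}

// A derived source that subscribes to all of its parents, like combine.
function makeCombined(parents) {
  return {
    subscribe() { parents.forEach(p => p.subscribe()); },
    unsubscribe() { parents.forEach(p => p.unsubscribe()); },
  };
}

// Switch through a -> combine(a, b) -> b, either removing the old source
// first (current behavior) or adding the next one first (proposed).
function runSwitch(overlapping) {
  const log = [];
  const a = makeSource('a', log);
  const b = makeSource('b', log);
  const sequence = [a, makeCombined([a, b]), b];
  let current = null;
  for (const next of sequence) {
    if (overlapping) {
      next.subscribe();
      if (current) current.unsubscribe();
    } else {
      if (current) current.unsubscribe();
      next.subscribe();
    }
    current = next;
  }
  current.unsubscribe();
  return log;
}

const removeFirst = runSwitch(false);
const addFirst = runSwitch(true);
console.log(removeFirst.join(', '));
console.log(addFirst.join(', '));
```

In this model, removing first reproduces the disconnect/reconnect churn from the first console listing, while adding first connects and disconnects each channel exactly once.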

@rpominov
Member

rpominov commented Feb 16, 2017

Yeah, that makes sense. Although I also worry that option (b) could be too dangerous: it could introduce subtle, hard-to-track changes in behavior in many people's programs.

Seems like nothing stops us from doing option (a), though. We could then also add an argument to flatMapLatest so that it supports the following signatures:

// currently supported
()
(fn: Function)

// new
(fn: Function, overlapping: bool)
(overlapping: bool)
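
One way the four call shapes could be told apart is sketched below. The helper normalizeArgs and the identity default are assumptions made for illustration only; neither is part of Kefir's API, and the eventual implementation may differ (the later commits in this thread use an options object instead of a bare boolean).

```javascript
// Hypothetical helper, not part of Kefir: maps the four proposed call
// shapes onto one { fn, overlapping } record.
const identity = x => x;

function normalizeArgs(fnOrFlag, maybeFlag) {
  if (typeof fnOrFlag === 'function') {
    // (fn) or (fn, overlapping)
    return { fn: fnOrFlag, overlapping: Boolean(maybeFlag) };
  }
  // () or (overlapping); assumes a missing fn means "use values as-is"
  return { fn: identity, overlapping: Boolean(fnOrFlag) };
}

const double = x => x * 2;
console.log(normalizeArgs());             // fn: identity, overlapping: false
console.log(normalizeArgs(double));       // fn: double,   overlapping: false
console.log(normalizeArgs(double, true)); // fn: double,   overlapping: true
console.log(normalizeArgs(true));         // fn: identity, overlapping: true
```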

@32bitkid
Contributor Author

32bitkid commented Feb 16, 2017 via email

32bitkid pushed a commit to 32bitkid/kefir that referenced this issue Feb 16, 2017
passing an options object of {overlapping:true} will cause the next stream to be added before any old streams are removed.
@polytypic

polytypic commented Oct 7, 2018

I just got bitten by this non-overlapping behaviour of flatMapLatest.

I do think that the overlapping behaviour should be the default, because that way combine style semantics can be explained in terms of flatMapLatest semantics (fixed):

const combine = (lhs, rhs) =>
  lhs.flatMapLatest(lhs => rhs.map(rhs => [lhs, rhs]))

Usually when you can reduce the number of primitive operators you end up with simpler and more robust algebraic properties.

The use case I have for the overlapping behaviour (currently) looks like this:

export const apParallel = I.curry(function apParallel(f, x) {
  return flatMapProperty(
    f =>
      hasFailed(f)
        ? f
        : mapProperty(
            x =>
              hasSucceeded(f)
                ? hasSucceeded(x)
                  ? of(result(f)(result(x)), mergeXHRs(0, f, x))
                  : hasFailed(x)
                    ? x
                    : mergeXHRs(-1, f, x)
                : mergeXHRs(-1, f, x),
            x
          ),
    f
  )
})

The idea above is to have an Applicative that performs XHRs in parallel (above, both f and x are XHRs wrapped as observable properties). The reason the above does not use combine is that using flatMapLatest (followed by toProperty) allows XHRs to be aborted (by unsubscribing from them) if an XHR fails. (In case of failure, the failure from the first (leftmost) failing XHR is reported.)

Is there some use-case or general property that the current non-overlapping behaviour of flatMapLatest provides?

The current behaviour seems like an odd discontinuity in an otherwise continuous function.

In my case, to work around the non-overlapping behaviour, I'm introducing a hacky delayUnsub combinator:

import * as I from 'infestines'
import * as K from 'kefir'

const TIMER = 't'
const SOURCE = 's'
const HANDLER = 'h'

const TYPE = 'type'
const VALUE = 'value'
const END = 'end'

const DelayUnsub = I.inherit(
  function DelayUnsub(source) {
    const self = this
    K.Property.call(self)
    self[SOURCE] = source
    self[HANDLER] = self[TIMER] = 0
  },
  K.Property,
  {
    _onActivation() {
      const self = this
      if (self[TIMER]) {
        clearTimeout(self[TIMER])
        self[TIMER] = 0
      } else {
        self[SOURCE].onAny(
          (self[HANDLER] = e => {
            const t = e[TYPE]
            if (t === VALUE) {
              self._emitValue(e[VALUE])
            } else if (t === END) {
              self._emitEnd()
            } else {
              self._emitError(e[VALUE])
            }
          })
        )
      }
    },
    _onDeactivation() {
      const self = this
      self[TIMER] = setTimeout(() => {
        self[SOURCE].offAny(self[HANDLER])
        self[HANDLER] = self[TIMER] = 0
      }, 0)
    }
  }
)

export const delayUnsub = source => new DelayUnsub(source)

By wrapping the XHR observables with delayUnsub they can then withstand a synchronous subscription discontinuity.
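
The timing idea can be shown in isolation with a small model that does not depend on Kefir. The names here (makeDelayedResource, schedule, cancel, flush) are made up for this sketch, and the scheduler is injected so that the example runs deterministically instead of waiting on a real setTimeout.

```javascript
// Toy model of the delayed-unsubscribe idea; not Kefir code.
// `schedule` and `cancel` default to timers but can be swapped out.
function makeDelayedResource(connect, disconnect,
                             schedule = fn => setTimeout(fn, 0),
                             cancel = clearTimeout) {
  let connected = false;
  let pending = null; // handle of a scheduled disconnect, if any
  return {
    activate() {
      if (pending !== null) {
        // Re-activated before the deferred disconnect ran: keep the
        // existing connection instead of reconnecting.
        cancel(pending);
        pending = null;
      } else if (!connected) {
        connect();
        connected = true;
      }
    },
    deactivate() {
      pending = schedule(() => {
        pending = null;
        connected = false;
        disconnect();
      });
    },
  };
}

// Manual scheduler so the demo is synchronous and repeatable.
const events = [];
const queue = [];
const schedule = fn => { queue.push(fn); return fn; };
const cancel = fn => {
  const i = queue.indexOf(fn);
  if (i !== -1) queue.splice(i, 1);
};
const flush = () => { while (queue.length) queue.shift()(); };

const xhr = makeDelayedResource(
  () => events.push('connect'),
  () => events.push('disconnect'),
  schedule,
  cancel
);

xhr.activate();   // connects
xhr.deactivate(); // schedules a disconnect
xhr.activate();   // cancels it, so there is no reconnect
flush();          // nothing left to run
xhr.deactivate(); // schedules a disconnect again
flush();          // this time the disconnect runs
console.log(events);
```

The synchronous deactivate/activate pair in the middle leaves no trace in events, which is the discontinuity the wrapper is meant to absorb.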

@mAAdhaTTah
Collaborator

There's an open PR to implement this as an option #236. If you're interested in picking that up, I'm definitely still interested in seeing it land.

bpinto added a commit to bpinto/kefir that referenced this issue Aug 19, 2021
This will cause the next stream to be added before any old streams are
removed.

```
const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never())
  .log('result');
```

With overlapping option disabled (default):

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect a
> connect a
> connect b
> result <value> [0, 0]
> result <value> [1, 0]
> result <value> [1, 1]
> result <value> [2, 1]
> result <value> [2, 2]
> disconnect a
> disconnect b
> connect b
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect b
> result <end>
```

With overlapping option enabled:

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> connect b
> result <value> [3, 0]
> result <value> [3, 1]
> result <value> [4, 1]
> result <value> [4, 2]
> disconnect a
> result <value> 3
> result <value> 4
> result <value> 5
> disconnect b
> result <end>
```

Closes: kefirjs#235 kefirjs#236
@bpinto bpinto linked a pull request Aug 19, 2021 that will close this issue
@bpinto
Contributor

bpinto commented Aug 20, 2021

I've implemented this in #320; please have a look when you have a chance. The console output is slightly different, but I think that's correct, as the intervals inside each stream are different.
