Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add overlapping option to flatMap #320

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

bpinto
Copy link
Contributor

@bpinto bpinto commented Aug 19, 2021

This will cause the next stream to be added before any old streams are removed.

const overlapping = true // or false

const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never(), { overlapping })
  .log('result');

With overlapping option disabled (default):

connect a
result <value> 0
result <value> 1
result <value> 2
disconnect a
connect a
connect b
result <value> [ 0, 0 ]
result <value> [ 1, 0 ]
result <value> [ 1, 1 ]
result <value> [ 2, 1 ]
result <value> [ 2, 2 ]
disconnect a
disconnect b
connect b
result <value> 0
result <value> 1
result <value> 2
disconnect b
result <end>

With overlapping option enabled:

connect a
result <value> 0
result <value> 1
result <value> 2
connect b
result <value> [ 3, 0 ]
result <value> [ 4, 0 ]
result <value> [ 4, 1 ]
result <value> [ 5, 1 ]
result <value> [ 5, 2 ]
result <value> [ 6, 2 ]
disconnect a
result <value> 3
result <value> 4
result <value> 5
result <value> 6
disconnect b
result <end>

Closes: #235 #236

This will cause the next stream to be added before any old streams are
removed.

```
const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never())
  .log('result');
```

With overlapping option disabled (default):

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect a
> connect a
> connect b
> result <value> [0, 0]
> result <value> [1, 0]
> result <value> [1, 1]
> result <value> [2, 1]
> result <value> [2, 2]
> disconnect a
> disconnect b
> connect b
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect b
> result <end>
```

With overlapping option enabled:

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> connect b
> result <value> [3, 0]
> result <value> [3, 1]
> result <value> [4, 1]
> result <value> [4, 2]
> disconnect a
> result <value> 3
> result <value> 4
> result <value> 5
> disconnect b
> result <end>
```

Closes: kefirjs#235 kefirjs#236
@bpinto
Copy link
Contributor Author

bpinto commented Aug 19, 2021

This is my first time diving into kefir codebase so I most likely did something silly while trying to implement this feature. In any case, please let me know what you think!

Observable.prototype.flatMapLatest = function(fn) {
return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest')
Observable.prototype.flatMapLatest = function(fn, options = {}) {
options.concurLim = 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If only we had object rest spread... 😬


describe('overlapping with a concurrency limit that has maxed out', () => {
describe('and with a queue limit', () => {
it('not maxed out, should add to the queue', () => {
Copy link
Contributor Author

@bpinto bpinto Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these "should add to the queue" tests are kinda silly because I can't really confirm that the queue has changed... not sure how to improve these tests.

@@ -148,8 +154,8 @@ inherit(AbstractPool, Stream, {
return index
},

_removeCur(obs) {
if (this._active) {
_removeCur(obs, after) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like passing a boolean, not sure about performance impact, but wouldn't it be better to have two methods?

_removeCur and _removeCurWithDelayedUnsubscription

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

flatMapLatest – add next observable before removing previous
1 participant