Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#235 flatMapLatest adding overlapping option #236

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

32bitkid
Copy link
Contributor

passing an options object of {overlapping:true} will cause the next stream to be added before any old streams are removed.

passing an options object of {overlapping:true} will cause the next stream to be added before any old streams are removed.
this._addToCur(toObs(obj));
while (this._curSources.length > this._concurLim) {
this._removeOldest();
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this loop seemed a little weird when I was working on it, however, I think to have correct parity with the other side of the condition it needs to be here. Given that this._curSources.length - this._concurLim > 1, the other side, implicitly loops due to recursion. Since we want to add first, using _addToCur, then we need to make sure that we aren't over the _concurLim and blindly removing one might be not enough. Although, I'm not sure how an AbstractPool could get into that state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should just remove one. Yes, there is a recursion on the other side, but it should always do only one iteration because:

  1. We always expect this condition to be true: this._curSources.length <= this._concurLim.
  2. When we get to this branch of the code this._curSources.length === this._concurLim should be true.
  3. We remove one from this._curSources, so now this._curSources.length === this._concurLim - 1.
  4. At the following this._add() call, we fall into branch at line 33. And that is the end.

But more importantly, with the condition you added we might actually remove none of the oldest. After this._addToCur(toObs(obj)) we not necessarily have an observable added to _curSources because if toObs(obj) already anded or ends immediately after we subscribe to it, we don't add it to the _curSources (take a look at _addToCur). Btw, we need a test case for this.

Copy link
Member

@rpominov rpominov Feb 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also maybe it make sense to remove recursion on the other side too.

  this._removeOldest();
- this._add(obj, toObs);
+ this._addToCur(toObs(obj));

This way we get the correct parity.

Copy link
Contributor Author

@32bitkid 32bitkid Feb 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure thats quite equivalent; there may not be a public api to aggravate the condition, but I think that when { concurLim: 1, queueLim: 1, drop: 'old' }, and given that both _curSources and _queue are full and I add another one, I would want to:

  1. drop the eldest item out of _curSources
  2. promote the eldest item from _queue
  3. add_ the new_ stream to the _queue

But, I'll need to think about it more. However, I think the same concern is valid for the overlapping side of the branch. If there is a queue, then one should add to _queue and let _removeOldest pull it out of the queue, but that doesn't really resolve the actual use-case for using overlapping to begin with. I think I need to step back and think about it harder.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right! I didn't realize that _removeOldest pulls from the queue. Replacing this._add -> this._addToCur isn't equivalent then indeed. I'll think about it too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't test it, but maybe we could do something like this?

_add(obj, toObs /* Function | falsey */, allowOverflow) {
  toObs = toObs || id;
  if (this._concurLim === -1 || this._curSources.length < this._concurLim) {
    this._addToCur(toObs(obj));
  } else {
    if (this._queueLim === -1 || this._queue.length < this._queueLim || allowOverflow) {
      this._addToQueue(toObs(obj));
    } else if (this._drop === 'old') {
      if (this._overlapping) {
        this._add(obj, toObs, true);
        this._removeOldest();
      } else {
        this._removeOldest();
        this._add(obj, toObs);
      }
    }
  }
}

@rpominov
Copy link
Member

Thank you for the PR! I'll take a look tomorrow. And sorry for the delays, quite busy at work.

} else {
this._removeOldest();
this._add(obj, toObs);
this._addToCur(toObs(obj));
Copy link
Contributor Author

@32bitkid 32bitkid Feb 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while cleaner, this doesn't feel right. see my previous comment, I need to think more about this/this is more subtle than I initially projected.

@mAAdhaTTah
Copy link
Collaborator

This would be a nice-to-have. Anything blocking?

@mAAdhaTTah
Copy link
Collaborator

@32bitkid Would love to land this, if possible. There's been some changes to our test suite in particular (it's in JS now woo!), so the .coffee changes would need to be ported over, and there are a couple other conflicts. Is this something you're still interested in pushing?

@rpominov
Copy link
Member

@mAAdhaTTah There are also some unresolved concerns in folded comments.

@32bitkid
Copy link
Contributor Author

@mAAdhaTTah This is a feature that I'd really like to get in as well, but have been swamped with work lately. As @rpominov mentioned, there are some unresolved issues that still need to be addressed/reworked. I'll try to spend some time this weekend on this.

@bpinto
Copy link
Contributor

bpinto commented Aug 18, 2021

This PR is now 4 years old, I've worked on a patch to be applied on the current codebase, and I wonder if I should open a PR with it. What do you think?

diff --git a/src/index.js b/src/index.js
index daf4001..5eaf803 100644
--- a/src/index.js
+++ b/src/index.js
@@ -331,8 +331,9 @@ import FlatMap from './many-sources/flat-map'
 Observable.prototype.flatMap = function(fn) {
   return new FlatMap(this, fn).setName(this, 'flatMap')
 }
-Observable.prototype.flatMapLatest = function(fn) {
-  return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest')
+import flatMapLatest from './many-sources/flat-map-latest'
+Observable.prototype.flatMapLatest = function(...args) {
+  return flatMapLatest(this, ...args);
 }
 Observable.prototype.flatMapFirst = function(fn) {
   return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst')
diff --git a/src/many-sources/abstract-pool.js b/src/many-sources/abstract-pool.js
index 683314c..19865db 100644
--- a/src/many-sources/abstract-pool.js
+++ b/src/many-sources/abstract-pool.js
@@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co
 
 const id = x => x
 
-function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) {
+function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) {
   Stream.call(this)
 
   this._queueLim = queueLim < 0 ? -1 : queueLim
   this._concurLim = concurLim < 0 ? -1 : concurLim
   this._drop = drop
+  this._overlapping = overlapping
   this._queue = []
   this._curSources = []
   this._$handleSubAny = event => this._handleSubAny(event)
@@ -33,8 +34,13 @@ inherit(AbstractPool, Stream, {
       if (this._queueLim === -1 || this._queue.length < this._queueLim) {
         this._addToQueue(toObs(obj))
       } else if (this._drop === 'old') {
-        this._removeOldest()
-        this._add(obj, toObs)
+        if (this._overlapping) {
+          this._addToCur(obj, toObs)
+          this._removeOldest()
+        } else {
+          this._removeOldest()
+          this._add(obj, toObs)
+        }
       }
     }
   },
diff --git a/src/many-sources/flat-map-latest.js b/src/many-sources/flat-map-latest.js
new file mode 100644
index 0000000..1e4e9af
--- /dev/null
+++ b/src/many-sources/flat-map-latest.js
@@ -0,0 +1,13 @@
+import FlatMap from './flat-map'
+
+function flatMapLatest(source, fn, options = {}) {
+  if (fn && typeof fn !== 'function') {
+    options = fn;
+    fn = undefined;
+  }
+
+  const { overlapping = false } = options;
+  return new FlatMap(source, fn, {concurLim: 1, drop: 'old', overlapping }).setName(source, 'flatMapLatest');
+}
+
+export default flatMapLatest
diff --git a/test/specs/flat-map-latest.js b/test/specs/flat-map-latest.js
index 3893b39..9c2c0e1 100644
--- a/test/specs/flat-map-latest.js
+++ b/test/specs/flat-map-latest.js
@@ -1,3 +1,4 @@
+const sinon = require('sinon')
 const {stream, prop, send, value, error, end, activate, deactivate, Kefir, expect} = require('../test-helpers')
 
 describe('flatMapLatest', () => {
@@ -82,6 +83,59 @@ describe('flatMapLatest', () => {
         })
       ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)]))
     })
+
+    describe('non-overlapping', () => {
+      it('should remove the previous stream before adding the next', () => {
+        onDeactivate = sinon.spy()
+        a = Kefir.stream(() => onDeactivate)
+        b = stream()
+        map = b.flatMapLatest()
+        activate(map)
+        send(b, [value(a)])
+        send(b, [value(a)])
+        deactivate(map)
+        expect(onDeactivate.callCount).to.equal(2)
+      })
+    })
+
+    describe('overlapping', () => {
+      it('should add the next stream before removing the previous', () => {
+        onDeactivate = sinon.spy()
+        a = stream()
+        b = Kefir.stream(() => onDeactivate)
+        map = a.flatMapLatest({ overlapping: true })
+        activate(map)
+        send(a, [value(b)])
+        send(a, [value(b)])
+        deactivate(map)
+        expect(onDeactivate.callCount).to.equal(1)
+      })
+
+      it('should accept optional map fn', () => {
+        onDeactivate = sinon.spy()
+        a = stream()
+        b = Kefir.stream(() => onDeactivate)
+        map = a.flatMapLatest(x => x.obs, { overlapping: true })
+        activate(map)
+        send(a, [value({ obs: b })])
+        send(a, [value({ obs: b })])
+        deactivate(map)
+        expect(onDeactivate.callCount).to.equal(1)
+      })
+
+      it('should work nicely with Kefir.constant and Kefir.never', () => {
+        const a = stream()
+        expect(
+          a.flatMapLatest(x => {
+            if (x > 2) {
+              return Kefir.constant(x)
+            } else {
+              return Kefir.never()
+            }
+          }, { overlapping: true })
+        ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)]))
+      })
+    })
   })
 
   describe('property', () => {

@mAAdhaTTah
Copy link
Collaborator

@bpinto Can't hurt to open a PR!

@bpinto
Copy link
Contributor

bpinto commented Aug 19, 2021

I'll go through the comments on this PR again and see what I can do, I believe I have a better understanding of the feature than I had when I wrote the patch. I'll open a PR then.

bpinto added a commit to bpinto/kefir that referenced this pull request Aug 19, 2021
This will cause the next stream to be added before any old streams are
removed.

```
const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never())
  .log('result');
```

With overlapping option disabled (default):

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect a
> connect a
> connect b
> result <value> [0, 0]
> result <value> [1, 0]
> result <value> [1, 1]
> result <value> [2, 1]
> result <value> [2, 2]
> disconnect a
> disconnect b
> connect b
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect b
> result <end>
```

With overlapping option enabled:

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> connect b
> result <value> [3, 0]
> result <value> [3, 1]
> result <value> [4, 1]
> result <value> [4, 2]
> disconnect a
> result <value> 3
> result <value> 4
> result <value> 5
> disconnect b
> result <end>
```

Closes: kefirjs#235 kefirjs#236
bpinto added a commit to bpinto/kefir that referenced this pull request Aug 19, 2021
This will cause the next stream to be added before any old streams are
removed.

```
const channelA = Kefir.stream((emitter) => {
  console.log('connect a');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect a'); clearInterval(id); };
});

const channelB = Kefir.stream((emitter) => {
  console.log('connect b');
  let count = 0, id = setInterval(() => emitter.value(count++), 250);
  return () => { console.log('disconnect b'); clearInterval(id); };
});

const data = {
  a: channelA,
  b: Kefir.combine([channelA, channelB]),
  c: channelB,
};

Kefir.sequentially(1000, ['a', 'b', 'c', undefined])
  .flatMapLatest(p => p ? data[p] : Kefir.never())
  .log('result');
```

With overlapping option disabled (default):

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect a
> connect a
> connect b
> result <value> [0, 0]
> result <value> [1, 0]
> result <value> [1, 1]
> result <value> [2, 1]
> result <value> [2, 2]
> disconnect a
> disconnect b
> connect b
> result <value> 0
> result <value> 1
> result <value> 2
> disconnect b
> result <end>
```

With overlapping option enabled:

```
> connect a
> result <value> 0
> result <value> 1
> result <value> 2
> connect b
> result <value> [3, 0]
> result <value> [3, 1]
> result <value> [4, 1]
> result <value> [4, 2]
> disconnect a
> result <value> 3
> result <value> 4
> result <value> 5
> disconnect b
> result <end>
```

Closes: kefirjs#235 kefirjs#236
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants