Possible bug: can no longer merge and takeUntil on the same stream #1360

Closed
bgerm opened this issue Feb 18, 2016 · 7 comments

@bgerm

bgerm commented Feb 18, 2016

In RxJS 4 the code below would print 'up' and return the box to 0, 0 upon releasing the mouse from a drag. In RxJS 5 this no longer happens.

var box = document.getElementById('box');

var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
var mouseMove = Rx.Observable.fromEvent(document, 'mousemove');

var mouseDrag = mouseDown.flatMap(function(md){
  var startX = md.offsetX;
  var startY = md.offsetY;

  return mouseMove
    .merge(mouseUp.map(x => 'up')) // <----
    .map(function(mm){
      if (mm === 'up') {
        console.log('up'); // to show it's no longer hit
        return {
          left: 0,
          top: 0
        };
      }
      return {
        left: mm.clientX - startX,
        top: mm.clientY - startY
      };
    }).takeUntil(mouseUp); // <----
});


mouseDrag.subscribe(function(e){
  box.style.top = e.top + 'px';
  box.style.left = e.left + 'px';
});

RxJS 4 jsbin: https://jsbin.com/cigirovohe/1/edit?html,js,console,output
RxJS 5 jsbin: https://jsbin.com/metosonayi/1/edit?html,js,console,output

@trxcllnt
Member

@bgerm this is technically working as designed, though I can understand your confusion. In RxJS 4 takeUntil subscribes to the source Observable before subscribing to the notifier Observable, but takeUntil in RxJS 5 subscribes to the notifier before subscribing to the source.

In your example, takeUntil hears about the mouseup event before merge, so it disposes the inner Observable before merge can next a value through.
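The ordering effect described above can be reproduced without RxJS. The sketch below is a toy model, not the library's implementation: `createSubject`, `mapToy`, `mergeToy`, `takeUntilToy`, and `run` are hypothetical helpers written only for this illustration, and the sketch assumes a hot source that notifies its listeners in subscription order. The `notifierFirst` flag switches between the RxJS 4-style order (source, then notifier) and the RxJS 5-style order (notifier, then source).

```javascript
// Toy model, NOT RxJS: a hot source that calls listeners in subscription order.
function createSubject() {
  const listeners = [];
  return {
    next(value) {
      // Iterate over a copy so unsubscribing during dispatch is safe.
      listeners.slice().forEach((fn) => fn(value));
    },
    subscribe(fn) {
      listeners.push(fn);
      // Return an unsubscribe function.
      return () => {
        const i = listeners.indexOf(fn);
        if (i !== -1) listeners.splice(i, 1);
      };
    },
  };
}

// Hypothetical stand-ins for map and merge.
function mapToy(source, project) {
  return { subscribe: (fn) => source.subscribe((v) => fn(project(v))) };
}

function mergeToy(a, b) {
  return {
    subscribe(fn) {
      const stopA = a.subscribe(fn);
      const stopB = b.subscribe(fn);
      return () => { stopA(); stopB(); };
    },
  };
}

// Hypothetical takeUntil whose subscription order is configurable.
function takeUntilToy(source, notifier, notifierFirst) {
  return {
    subscribe(fn) {
      let done = false;
      let stopSource = () => {};
      let stopNotifier = () => {};
      const stop = () => { done = true; stopSource(); stopNotifier(); };
      const subSource = () => {
        // Drop values that arrive after the notifier has fired.
        stopSource = source.subscribe((v) => { if (!done) fn(v); });
      };
      const subNotifier = () => { stopNotifier = notifier.subscribe(stop); };
      if (notifierFirst) { subNotifier(); subSource(); }
      else { subSource(); subNotifier(); }
      return stop;
    },
  };
}

// Mirrors the shape of the drag example: merge mouseUp into the stream,
// then takeUntil(mouseUp) on the result.
function run(notifierFirst) {
  const mouseMove = createSubject();
  const mouseUp = createSubject();
  const seen = [];
  const merged = mergeToy(mouseMove, mapToy(mouseUp, () => 'up'));
  takeUntilToy(merged, mouseUp, notifierFirst).subscribe((v) => seen.push(v));
  mouseMove.next('move');
  mouseUp.next('mouseup event');
  mouseMove.next('move after release');
  return seen;
}

console.log('source first:', run(false).join(', '));  // source first: move, up
console.log('notifier first:', run(true).join(', ')); // notifier first: move
```

With the source subscribed first, the merged 'up' value is delivered before the notifier tears the stream down. With the notifier subscribed first, the teardown runs before the merged listener sees the same event, so 'up' is never emitted.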

Whether this behavior is desired can be debated. @Blesh @mattpodwysocki @staltz do you guys feel strongly either way?

@mattpodwysocki
Collaborator

@trxcllnt then I would certainly say that's a bug here and not expected behavior for what was perfectly working code before.

@staltz
Member

staltz commented Feb 20, 2016

I believe it makes more sense to subscribe to the source before subscribing to the notifier, but I don't think the example code above is a correct use of RxJS because mouseUp.takeUntil(mouseUp) is a confusing way of achieving a solution.

@benlesh
Member

benlesh commented Feb 21, 2017

This is something we're looking at changing for the next major version.

@bgerm
Author

bgerm commented Feb 21, 2017

@Blesh, thanks for the heads up. @staltz was right that the solution I had above was confusing. Furthermore, I've written a lot of RxJS code since this ticket was opened and haven't needed this pattern again.

Finally, thanks for redux-observable.

@jbmusso

jbmusso commented Jun 4, 2017

@benlesh I'm migrating code from RxJS 4 to RxJS 5 and I just ran into this issue with merge() and takeUntil(). takeUntil() v5 behavior is a bit confusing compared to v4. Use case: I'm building a database client which receives various protocol messages, including partial results and termination messages:

  messageObservable(script, bindings, rawMessage) {
    const command = {
      message: this.buildMessage(script, bindings, rawMessage),
    };

    // This actually sends the command to Gremlin Server
    this.commands$.next(command);

    // Create a new Observable of incoming messages, but filter only
    // incoming messages to the command we just sent.
    const commandMessages$ = this.incomingMessages$
      .filter(({ requestId }) => requestId === command.message.requestId);

    // Off of these messages, create new Observables for each message code
    // TODO: this could be a custom operator.
    const successMessage$ = commandMessages$
      .filter(hasCode(200));
    const continuationMessages$ = commandMessages$
      .filter(hasCode(206));
    const noContentMessage$ = commandMessages$
      .filter(hasCode(204));

    // That Observable will ultimately emit a single object which indicates
    // that we should not expect any other messages;
    const terminationMessages$ = Rx.Observable.merge(
      successMessage$, noContentMessage$
    );
      
    const errorMessages$ = commandMessages$
      .filter(isErrorMessage)
      .flatMap(({ status: { code, message } }) =>
        Rx.Observable.throw(new Error(message + ' (Error '+ code +')'))
      );

    const results$ = Rx.Observable.merge(
        successMessage$,
        continuationMessages$,
        noContentMessage$,
        errorMessages$
      )
      .takeUntil(terminationMessages$);

    return results$;
  }

In the above code, this.commands$ is an Rx.Subject() to which I next() outgoing commands (= database queries).

This was working ok with RxJS 4. Would you suggest a different approach to make that code work with the current behavior (using RxJS v5.4.0 at the moment) of takeUntil(), and if so which one? Thanks for your feedback, happy to discuss this.

@benlesh
Member

benlesh commented May 21, 2018

Closing as stale

@benlesh benlesh closed this as completed May 21, 2018
@lock lock bot locked as resolved and limited conversation to collaborators Jun 20, 2018