Skip to content

Commit

Permalink
fix: post the request in request-receive communication when subscribi…
Browse files Browse the repository at this point in the history
…ng to the observable

The request was immediately posted when invoking 'requestReceive$' method, which is wrong. Instead, it is now posted upon subscription.
It is unlike the 'requestReply' method, which starts resolving the promise immediately.

fixes: #160
  • Loading branch information
danielwiehl authored and ReToCode committed Jul 26, 2019
1 parent ab57d4b commit f8a7f8c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { map } from 'rxjs/operators';
* Allows issuing an intent to interact with the platform.
*/
export class IntentService implements Service {

/**
* Issues an intent to the application platform and receives a series of replies.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
* SPDX-License-Identifier: EPL-2.0
*/

import { fromEvent, Observable, Subject } from 'rxjs';
import { fromEvent, merge, Observable, Observer, Subject, TeardownLogic } from 'rxjs';
import { filter, first, takeUntil } from 'rxjs/operators';
import { UUID } from './uuid.util';
import { Service } from './metadata';
Expand Down Expand Up @@ -102,14 +102,22 @@ export class DefaultMessageBus implements MessageBus {
envelope.replyToUid = replyToUid;
envelope.protocol = PROTOCOL;

const reply$ = this._stream$
.pipe(
filter(env => env.channel === 'reply'),
filter(env => env.replyToUid === replyToUid),
takeUntil(this._destroy$),
);
this.postMessage(envelope);
return reply$;
return new Observable((observer: Observer<MessageEnvelope>): TeardownLogic => {
const destroy$ = new Subject<void>();
this._stream$
.pipe(
filter(env => env.channel === 'reply'),
filter(env => env.replyToUid === replyToUid),
takeUntil(merge(destroy$, this._destroy$)),
)
.subscribe(observer);

this.postMessage(envelope);

return (): void => {
destroy$.next();
};
});
}

public requestReply(envelope: MessageEnvelope): Promise<MessageEnvelope> {
Expand Down

0 comments on commit f8a7f8c

Please sign in to comment.