diff --git a/src/client.ts b/src/client.ts index 368cac7e..14d5b29c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -557,11 +557,12 @@ export function createClient(options: ClientOptions): Client { subscribe(payload, sink) { const id = generateID(); - let completed = false, + let done = false, + errored = false, releaser = () => { // for handling completions before connect locks--; - completed = true; + done = true; }; (async () => { @@ -574,8 +575,8 @@ export function createClient(options: ClientOptions): Client { waitForReleaseOrThrowOnClose, ] = await connect(); - // if completed while waiting for connect, release the connection lock right away - if (completed) return release(); + // if done while waiting for connect, release the connection lock right away + if (done) return release(); const unlisten = emitter.onMessage(id, (message) => { switch (message.type) { @@ -585,15 +586,13 @@ export function createClient(options: ClientOptions): Client { return; } case MessageType.Error: { - completed = true; + (errored = true), (done = true); sink.error(message.payload); releaser(); - // TODO-db-201025 calling releaser will complete the sink, meaning that both the `error` and `complete` will be - // called. neither promises or observables care; once they settle, additional calls to the resolvers will be ignored return; } case MessageType.Complete: { - completed = true; + done = true; releaser(); // release completes the sink return; } @@ -609,7 +608,7 @@ export function createClient(options: ClientOptions): Client { ); releaser = () => { - if (!completed && socket.readyState === WebSocketImpl.OPEN) + if (!done && socket.readyState === WebSocketImpl.OPEN) // if not completed already and socket is open, send complete message to server on release socket.send( stringifyMessage({ @@ -618,7 +617,7 @@ export function createClient(options: ClientOptions): Client { }), ); locks--; - completed = true; + done = true; release(); }; @@ -634,11 +633,14 @@ export function createClient(options: ClientOptions): Client { } })() .catch(sink.error) // rejects on close events and errors - .then(sink.complete); // resolves on release or normal closure + .then(() => { + // delivering either an error or a complete terminates the sequence + if (!errored) sink.complete(); + }); // resolves on release or normal closure return () => { - // allow disposing only if not already completed - if (!completed) releaser(); + // dispose only of active subscriptions + if (!done) releaser(); }; }, async dispose() { diff --git a/src/tests/client.ts b/src/tests/client.ts index 3063988b..dd0b5837 100644 --- a/src/tests/client.ts +++ b/src/tests/client.ts @@ -329,6 +329,29 @@ it('should not send the complete message if the socket is not open', async () => await sub.waitForComplete(); }); +it('should not call complete after subscription error', async () => { + const { url } = await startTServer(); + + const client = createClient({ + url, + lazy: true, + retryAttempts: 0, + }); + + // invalid subscription + const sub = tsubscribe(client, { + query: '{ iDontExist }', + }); + + // report error + await sub.waitForError(); + + // but not complete + await sub.waitForComplete(() => { + fail("shouldn't have completed"); + }, 20); +}); + describe('query operation', () => { it('should execute the query, "next" the result and then complete', async () => { const { url } = await startTServer();