Skip to content

Commit

Permalink
ws multiplexing is disabled by default as we investigate issues with …
Browse files Browse the repository at this point in the history
…intermittent web socket closures (#11657)

CHANGELOG_BEGIN
[TS-Bindings] Ws multiplexing for stream queries is disabled by default as we investigate issues
of intermittent websocket closures.
CHANGELOG_END
  • Loading branch information
akshayshirahatti-da authored Nov 11, 2021
1 parent 0f0a865 commit d938a44
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ test('create + fetch & exercise', async () => {
const personRawStream = aliceLedger.streamQuery(buildAndLint.Main.Person);
const personStream = promisifyStream(personRawStream);
const personStreamLive = pEvent(personRawStream, 'live');
expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract, matchedQueries:[1]}]]);
expect(await personStream.next()).toEqual([[alice6Contract], [{created: alice6Contract, matchedQueries:[0]}]]);

// end of non-live data, first offset
expect(await personStreamLive).toEqual([alice6Contract]);
Expand All @@ -226,7 +226,7 @@ test('create + fetch & exercise', async () => {
const bob4Contract = await bobLedger.create(buildAndLint.Main.Person, bob4);
expect(bob4Contract.payload).toEqual(bob4);
expect(bob4Contract.key).toEqual(bob4Key);
expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract, matchedQueries:[1]}]]);
expect(await personStream.next()).toEqual([[alice6Contract, bob4Contract], [{created: bob4Contract, matchedQueries:[0]}]]);


// Alice changes her name.
Expand All @@ -243,7 +243,7 @@ test('create + fetch & exercise', async () => {
expect(cooper6Contract.key).toEqual(alice6Key);
expect(await aliceStream.next()).toEqual([[cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[0]}]]);
expect(await alice6KeyStream.next()).toEqual([cooper6Contract, [{archived: alice6Archived}, {created: cooper6Contract}]]);
expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[1]}]]);
expect(await personStream.next()).toEqual([[bob4Contract, cooper6Contract], [{archived: alice6Archived}, {created: cooper6Contract, matchedQueries:[0]}]]);

personContracts = await aliceLedger.query(buildAndLint.Main.Person);
expect(personContracts).toEqual([bob4Contract, cooper6Contract]);
Expand Down
30 changes: 28 additions & 2 deletions language-support/ts/daml-ledger/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ describe("streamSubmit", () => {
expect(mockChange).not.toHaveBeenCalled();
});

test("reconnect on server close with appropriate offsets", async () => {
test("reconnect on server close with appropriate offsets, when ws multiplexing is enabled", async () => {
const reconnectThreshold = 200;
const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold});
const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold, multiplexQueryStreams: true});
const stream = ledger.streamQuery(Foo, {"key": "1"});
stream.on("live", mockLive);
stream.on("close", mockClose);
Expand All @@ -223,6 +223,32 @@ describe("streamSubmit", () => {
expect(mockConstructor).not.toHaveBeenCalled();
});


test("reconnect on server close with appropriate offsets, when ws multiplexing is disabled", async () => {
const reconnectThreshold = 200;
const ledger = new Ledger({...mockOptions, reconnectThreshold: reconnectThreshold, multiplexQueryStreams: false});
const stream = ledger.streamQuery(Foo);
stream.on("live", mockLive);
stream.on("close", mockClose);
mockInstance.serverOpen();
mockInstance.serverSend({events: [], offset: "3"});
await new Promise(resolve => setTimeout(resolve, reconnectThreshold * 2));
mockConstructor.mockClear();
mockInstance.serverClose({code: 1, reason: 'test close'});
expect(mockConstructor).toHaveBeenCalled();
mockInstance.serverOpen();
expect(mockSend).toHaveBeenNthCalledWith(1, [{"templateIds": ["foo-id"]}]); //initial query
expect(mockSend).toHaveBeenNthCalledWith(2, {offset: "3"}); // offsets sent when reconnecting.
expect(mockSend).toHaveBeenNthCalledWith(3, [{"templateIds": ["foo-id"]}]); //reconnect query request.
mockSend.mockClear();
mockConstructor.mockClear();

// check that the client doesn't try to reconnect again. it should only reconnect if it
// received an event confirming the stream is live again, i.e. {events: [], offset: '4'}
mockInstance.serverClose({code: 1, reason: 'test close'});
expect(mockConstructor).not.toHaveBeenCalled();
});

test("do not reconnect on client close", () => {
const ledger = new Ledger(mockOptions);
const stream = ledger.streamQuery(Foo);
Expand Down
10 changes: 5 additions & 5 deletions language-support/ts/daml-ledger/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ class Ledger {
/**
* Construct a new `Ledger` object. See [[LedgerOptions]] for the constructor arguments.
*/
constructor({token, httpBaseUrl, wsBaseUrl, reconnectThreshold = 30000, multiplexQueryStreams = true}: LedgerOptions) {
constructor({token, httpBaseUrl, wsBaseUrl, reconnectThreshold = 30000, multiplexQueryStreams = false}: LedgerOptions) {
if (!httpBaseUrl) {
httpBaseUrl = `${window.location.protocol}//${window.location.host}/`;
}
Expand Down Expand Up @@ -1054,11 +1054,11 @@ class Ledger {
}
}
} else if (isRecordWith('warnings', json)) {
console.warn(`Ledger.${callerName} warnings`, json);
console.warn(`${callerName} warnings`, json);
} else if (isRecordWith('errors', json)) {
console.error(`Ledger.${callerName} errors`, json);
console.error(`${callerName} errors`, json);
} else {
console.error(`Ledger.${callerName} unknown message`, json);
console.error(`${callerName} unknown message`, json);
}
};
const closeStream = (status: { code: number; reason: string }): void => {
Expand Down Expand Up @@ -1242,7 +1242,7 @@ class Ledger {
lastContractId = contract ? contract.contractId : null
return contract;
}
return this.streamSubmit("streamFetchByKey", template, 'v1/stream/fetch', request, reconnectRequest, null, change);
return this.streamSubmit("Ledger.streamFetchByKey", template, 'v1/stream/fetch', request, reconnectRequest, null, change);
}

/**
Expand Down

0 comments on commit d938a44

Please sign in to comment.