From b2666665b1881d5f98fa853ba05627d945783c7c Mon Sep 17 00:00:00 2001 From: Michael FIG Date: Sun, 22 Aug 2021 17:02:39 -0600 Subject: [PATCH] fix(solo): make solo-to-chain more robust The solo's messagePool is now a proper store-and-forward implementation to prevent failures in delivery from impacting the message stream. Also, wake the sender if there is a new WebSocket to an RPC node. This prevents the stream from being wedged if the RPC connection changes. --- packages/solo/src/chain-cosmos-sdk.js | 41 +++++++++++++++------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/packages/solo/src/chain-cosmos-sdk.js b/packages/solo/src/chain-cosmos-sdk.js index 0aedfafb361..f243ddd54a1 100644 --- a/packages/solo/src/chain-cosmos-sdk.js +++ b/packages/solo/src/chain-cosmos-sdk.js @@ -189,7 +189,7 @@ export async function connectToChain( fullArgs, { maxBuffer: MAX_BUFFER_SIZE }, (_error, stdout, stderr) => { - return resolve({ stdout, stderr }); + resolve({ stdout, stderr }); }, ); if (stdin) { @@ -332,6 +332,10 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( }; // Send that message, and wait for the subscription. ws.send(JSON.stringify(obj)); + + // Ensure our sender wakes up again. + // eslint-disable-next-line no-use-before-define + sendUpdater.updateState(true); }); ws.addEventListener('message', ev => { // We received a message. @@ -359,7 +363,11 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( const { notifier: sendNotifier, updater: sendUpdater } = makeNotifierKit(); - // An array of [seqnum, message] ordered by unique seqnum. + /** + * @typedef {bigint} SeqNum + * @type {Array<[SeqNum, any]>} + * Ordered by seqnum + */ let messagePool = []; // Atomically add to the message pool, ensuring the pool is sorted by unique @@ -384,15 +392,18 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( messagePool = uniquePool; }; + const removeAckedFromMessagePool = ack => { + // Remove all messages sent at earlier acks. + messagePool = messagePool.filter(m => m[0] > ack); + }; + let totalDeliveries = 0; let highestAck = -1; let sequenceNumber = 0n; const sendFromMessagePool = async () => { let tmpInfo; - // We atomically drain the message pool. const messages = messagePool; - messagePool = []; try { totalDeliveries += 1; @@ -478,15 +489,12 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( X`Unexpected output: ${stdout.trimRight()}`, ); - // We submitted the transaction successfully. + // We submitted the transaction to the mempool successfully. + // Preemptively increment our sequence number to avoid needing to + // retry next time. sequenceNumber += 1n; } } - } catch (e) { - // Put back the deliveries we tried to make. - messagePool = messages.concat(messagePool); - messagePool.sort((a, b) => a[0] - b[0]); - throw e; } finally { if (tmpInfo) { await fs.promises.unlink(tmpInfo.path); @@ -503,11 +511,9 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( * @param {number=} lastBlockUpdate */ const recurseEachNewBlock = async (lastBlockUpdate = undefined) => { - const { updateCount, value } = await blockNotifier.getUpdateSince( - lastBlockUpdate, - ); - assert(value, X`${GCI} unexpectedly finished!`); + const { updateCount } = await blockNotifier.getUpdateSince(lastBlockUpdate); console.debug(`new block on ${GCI}, fetching mailbox`); + assert(updateCount, X`${GCI} unexpectedly finished!`); await getMailbox() .then(ret => { if (!ret) { @@ -516,6 +522,7 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( const { outbox, ack } = ret; // console.debug('have outbox', outbox, ack); inbound(GCI, outbox, ack); + removeAckedFromMessagePool(ack); }) .catch(e => console.error(`Failed to fetch ${GCI} mailbox:`, e)); recurseEachNewBlock(updateCount); @@ -543,10 +550,8 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress( // This function ensures we only have one outgoing send operation at a time. const recurseEachSend = async (lastSendUpdate = undefined) => { // See when there is another requested send since our last time. - const { updateCount, value } = await sendNotifier.getUpdateSince( - lastSendUpdate, - ); - assert(value, X`Sending unexpectedly finished!`); + const { updateCount } = await sendNotifier.getUpdateSince(lastSendUpdate); + assert(updateCount, X`Sending unexpectedly finished!`); await sendFromMessagePool().catch(retrySend); recurseEachSend(updateCount);