Skip to content

Commit

Permalink
fix(solo): make solo-to-chain more robust
Browse files Browse the repository at this point in the history
The solo's messagePool is now a proper store-and-forward
implementation to prevent failures in delivery from impacting
the message stream.

Also, wake the sender if there is a new WebSocket to an RPC node.
This prevents the stream from being wedged if the RPC connection
changes.
  • Loading branch information
michaelfig committed Aug 22, 2021
1 parent b43df5a commit b266666
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions packages/solo/src/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export async function connectToChain(
fullArgs,
{ maxBuffer: MAX_BUFFER_SIZE },
(_error, stdout, stderr) => {
return resolve({ stdout, stderr });
resolve({ stdout, stderr });
},
);
if (stdin) {
Expand Down Expand Up @@ -332,6 +332,10 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
};
// Send that message, and wait for the subscription.
ws.send(JSON.stringify(obj));

// Ensure our sender wakes up again.
// eslint-disable-next-line no-use-before-define
sendUpdater.updateState(true);
});
ws.addEventListener('message', ev => {
// We received a message.
Expand Down Expand Up @@ -359,7 +363,11 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(

const { notifier: sendNotifier, updater: sendUpdater } = makeNotifierKit();

// An array of [seqnum, message] ordered by unique seqnum.
/**
* @typedef {bigint} SeqNum
* @type {Array<[SeqNum, any]>}
* Ordered by seqnum
*/
let messagePool = [];

// Atomically add to the message pool, ensuring the pool is sorted by unique
Expand All @@ -384,15 +392,18 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
messagePool = uniquePool;
};

const removeAckedFromMessagePool = ack => {
// Remove all messages sent at earlier acks.
messagePool = messagePool.filter(m => m[0] > ack);
};

let totalDeliveries = 0;
let highestAck = -1;
let sequenceNumber = 0n;
const sendFromMessagePool = async () => {
let tmpInfo;

// We atomically drain the message pool.
const messages = messagePool;
messagePool = [];

try {
totalDeliveries += 1;
Expand Down Expand Up @@ -478,15 +489,12 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
X`Unexpected output: ${stdout.trimRight()}`,
);

// We submitted the transaction successfully.
// We submitted the transaction to the mempool successfully.
// Preemptively increment our sequence number to avoid needing to
// retry next time.
sequenceNumber += 1n;
}
}
} catch (e) {
// Put back the deliveries we tried to make.
messagePool = messages.concat(messagePool);
messagePool.sort((a, b) => a[0] - b[0]);
throw e;
} finally {
if (tmpInfo) {
await fs.promises.unlink(tmpInfo.path);
Expand All @@ -503,11 +511,9 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
* @param {number=} lastBlockUpdate
*/
const recurseEachNewBlock = async (lastBlockUpdate = undefined) => {
const { updateCount, value } = await blockNotifier.getUpdateSince(
lastBlockUpdate,
);
assert(value, X`${GCI} unexpectedly finished!`);
const { updateCount } = await blockNotifier.getUpdateSince(lastBlockUpdate);
console.debug(`new block on ${GCI}, fetching mailbox`);
assert(updateCount, X`${GCI} unexpectedly finished!`);
await getMailbox()
.then(ret => {
if (!ret) {
Expand All @@ -516,6 +522,7 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
const { outbox, ack } = ret;
// console.debug('have outbox', outbox, ack);
inbound(GCI, outbox, ack);
removeAckedFromMessagePool(ack);
})
.catch(e => console.error(`Failed to fetch ${GCI} mailbox:`, e));
recurseEachNewBlock(updateCount);
Expand Down Expand Up @@ -543,10 +550,8 @@ ${chainID} chain does not yet know of address ${clientAddr}${adviseEgress(
// This function ensures we only have one outgoing send operation at a time.
const recurseEachSend = async (lastSendUpdate = undefined) => {
// See when there is another requested send since our last time.
const { updateCount, value } = await sendNotifier.getUpdateSince(
lastSendUpdate,
);
assert(value, X`Sending unexpectedly finished!`);
const { updateCount } = await sendNotifier.getUpdateSince(lastSendUpdate);
assert(updateCount, X`Sending unexpectedly finished!`);

await sendFromMessagePool().catch(retrySend);
recurseEachSend(updateCount);
Expand Down

0 comments on commit b266666

Please sign in to comment.