Skip to content

Commit

Permalink
Merge pull request #221 from wormhole-foundation/missed-vaa-failed-to…
Browse files Browse the repository at this point in the history
…-recover-issue

Missed failed to recover processing improvements
  • Loading branch information
iturricf authored Oct 26, 2023
2 parents e3c5100 + d542922 commit 63069da
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
4 changes: 1 addition & 3 deletions examples/simple/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@
"allowSyntheticDefaultImports": true
},
"include": ["src"],
"exclude": [
"node_modules",
]
"exclude": ["node_modules"]
}
23 changes: 15 additions & 8 deletions relayer/middleware/missedVaasV3/check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,22 @@ export async function checkForMissedVaas(
let missingSequences: bigint[] = [];

if (seenSequences.length) {
const first = seenSequences[0];
const first =
previousSafeSequence !== null && previousSafeSequence < seenSequences[0]
? previousSafeSequence
: seenSequences[0];
const last = seenSequences[seenSequences.length - 1];
logger?.info(
`Scanning sequences from ${first} to ${last} for missing sequences`,
);
// Check if there is any leap between the sequences seen,
// and try reprocessing them if any:
if (
previousSafeSequence !== null &&
previousSafeSequence < seenSequences[0]
) {
seenSequences.unshift(previousSafeSequence);
}
missingSequences = scanForSequenceLeaps(seenSequences);

await mapConcurrent(
Expand Down Expand Up @@ -171,15 +180,13 @@ export async function checkForMissedVaas(
failedToRecover,
);

const allSeenVaas = processed.concat(failedToRecover);

if (allSeenVaas.length)
if (processed.length)
await batchMarkAsSeen(
redis,
prefix,
emitterChain,
emitterAddress,
allSeenVaas,
processed,
);

return {
Expand Down Expand Up @@ -311,11 +318,11 @@ async function lookAhead(
}`,
);

let lastVisitedSequence: bigint = lastSeenSequence;
let lastVisitedSequence = BigInt(lastSeenSequence);

for (const vaa of vaas) {
lookAheadSequences.push(vaa.sequence.toString());
const sequenceGap = BigInt(vaa.sequence) - BigInt(lastVisitedSequence);
const sequenceGap = BigInt(vaa.sequence) - lastVisitedSequence;
if (sequenceGap > 0) {
const missing = Array.from({ length: Number(sequenceGap - 1n) }, (_, i) =>
(lastVisitedSequence + BigInt(i + 1)).toString(),
Expand All @@ -335,7 +342,7 @@ async function lookAhead(
);
}

lastVisitedSequence = vaa.sequence;
lastVisitedSequence = BigInt(vaa.sequence);
}

return { lookAheadSequences, processed, failedToRecover };
Expand Down
31 changes: 26 additions & 5 deletions relayer/middleware/missedVaasV3/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,18 @@ export function batchMarkAsSeen(
emitterAddress: string,
sequences: string[],
) {
return batchAddToSet(
redis,
getSeenVaaKey(prefix, emitterChain, emitterAddress),
sequences,
);
return Promise.all([
batchAddToSet(
redis,
getSeenVaaKey(prefix, emitterChain, emitterAddress),
sequences,
),
batchRemoveFromSet(
redis,
getFailedToFetchKey(prefix, emitterChain, emitterAddress),
sequences,
),
]);
}

export function batchMarkAsFailedToRecover(
Expand Down Expand Up @@ -331,3 +338,17 @@ async function batchAddToSet(

await pipeline.exec();
}

async function batchRemoveFromSet(
redis: Cluster | Redis,
key: string,
sequences: string[],
) {
const pipeline = redis.pipeline();

for (const sequence of sequences) {
pipeline.zrem(key, sequence);
}

return pipeline.exec();
}
4 changes: 2 additions & 2 deletions test/middleware/missedVaasV3/check.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ describe("MissedVaaV3.check", () => {

const markedSeen = batchMarkAsSeenMock.mock.calls[0][4];

// VAAs marked as failed to recover are marked as seen
expect(markedSeen.length).toEqual(2);
// VAAs marked as failed to recover are NOT marked as seen
expect(markedSeen.length).toEqual(1);
});

test("If a sequence fails to be reprocessed it won't be marked as seen and won't interrupt the processing of other seqs", async () => {
Expand Down

0 comments on commit 63069da

Please sign in to comment.