Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into change-sourcetx-client
Browse files Browse the repository at this point in the history
  • Loading branch information
mat1asm committed Oct 23, 2023
2 parents 3696a9e + af0e27d commit 4161781
Show file tree
Hide file tree
Showing 17 changed files with 355 additions and 168 deletions.
5 changes: 2 additions & 3 deletions examples/advanced/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@wormhole-foundation/relayer-engine",
"version": "0.3.1",
"version": "0.3.2",
"description": "Relayer Engine",
"type": "module",
"main": "lib/cjs/index.js",
Expand All @@ -10,7 +10,7 @@
"test-redis": "docker run --rm -p 6301:6379 --name relayer-engine-test -d redis; npm run test; docker kill relayer-engine-test",
"test": "jest --silent=false",
"test-watch": "jest --silent=false --watch",
"build": "tsc -b ./tsconfig.cjs.json ./tsconfig.esm.json && bin/create-package.json.sh",
"build": "tsc -b ./tsconfig.cjs.json && tsc -b ./tsconfig.esm.json && bin/create-package.json.sh",
"watch": "tsc --watch",
"typecheck": "tsc --noEmit --skipLibCheck",
"prettier": "prettier --write $(git diff main --name-only --diff-filter u | grep '.ts$' | xargs)",
Expand Down
5 changes: 4 additions & 1 deletion relayer/application-standard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { createBullBoard } from "@bull-board/api";
import { Environment } from "./environment.js";
import { TokensByChain } from "./middleware/wallet/wallet-management.js";
import { Registry } from "prom-client";
import Koa from "koa";

export interface StandardMissedVaaOpts {
concurrency?: number;
Expand Down Expand Up @@ -192,7 +193,9 @@ export class StandardRelayerApp<
* A UI that you can mount in a KOA app to show the status of the queue / jobs.
* @param path
*/
storageKoaUI(path: string) {
storageKoaUI(
path: string,
): Koa.Middleware<Koa.DefaultState, Koa.DefaultContext, any> {
// UI
const serverAdapter = new KoaAdapter();
serverAdapter.setBasePath(path);
Expand Down
6 changes: 6 additions & 0 deletions relayer/application.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ export const defaultWormholeRpcs = {
[Environment.DEVNET]: [""],
};

export const defaultWormscanUrl = {
[Environment.MAINNET]: "https://api.wormholescan.io",
[Environment.TESTNET]: "https://api.testnet.wormholescan.io",
[Environment.DEVNET]: "https://api.testnet.wormholescan.io",
};

const defaultOpts = (env: Environment): RelayerAppOpts => ({
wormholeRpcs: defaultWormholeRpcs[env],
concurrency: 1,
Expand Down
104 changes: 48 additions & 56 deletions relayer/middleware/missedVaasV3/check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Cluster, Redis } from "ioredis";
import { ChainId } from "@certusone/wormhole-sdk";
import { Logger } from "winston";
import { Pool } from "generic-pool";
import { GetSignedVAAResponse } from "@certusone/wormhole-spydk/lib/cjs/proto/publicrpc/v1/publicrpc.js";
import {
ParsedVaaWithBytes,
RelayerApp,
Expand All @@ -21,6 +20,7 @@ import {
updateSeenSequences,
} from "./storage.js";
import { FilterIdentifier, MissedVaaOpts } from "./worker.js";
import { Wormholescan } from "../../rpc/wormholescan-client.js";

export type ProcessVaaFn = (x: Buffer) => Promise<void>;

Expand All @@ -30,6 +30,7 @@ export async function checkForMissedVaas(
processVaa: ProcessVaaFn,
opts: MissedVaaOpts,
prefix: string,
wormholescan: Wormholescan,
previousSafeSequence?: bigint | null,
logger?: Logger,
): Promise<MissedVaaRunStats> {
Expand Down Expand Up @@ -134,7 +135,7 @@ export async function checkForMissedVaas(
// look ahead of greatest seen sequence in case the next vaa was missed
// continue looking ahead until a vaa can't be fetched
const lastSeq = seenSequences[seenSequences.length - 1]
? seenSequences[seenSequences.length - 1] + 1n
? seenSequences[seenSequences.length - 1]
: null;

let lookAheadSequence =
Expand All @@ -151,7 +152,7 @@ export async function checkForMissedVaas(
} = await lookAhead(
lookAheadSequence,
filter,
opts.wormholeRpcs,
wormholescan,
opts.fetchVaaRetries,
opts.maxLookAhead,
processVaa,
Expand Down Expand Up @@ -258,18 +259,18 @@ export async function registerEventListeners(
}

async function lookAhead(
lookAheadSequence: bigint,
lastSeenSequence: bigint,
filter: FilterIdentifier,
wormholeRpcs: string[],
wormholescan: Wormholescan,
maxRetries: number,
maxLookAhead: number = 10,
processVaa: ProcessVaaFn,
logger?: Logger,
) {
const lookAheadSequences: string[] = [];
const processed: string[] = [];
const failedToRecover: string[] = [];
if (!lookAheadSequence) {
let failedToRecover: string[] = [];
if (!lastSeenSequence) {
logger?.warn(
`No VAAs seen and no starting sequence was configured. Won't look ahead for missed VAAs.`,
);
Expand All @@ -278,72 +279,63 @@ async function lookAhead(
}

logger?.info(
`Looking ahead for missed VAAs from sequence: ${lookAheadSequence}`,
`Looking ahead for missed VAAs from sequence: ${lastSeenSequence}`,
);

let vaasNotFound: string[] = [];
let latestVaas = await wormholescan.listVaas(
filter.emitterChain,
filter.emitterAddress,
{ pageSize: maxLookAhead, retries: maxRetries },
);
if (latestVaas.error) {
logger?.error(
`Error FETCHING Look Ahead VAAs. Error: ${latestVaas.error.message}`,
latestVaas.error,
);
throw latestVaas.error;
}

for (let seq = lookAheadSequence; true; seq++) {
const vaaKey = {
...filter,
sequence: seq.toString(),
} as SerializableVaaId;
// latestVaas.data is sorted DESC based on timestamp, so we sort ASC by sequence
const vaas = latestVaas.data
.filter(vaa => vaa.sequence > lastSeenSequence)
.sort((a, b) => Number(a.sequence - b.sequence));

let vaa: GetSignedVAAResponse | null = null;
try {
vaa = await tryFetchVaa(vaaKey, wormholeRpcs, maxRetries);
// reset failure counter if we successfully fetched a vaa
if (vaa) {
if (vaasNotFound.length > 0) {
logger?.warn(
`Look Ahead existing VAAs not found in the guardian: [${vaasNotFound.join(
", ",
)}]`,
);
failedToRecover.push(...vaasNotFound);
}
vaasNotFound = [];
}
} catch (error) {
let message = "unknown";
if (error instanceof Error) {
message = error.message;
}
logger?.error(
`Error FETCHING Look Ahead VAA. Sequence ${seq}. Error: ${message} `,
error,
);
throw error;
}
if (vaas.length === 0) {
logger?.debug(`No Look Ahead VAAs found.`);
return { lookAheadSequences, processed, failedToRecover };
}

if (!vaa && vaasNotFound.length < maxLookAhead) {
logger?.debug(`Look Ahead VAA not found. Sequence: ${seq.toString()}`);
vaasNotFound.push(seq.toString());
continue;
}
logger?.debug(
`Found ${vaas.length} Look Ahead VAAs. From ${vaas[0].sequence} to ${
vaas[vaas.length - 1].sequence
}`,
);

let lastVisitedSequence: bigint = lastSeenSequence;

if (!vaa && vaasNotFound.length >= maxLookAhead) {
logger?.debug(
`Look Ahead VAA reached max look ahead. Sequence: ${seq.toString()}`,
for (const vaa of vaas) {
lookAheadSequences.push(vaa.sequence.toString());
const sequenceGap = BigInt(vaa.sequence) - BigInt(lastVisitedSequence);
if (sequenceGap > 0) {
const missing = Array.from({ length: Number(sequenceGap - 1n) }, (_, i) =>
(lastVisitedSequence + BigInt(i + 1)).toString(),
);
break;
failedToRecover = failedToRecover.concat(missing);
}

lookAheadSequences.push(seq.toString());

logger?.debug(`Found Look Ahead VAA. Sequence: ${seq.toString()}`);

try {
// since we add this VAA to the queue, there's no need to mark it as seen
// (it will be automatically marked as seen when the "added" event is fired)
await processVaa(Buffer.from(vaa.vaaBytes));
processed.push(seq.toString());
await processVaa(vaa.vaa);
processed.push(vaa.sequence.toString());
} catch (error) {
logger?.error(
`Error PROCESSING Look Ahead VAA. Sequence: ${seq.toString()}. Error:`,
`Error PROCESSING Look Ahead VAA. Sequence: ${vaa.sequence.toString()}. Error:`,
error,
);
}

lastVisitedSequence = vaa.sequence;
}

return { lookAheadSequences, processed, failedToRecover };
Expand Down
19 changes: 18 additions & 1 deletion relayer/middleware/missedVaasV3/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ import { ChainId } from "@certusone/wormhole-sdk";
import { createPool, Pool } from "generic-pool";
import { Logger } from "winston";

import { defaultWormholeRpcs, RelayerApp } from "../../application.js";
import {
defaultWormholeRpcs,
defaultWormscanUrl,
RelayerApp,
} from "../../application.js";
import { mapConcurrent, sleep } from "../../utils.js";
import { RedisConnectionOpts } from "../../storage/redis-storage.js";
import { initMetrics, MissedVaaMetrics } from "./metrics.js";
Expand All @@ -25,13 +29,18 @@ import {
tryGetLastSafeSequence,
trySetLastSafeSequence,
} from "./storage.js";
import {
Wormholescan,
WormholescanClient,
} from "../../rpc/wormholescan-client.js";

const DEFAULT_PREFIX = "MissedVaaWorkerV3";

export interface MissedVaaOpts extends RedisConnectionOpts {
registry?: Registry;
logger?: Logger;
wormholeRpcs?: string[];
wormscanUrl?: string;
// How many "source" chains will be scanned for missed VAAs concurrently.
concurrency?: number;
// Interval at which the worker will check for missed VAAs.
Expand Down Expand Up @@ -74,6 +83,7 @@ export async function spawnMissedVaaWorker(
opts: MissedVaaOpts,
): Promise<void> {
opts.wormholeRpcs = opts.wormholeRpcs ?? defaultWormholeRpcs[app.env];
opts.wormscanUrl = opts.wormscanUrl ?? defaultWormscanUrl[app.env];
if (!metrics) {
metrics = opts.registry ? initMetrics(opts.registry) : {};
}
Expand All @@ -83,6 +93,10 @@ export async function spawnMissedVaaWorker(
}

const redisPool = createRedisPool(opts);
const wormholescan = new WormholescanClient(new URL(opts.wormscanUrl), {
maxDelay: 60_000,
noCache: true,
});

if (!app.filters.length) {
opts.logger?.warn(
Expand Down Expand Up @@ -127,6 +141,7 @@ export async function spawnMissedVaaWorker(
app.processVaa.bind(app),
opts,
prefix,
wormholescan,
filterLogger,
);
updateMetrics(
Expand Down Expand Up @@ -160,6 +175,7 @@ export async function runMissedVaaCheck(
processVaa: ProcessVaaFn,
opts: MissedVaaOpts,
storagePrefix: string,
wormholescan: Wormholescan,
logger?: Logger,
) {
const { emitterChain, emitterAddress } = filter;
Expand All @@ -178,6 +194,7 @@ export async function runMissedVaaCheck(
processVaa,
opts,
storagePrefix,
wormholescan,
previousSafeSequence,
logger,
);
Expand Down
17 changes: 10 additions & 7 deletions relayer/rpc/fail-fast-grpc-transport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { grpc } from "@improbable-eng/grpc-web";
import * as pkg from "@improbable-eng/grpc-web";
import * as http from "http";
import * as https from "https";

const grpc = pkg.grpc ?? pkg;

/**
* Transport factory for grpc-web that applies a timeout.
* Also allows for some more customization of the underlying http request.
Expand All @@ -14,22 +16,22 @@ import * as https from "https";
export function FailFastGrpcTransportFactory(
timeoutMs: number = 10_000,
httpOptions?: http.RequestOptions,
): grpc.TransportFactory {
return function (opts: grpc.TransportOptions) {
): pkg.grpc.TransportFactory {
return function (opts: pkg.grpc.TransportOptions) {
return new TimeoutableTransport(opts, timeoutMs, httpOptions);
};
}

export class TimeoutError extends Error {}

class TimeoutableTransport implements grpc.Transport {
class TimeoutableTransport implements pkg.grpc.Transport {
private readonly timeoutMs: number;
private readonly options: grpc.TransportOptions;
private readonly options: pkg.grpc.TransportOptions;
private readonly httpOptions?: http.RequestOptions;
private request?: http.ClientRequest;

constructor(
opts: grpc.TransportOptions,
opts: pkg.grpc.TransportOptions,
timeoutMs: number,
httpOptions?: http.RequestOptions,
) {
Expand All @@ -38,7 +40,7 @@ class TimeoutableTransport implements grpc.Transport {
this.httpOptions = httpOptions;
}

start(metadata: grpc.Metadata): void {
start(metadata: pkg.grpc.Metadata): void {
const headers: Record<string, string> = {};
metadata.forEach(function (key: string, values: string[]) {
headers[key] = values.join(", ");
Expand Down Expand Up @@ -71,6 +73,7 @@ class TimeoutableTransport implements grpc.Transport {

responseCallback(response: http.IncomingMessage): void {
const headers = this.filterHeadersForUndefined(response.headers);
// @ts-ignore (typing problem with @improbable-eng/grpc-web that they won't fix due to deprecation)
this.options.onHeaders(new grpc.Metadata(headers), response.statusCode);
response.on("data", chunk => {
this.options.onChunk(this.toArrayBuffer(chunk));
Expand Down
Loading

0 comments on commit 4161781

Please sign in to comment.