diff --git a/docker-compose.devnet-services.yaml b/docker-compose.devnet-services.yaml index 5b9e9a4bc5..f86170a5e3 100644 --- a/docker-compose.devnet-services.yaml +++ b/docker-compose.devnet-services.yaml @@ -364,7 +364,7 @@ services: container_name: cartographer-api depends_on: - cartographer-database - image: postgrest/postgrest:v9.0.0.20220107 + image: postgrest/postgrest:v10.0.0 ports: - "3000:3000" environment: diff --git a/docker-compose.services.yaml b/docker-compose.services.yaml index 7765b48186..a10752d75d 100644 --- a/docker-compose.services.yaml +++ b/docker-compose.services.yaml @@ -376,7 +376,7 @@ services: container_name: cartographer-api depends_on: - cartographer-database - image: postgrest/postgrest:v9.0.0.20220107 + image: postgrest/postgrest:v10.0.0 ports: - "3000:3000" environment: diff --git a/packages/adapters/cache/src/lib/caches/routers.ts b/packages/adapters/cache/src/lib/caches/routers.ts index da7910808e..d43694c274 100644 --- a/packages/adapters/cache/src/lib/caches/routers.ts +++ b/packages/adapters/cache/src/lib/caches/routers.ts @@ -15,7 +15,6 @@ export class RoutersCache extends Cache { // TODO: Implement configurable expiry times per domain. // Default expiry time (in seconds) after which liquidity data is considered stale. public static readonly DEFAULT_LIQUIDITY_EXPIRY = 30; // 30 seconds. - public static readonly DEFAULT_APPROVAL_EXPIRY = 24 * 60 * 60; // 24 hours. private readonly prefix = "routers"; /** @@ -59,4 +58,83 @@ export class RoutersCache extends Cache { }), ); } + + /** + * Set last active time for a given router. + * @param router - Router address. + */ + public async setLastActive(router: string): Promise { + const key = `${this.prefix}:active`; + await this.data.hset(key, router, getNtpTimeSeconds().toString()); + } + + /** + * Get the recorded last active time for a given router. + * @param router - Router address. + * @returns The timestamp if recorded, 0 if not found. + */ + public async getLastActive(router: string): Promise { + const key = `${this.prefix}:active`; + const res = await this.data.hget(key, router); + const lastActiveTime = res ? +res : 0; + return lastActiveTime; + } + + /** + * Set the last bid time for a given router. + * @param router - Router address. + * @param bid - The bid instance. + */ + public async setLastBidTime( + router: string, + bid: { originDomain: string; destinationDomain: string; asset: string }, + ): Promise { + const key = `${this.prefix}:bid`; + const bidKey = `${bid.originDomain}:${bid.destinationDomain}:${bid.asset}`; + let lastBids: Record = {}; + const currentTimestamp = getNtpTimeSeconds().toString(); + const res = await this.data.hget(key, router); + if (res) { + lastBids = JSON.parse(res); + } + lastBids[bidKey] = currentTimestamp; + await this.data.hset(key, router, JSON.stringify(lastBids)); + } + + /** + * Get the recorded last bid time for a given router. + * @param router - Router address. + * @returns A record of transfer path, undefined if not found + */ + public async getLastBidTime(router: string): Promise | undefined> { + const key = `${this.prefix}:bid`; + const res = await this.data.hget(key, router); + return res ? JSON.parse(res) : undefined; + } + + /** + * Add a router to the list. + * @param router - Router address. + */ + public async addRouter(router: string): Promise { + const activeKey = `${this.prefix}:active`; + const addressKey = `${this.prefix}:address`; + const res = await this.data.hget(activeKey, router); + if (!res) { + await this.data.hset(activeKey, router, getNtpTimeSeconds().toString()); + await this.data.rpush(addressKey, router); + } + } + + /** + * Get the recorded router addresses. + * @param offset - The start index. + * @param limit - The number of items to fetch. + * @returns The list of router address. + */ + public async getRouters(offset = 0, limit = 100): Promise { + const addressKey = `${this.prefix}:address`; + const routers = await this.data.lrange(addressKey, offset, offset + limit - 1); + return routers; + } } diff --git a/packages/adapters/cache/test/lib/caches/routers.spec.ts b/packages/adapters/cache/test/lib/caches/routers.spec.ts index c4db46a912..59a8ac6c8c 100644 --- a/packages/adapters/cache/test/lib/caches/routers.spec.ts +++ b/packages/adapters/cache/test/lib/caches/routers.spec.ts @@ -1,4 +1,4 @@ -import { Logger, expect, mock, getNtpTimeSeconds } from "@connext/nxtp-utils"; +import { Logger, expect, mock, getNtpTimeSeconds, mkAddress } from "@connext/nxtp-utils"; import { RoutersCache } from "../../../src/index"; import { TimestampedCacheValue } from "../../../src/lib/entities"; @@ -8,7 +8,7 @@ const redis = new RedisMock(); describe("RoutersCache", () => { const prefix = "routers"; - // Helpers for accessing mock cache directly and altering state. + // // Helpers for accessing mock cache directly and altering state. const mockRedisHelpers = { setLiquidity: async (domain: string, router: string, asset: string, amount: string, timestamp?: number) => await redis.hset( @@ -60,10 +60,10 @@ describe("RoutersCache", () => { const asset = mock.asset.A.address; const amount = "1234567890"; - await mockRedisHelpers.setLiquidity(domain, router, asset, amount); + await cache.setLiquidity(domain, router, asset, amount); const res = await cache.getLiquidity(domain, router, asset); - expect(res.toString()).to.be.eq(amount); + expect(res!.toString()).to.be.eq(amount); }); it("sad: should return undefined if liquidity data does not exist", async () => { @@ -80,17 +80,14 @@ describe("RoutersCache", () => { const domain = mock.domain.A; const router = mock.address.router; const asset = mock.asset.A.address; - await mockRedisHelpers.setLiquidity( domain, router, asset, - "123", - // Subtract another 10 secs to be safe. + "123", // Subtract another 10 secs to be safe. getNtpTimeSeconds() - RoutersCache.DEFAULT_LIQUIDITY_EXPIRY - 10, ); const res = await cache.getLiquidity(domain, router, asset); - expect(res).to.be.undefined; }); }); @@ -101,14 +98,11 @@ describe("RoutersCache", () => { const router = mock.address.router; const asset = mock.asset.A.address; const amount = "1234567890"; - const currentTime = getNtpTimeSeconds(); await cache.setLiquidity(domain, router, asset, amount); - const res = await mockRedisHelpers.getLiquidity(domain, router, asset); + const res = await cache.getLiquidity(domain, router, asset); - expect(res.value).to.be.eq(amount); - expect(res.timestamp).to.be.a("number"); - expect(res.timestamp).to.be.gte(currentTime); + expect(res!.toString()).to.be.eq(amount); }); it("happy: should update existing liquidity amount, along with timestamp", async () => { @@ -116,18 +110,51 @@ describe("RoutersCache", () => { const router = mock.address.router; const asset = mock.asset.A.address; const originalAmount = "1234567890"; - const originalTimestamp = 123; - const newAmount = "9876543210"; - const currentTime = getNtpTimeSeconds(); - - await mockRedisHelpers.setLiquidity(domain, router, asset, originalAmount, originalTimestamp); - await cache.setLiquidity(domain, router, asset, newAmount); - - const res = await mockRedisHelpers.getLiquidity(domain, router, asset); - expect(res.value).to.be.eq(newAmount); - expect(res.timestamp).to.be.a("number"); - expect(res.timestamp).to.not.be.eq(originalTimestamp); - expect(res.timestamp).to.be.gte(currentTime); + + await cache.setLiquidity(domain, router, asset, originalAmount); + const res = await cache.getLiquidity(domain, router, asset); + expect(res?.toString()).to.be.eq(originalAmount); + }); + }); + + describe("#setLastActive/getLastActive", () => { + it("happy: should set last active timestamp", async () => { + const mockRouter1 = mkAddress("0xrouter1"); + const mockRouter2 = mkAddress("0xrouter2"); + const curTimestamp = getNtpTimeSeconds(); + await cache.setLastActive(mockRouter1); + const lastActiveTimestamp1 = await cache.getLastActive(mockRouter1); + const lastActiveTimestamp2 = await cache.getLastActive(mockRouter2); + expect(+lastActiveTimestamp1).to.be.greaterThanOrEqual(curTimestamp); + expect(lastActiveTimestamp2).to.be.eq(0); + }); + }); + + describe("#setLastBidTime/getLastBidTime", () => { + it("happy: should set last active timestamp", async () => { + const mockRouter1 = mkAddress("0xrouter1"); + const mockRouter2 = mkAddress("0xrouter2"); + await cache.setLastBidTime(mockRouter1, { originDomain: "1111", destinationDomain: "2222", asset: "0xabc" }); + const lastBidTimeForRouter1 = await cache.getLastBidTime(mockRouter1); + expect(lastBidTimeForRouter1).to.not.undefined; + + const lastBidTimeForRouter2 = await cache.getLastBidTime(mockRouter2); + expect(lastBidTimeForRouter2).to.be.undefined; + }); + }); + + describe("#addRouter/getRouters", () => { + it("happy: should set last active timestamp", async () => { + const mockRouter1 = mkAddress("0xrouter1"); + const mockRouter2 = mkAddress("0xrouter2"); + await cache.addRouter(mockRouter1); + await cache.addRouter(mockRouter2); + + // this shouldn't be added + await cache.addRouter(mockRouter1); + + const routers = await cache.getRouters(); + expect(routers).to.be.deep.eq([mockRouter1, mockRouter2]); }); }); }); diff --git a/packages/adapters/txservice/package.json b/packages/adapters/txservice/package.json index e7d4f8d43d..1ff75114c7 100644 --- a/packages/adapters/txservice/package.json +++ b/packages/adapters/txservice/package.json @@ -1,6 +1,6 @@ { "name": "@connext/nxtp-txservice", - "version": "2.3.0-alpha.1", + "version": "2.3.0-alpha.5", "description": "Robust transaction sending service for a wallet configured across multiple chains. Will bump gas and reattempt transactions as needed", "author": "Connext", "license": "MIT", diff --git a/packages/agents/router/src/config.ts b/packages/agents/router/src/config.ts index 9eebf5327e..b5ad0f9ebb 100644 --- a/packages/agents/router/src/config.ts +++ b/packages/agents/router/src/config.ts @@ -26,9 +26,9 @@ const DEFAULT_CACHE_POLL_INTERVAL = 20_000; const DEFAULT_AUCTION_ROUND_DEPTH = 3; //Router MQ limits -export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 20; +export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 10; const DEFAULT_ROUTER_MQ_HEARTBEAT_LIMIT = 10; -const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10; +export const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10; // Sequencer and Cartographer default urls const SEQUENCER_URLS: Record = { diff --git a/packages/agents/router/src/setup.ts b/packages/agents/router/src/setup.ts index a739fd15fd..6bafe88c5e 100644 --- a/packages/agents/router/src/setup.ts +++ b/packages/agents/router/src/setup.ts @@ -4,14 +4,11 @@ import { ChainData, createMethodContext, Logger, RequestContext } from "@connext import rabbit from "foo-foo-mq"; import { MQConnectionClosed, MQConnectionFailed } from "./errors"; -import { DEFAULT_ROUTER_MQ_RETRY_LIMIT } from "./config"; export const XCALL_QUEUE = "xcalls"; export const MQ_EXCHANGE = "router"; export const XCALL_MESSAGE_TYPE = "xcall"; -let routerRetryLimit = DEFAULT_ROUTER_MQ_RETRY_LIMIT; - export const setupCache = async ( host: string | undefined, port: number | undefined, @@ -45,7 +42,9 @@ export const setupMq = async ( const methodContext = createMethodContext("setupMq"); // Disable reply queues const replyQueue = false; + logger.info("Message queue setup in progress...", requestContext, methodContext, { uri }); + await rabbit.configure({ connection: { uri, replyQueue, heartbeat, failAfter, retryLimit }, queues: [{ name: XCALL_QUEUE, limit }], @@ -53,32 +52,22 @@ export const setupMq = async ( bindings: [{ exchange: MQ_EXCHANGE, target: XCALL_QUEUE, keys: [XCALL_QUEUE] }], }); - await rabbit.on("closed", function () { + await rabbit.on("closed", async function () { throw new MQConnectionClosed(); }); + await rabbit.on("failed", async function () { - if (routerRetryLimit > 0) { - routerRetryLimit--; - logger.warn("MQ connection failed, retrying", requestContext, methodContext, { - uri, - routerRetryLimit, - }); - try { - await rabbit.retry(); - } catch (err: unknown) { - throw new MQConnectionFailed(err as Error); - } - } else { - throw new MQConnectionFailed(); - } + throw new MQConnectionFailed(); }); await rabbit.on("unreachable", async function () { - // throw new MQConnectionUnreachable(); - logger.warn("MQ is unreachable, retrying connection", requestContext, methodContext, { + throw new MQConnectionFailed(); + }); + + await rabbit.on("connected", function () { + logger.info("Connected to MQ!", requestContext, methodContext, { uri, }); - await rabbit.retry(); }); logger.info("Message queue setup is done!", requestContext, methodContext, { diff --git a/packages/agents/router/src/tasks/publisher/bindings/subgraph/index.ts b/packages/agents/router/src/tasks/publisher/bindings/subgraph/index.ts index 9ac705f54e..dc93b41092 100644 --- a/packages/agents/router/src/tasks/publisher/bindings/subgraph/index.ts +++ b/packages/agents/router/src/tasks/publisher/bindings/subgraph/index.ts @@ -1,7 +1,7 @@ import { createLoggingContext, jsonifyError } from "@connext/nxtp-utils"; import interval from "interval-promise"; -import { retryXCalls } from "../../operations"; +import { retryXCalls, sendStatusToSequencer } from "../../operations"; import { getMissingXCalls, getXCalls } from "../../operations/getXCalls"; import { getContext } from "../../publisher"; @@ -9,6 +9,7 @@ export const bindSubgraph = async (_pollInterval?: number) => { const { config, logger } = getContext(); const { requestContext, methodContext } = createLoggingContext(bindSubgraph.name); const pollInterval = _pollInterval ?? config.polling.subgraph; + interval(async (_, stop) => { if (config.mode.cleanup) { stop(); @@ -65,4 +66,22 @@ export const bindSubgraph = async (_pollInterval?: number) => { } } }, pollInterval); + + interval(async (_, stop) => { + if (config.mode.cleanup) { + stop(); + } else { + try { + // 4. Sends status to sequencer at a regular inverval + await sendStatusToSequencer(); + } catch (e: unknown) { + logger.error( + "Error sending status to sequencer, waiting for next loop", + requestContext, + methodContext, + jsonifyError(e as Error), + ); + } + } + }, pollInterval); }; diff --git a/packages/agents/router/src/tasks/publisher/context.ts b/packages/agents/router/src/tasks/publisher/context.ts index d95f413897..3d8e7f0415 100644 --- a/packages/agents/router/src/tasks/publisher/context.ts +++ b/packages/agents/router/src/tasks/publisher/context.ts @@ -4,11 +4,14 @@ import { StoreManager } from "@connext/nxtp-adapters-cache"; import Rabbit from "foo-foo-mq"; import { NxtpRouterConfig } from "../../config"; +import { Web3Signer } from "@connext/nxtp-adapters-web3signer"; +import { Wallet } from "ethers"; export type AppContext = { logger: Logger; adapters: { // Stateful interfaces for peripherals. + wallet: Wallet | Web3Signer; // Used for signing metatxs for bids. cache: StoreManager; // Used to cache important data locally. subgraph: SubgraphReader; // Aggregates subgraphs in a FallbackSubgraph for each chain. mqClient: typeof Rabbit; diff --git a/packages/agents/router/src/tasks/publisher/operations/index.ts b/packages/agents/router/src/tasks/publisher/operations/index.ts index b68ad410f5..3ee7c225c2 100644 --- a/packages/agents/router/src/tasks/publisher/operations/index.ts +++ b/packages/agents/router/src/tasks/publisher/operations/index.ts @@ -1,2 +1,3 @@ export { getXCalls } from "./getXCalls"; export { retryXCalls } from "./retryXCalls"; +export { sendStatusToSequencer } from "./status"; diff --git a/packages/agents/router/src/tasks/publisher/operations/status.ts b/packages/agents/router/src/tasks/publisher/operations/status.ts new file mode 100644 index 0000000000..b433652acd --- /dev/null +++ b/packages/agents/router/src/tasks/publisher/operations/status.ts @@ -0,0 +1,49 @@ +import { + NxtpError, + RouterPingMessage, + RouterPingRequest, + createLoggingContext, + getNtpTimeSeconds, + jsonifyError, +} from "@connext/nxtp-utils"; + +import { getContext } from "../publisher"; +import { axiosPost } from "../../../mockable"; + +/** + * Sends the active status to the sequencer for monitoring/alerting. + */ +export const sendStatusToSequencer = async (): Promise => { + const { + adapters: { wallet }, + logger, + routerAddress, + config, + } = getContext(); + + const { requestContext, methodContext } = createLoggingContext(sendStatusToSequencer.name); + logger.info(`Sending router status to the sequencer`, requestContext, methodContext); + try { + const curTime = getNtpTimeSeconds(); + const signMsg = await wallet.signMessage(`${RouterPingMessage}-${curTime}`); + + const response = await axiosPost(`${config.sequencerUrl}/router-ping`, { + router: routerAddress, + timestamp: curTime, + signed: signMsg, + }); + + if (!response || !response.data) { + logger.error("Sending status to the sequencer failed", requestContext, methodContext); + } else { + logger.info("Sent status to the sequencer", requestContext, methodContext, { data: response.data }); + } + } catch (error: unknown) { + logger.error( + "Sending status to the sequencer failed", + requestContext, + methodContext, + jsonifyError(error as NxtpError), + ); + } +}; diff --git a/packages/agents/router/src/tasks/publisher/publisher.ts b/packages/agents/router/src/tasks/publisher/publisher.ts index e99ab8d050..bdd7777cad 100644 --- a/packages/agents/router/src/tasks/publisher/publisher.ts +++ b/packages/agents/router/src/tasks/publisher/publisher.ts @@ -1,12 +1,27 @@ -import { createMethodContext, createRequestContext, getChainData, Logger } from "@connext/nxtp-utils"; +import { + createMethodContext, + createRequestContext, + getChainData, + Logger, + jsonifyError, + getNtpTimeSeconds, +} from "@connext/nxtp-utils"; import { contractDeployments } from "@connext/nxtp-txservice"; +import rabbit from "foo-foo-mq"; -import { getConfig, NxtpRouterConfig } from "../../config"; +import { + getConfig, + NxtpRouterConfig, + DEFAULT_ROUTER_MQ_RETRY_LIMIT, + DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT, +} from "../../config"; import { bindMetrics } from "../../bindings"; import { setupCache, setupMq, setupSubgraphReader } from "../../setup"; import { AppContext } from "./context"; import { bindSubgraph, bindServer } from "./bindings"; +import { Wallet } from "ethers"; +import { Web3Signer } from "@connext/nxtp-adapters-web3signer"; // AppContext instance used for interacting with adapters, config, etc. const context: AppContext = {} as any; @@ -15,6 +30,7 @@ export const getContext = () => context; export const makePublisher = async (_configOverride?: NxtpRouterConfig) => { const requestContext = createRequestContext("Publisher Init"); const methodContext = createMethodContext(makePublisher.name); + let MQConnection = false; try { context.adapters = {} as any; @@ -24,6 +40,18 @@ export const makePublisher = async (_configOverride?: NxtpRouterConfig) => { context.chainData = await getChainData(); context.config = _configOverride ?? (await getConfig(context.chainData, contractDeployments)); + /// MARK - Signer + if (!context.config.mnemonic && !context.config.web3SignerUrl) { + throw new Error( + "No mnemonic or web3signer was configured. Please ensure either a mnemonic or a web3signer" + + " URL is provided in the config. Exiting!", + ); + } + context.adapters.wallet = context.config.mnemonic + ? Wallet.fromMnemonic(context.config.mnemonic) + : new Web3Signer(context.config.web3SignerUrl!); + context.routerAddress = await context.adapters.wallet.getAddress(); + /// MARK - Logger context.logger = new Logger({ level: context.config.logLevel, @@ -53,15 +81,52 @@ export const makePublisher = async (_configOverride?: NxtpRouterConfig) => { context.logger, requestContext, ); - context.adapters.mqClient = await setupMq( - context.config.messageQueue.uri as string, - context.config.messageQueue.limit as number, - context.config.messageQueue.heartbeat as number, - context.config.messageQueue.failAfter as number, - context.config.messageQueue.retryLimit as number, - context.logger, - requestContext, - ); + + const retryTimeLimit = + getNtpTimeSeconds() + + (context.config.messageQueue.retryLimit ?? DEFAULT_ROUTER_MQ_RETRY_LIMIT) * + (context.config.messageQueue.failAfter ?? DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT); + + while (retryTimeLimit > getNtpTimeSeconds()) { + try { + context.adapters.mqClient = await setupMq( + context.config.messageQueue.uri as string, + context.config.messageQueue.limit as number, + context.config.messageQueue.heartbeat as number, + context.config.messageQueue.failAfter as number, + context.config.messageQueue.retryLimit as number, + context.logger, + requestContext, + ); + context.logger.info("MQ configuration successfull.", requestContext, methodContext); + MQConnection = true; + break; + } catch (e: unknown) { + MQConnection = false; + rabbit.reset(); + context.logger.error( + "Error binding message queue, retrying...", + requestContext, + methodContext, + jsonifyError(e as Error), + ); + // Wait for 1 second before retrying. + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + if (!MQConnection) { + // If we failed to connect to the message queue after many retries, throw an error. + rabbit.reset(); + context.adapters.mqClient = await setupMq( + context.config.messageQueue.uri as string, + context.config.messageQueue.limit as number, + context.config.messageQueue.heartbeat as number, + context.config.messageQueue.failAfter as number, + context.config.messageQueue.retryLimit as number, + context.logger, + requestContext, + ); + } /// MARK - Bindings await bindMetrics("publisher"); diff --git a/packages/agents/router/src/tasks/subscriber/subscriber.ts b/packages/agents/router/src/tasks/subscriber/subscriber.ts index c798af9247..123825e437 100644 --- a/packages/agents/router/src/tasks/subscriber/subscriber.ts +++ b/packages/agents/router/src/tasks/subscriber/subscriber.ts @@ -1,9 +1,22 @@ import { Wallet } from "ethers"; -import { createMethodContext, createRequestContext, getChainData, jsonifyError, Logger } from "@connext/nxtp-utils"; +import { + createMethodContext, + createRequestContext, + getChainData, + getNtpTimeSeconds, + jsonifyError, + Logger, +} from "@connext/nxtp-utils"; import { Web3Signer } from "@connext/nxtp-adapters-web3signer"; import { getContractInterfaces, TransactionService, contractDeployments } from "@connext/nxtp-txservice"; - -import { getConfig, NxtpRouterConfig } from "../../config"; +import rabbit from "foo-foo-mq"; + +import { + DEFAULT_ROUTER_MQ_RETRY_LIMIT, + DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT, + getConfig, + NxtpRouterConfig, +} from "../../config"; import { bindMetrics } from "../../bindings"; import { setupMq, setupSubgraphReader } from "../../setup"; import { axiosGet } from "../../mockable"; @@ -18,6 +31,7 @@ export const getContext = () => context; export const makeSubscriber = async (_configOverride?: NxtpRouterConfig) => { const requestContext = createRequestContext("Router subscriber Init"); const methodContext = createMethodContext(makeSubscriber.name); + let MQConnection = false; try { context.adapters = {} as any; @@ -69,15 +83,6 @@ export const makeSubscriber = async (_configOverride?: NxtpRouterConfig) => { context.adapters.wallet as Wallet, ); context.adapters.contracts = getContractInterfaces(); - context.adapters.mqClient = await setupMq( - context.config.messageQueue.uri as string, - context.config.messageQueue.limit as number, - context.config.messageQueue.heartbeat as number, - context.config.messageQueue.failAfter as number, - context.config.messageQueue.retryLimit as number, - context.logger, - requestContext, - ); /// MARK - Validation for auctionRoundDepth @@ -107,7 +112,54 @@ export const makeSubscriber = async (_configOverride?: NxtpRouterConfig) => { // TODO: New diagnostic mode / cleanup mode? await bindServer(); await bindMetrics("subscriber"); - await bindMessageQueue(); + + const retryTimeLimit = + getNtpTimeSeconds() + + (context.config.messageQueue.retryLimit ?? DEFAULT_ROUTER_MQ_RETRY_LIMIT) * + (context.config.messageQueue.failAfter ?? DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT); + + while (retryTimeLimit > getNtpTimeSeconds()) { + try { + context.adapters.mqClient = await setupMq( + context.config.messageQueue.uri as string, + context.config.messageQueue.limit as number, + context.config.messageQueue.heartbeat as number, + context.config.messageQueue.failAfter as number, + context.config.messageQueue.retryLimit as number, + context.logger, + requestContext, + ); + await bindMessageQueue(); + context.logger.info("MQ subscription successfull.", requestContext, methodContext); + MQConnection = true; + break; + } catch (e: unknown) { + rabbit.reset(); + context.logger.error( + "Error binding message queue, retrying...", + requestContext, + methodContext, + jsonifyError(e as Error), + ); + // Wait for 1 second before retrying. + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + if (!MQConnection) { + // If we failed to connect to the message queue after many retries, throw an error. + MQConnection = false; + rabbit.reset(); + context.adapters.mqClient = await setupMq( + context.config.messageQueue.uri as string, + context.config.messageQueue.limit as number, + context.config.messageQueue.heartbeat as number, + context.config.messageQueue.failAfter as number, + context.config.messageQueue.retryLimit as number, + context.logger, + requestContext, + ); + await bindMessageQueue(); + } context.logger.info("Bindings initialized.", requestContext, methodContext); context.logger.info("Router subscriber boot complete!", requestContext, methodContext, { diff --git a/packages/agents/router/test/mock.ts b/packages/agents/router/test/mock.ts index b6e6bdb0b2..54c78cbe35 100644 --- a/packages/agents/router/test/mock.ts +++ b/packages/agents/router/test/mock.ts @@ -20,6 +20,7 @@ export const mock = { publisherContext: (): PublisherAppContext => { return { adapters: { + wallet: mock.adapters.wallet(), subgraph: mock.adapters.subgraph(), cache: mock.adapters.cache(), mqClient: mock.adapters.mqClient() as any, @@ -48,6 +49,7 @@ export const mock = { executorContext: (): ExecutorAppContext => { return { adapters: { + wallet: mock.adapters.wallet(), chainreader: mock.adapters.chainreader(), contracts: mock.contracts.interfaces(), }, diff --git a/packages/agents/router/test/publisher/operations/status.spec.ts b/packages/agents/router/test/publisher/operations/status.spec.ts new file mode 100644 index 0000000000..6925f26691 --- /dev/null +++ b/packages/agents/router/test/publisher/operations/status.spec.ts @@ -0,0 +1,27 @@ +import { expect } from "@connext/nxtp-utils"; +import { mockPubContext } from "../../globalTestHook"; +import { sendStatusToSequencer } from "../../../src/tasks/publisher/operations/status"; +import { SinonStub, stub } from "sinon"; +import * as Mockable from "../../../src/mockable"; + +describe("Operation:status", () => { + describe("#sendStatusToSequencer", () => { + const mockSequencerUrl = "http://mockUrl:1234"; + let axiosPostStub: SinonStub; + beforeEach(() => { + mockPubContext.config.sequencerUrl = mockSequencerUrl; + axiosPostStub = stub(Mockable, "axiosPost").resolves({ data: "ok" }); + }); + it("send router status to the sequencer successfully", async () => { + expect(() => sendStatusToSequencer()).to.not.throw; + }); + it("should throw if sending failed", async () => { + axiosPostStub.rejects({ response: { data: { message: "Forbidden" } } }); + expect(sendStatusToSequencer()).to.not.throw; + }); + it("should fail to send router status", () => { + axiosPostStub.rejects({ response: { error: "Bad Response" } }); + expect(sendStatusToSequencer()).to.not.throw; + }); + }); +}); diff --git a/packages/agents/sdk-wrapper/CHANGELOG.md b/packages/agents/sdk-wrapper/CHANGELOG.md index 10bf77afa6..f97d88b501 100644 --- a/packages/agents/sdk-wrapper/CHANGELOG.md +++ b/packages/agents/sdk-wrapper/CHANGELOG.md @@ -2,6 +2,10 @@ ## Next Release +## v2.3.0-alpha.2 + +- Mainnet support for mode + ## v2.3.0-alpha.1 - Environment-specific Unwrapper contracts deployed diff --git a/packages/agents/sdk-wrapper/package.json b/packages/agents/sdk-wrapper/package.json index 808bc8a89c..2451b6816d 100644 --- a/packages/agents/sdk-wrapper/package.json +++ b/packages/agents/sdk-wrapper/package.json @@ -1,6 +1,6 @@ { "name": "@connext/sdk", - "version": "2.3.0-alpha.1", + "version": "2.3.0-alpha.5", "description": "Client-side package for interacting with the Connext protocol", "author": "Connext", "license": "MIT", diff --git a/packages/agents/sdk/CHANGELOG.md b/packages/agents/sdk/CHANGELOG.md index d9679c3106..744d8db4f7 100644 --- a/packages/agents/sdk/CHANGELOG.md +++ b/packages/agents/sdk/CHANGELOG.md @@ -2,6 +2,10 @@ ## Next Release +## v2.3.0-alpha.2 + +- Mainnet support for mode + ## v2.3.0-alpha.1 - Environment-specific Unwrapper contracts deployed diff --git a/packages/agents/sdk/package.json b/packages/agents/sdk/package.json index aa74cd7994..067535ce82 100644 --- a/packages/agents/sdk/package.json +++ b/packages/agents/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@connext/sdk-core", - "version": "2.3.0-alpha.1", + "version": "2.3.0-alpha.5", "description": "Client-side package for interacting with the Connext protocol", "author": "Connext", "license": "MIT", diff --git a/packages/agents/sdk/src/config.ts b/packages/agents/sdk/src/config.ts index 48465119e9..eab6099002 100644 --- a/packages/agents/sdk/src/config.ts +++ b/packages/agents/sdk/src/config.ts @@ -192,6 +192,9 @@ export const domainsToChainNames: Record = { "6450786": "bsc", "6778479": "xdai", "1818848877": "linea", + "1835365481": "metis", + "1650553709": "base", + "1836016741": "mode", }; // Need to add more domains here. diff --git a/packages/agents/sequencer/src/bindings/server/index.ts b/packages/agents/sequencer/src/bindings/server/index.ts index ac38b5ad25..76aa4c2d90 100644 --- a/packages/agents/sequencer/src/bindings/server/index.ts +++ b/packages/agents/sequencer/src/bindings/server/index.ts @@ -23,7 +23,13 @@ import { ExecStatusRequest, ExecStatusResponse, ExecStatusResponseSchema, + RouterPingRequest, + RouterPingRequestSchema, + RouterPingMessage, + RouterStatusApiResponseSchema, + RouterStatusApiResponse, } from "@connext/nxtp-utils"; +import { verifyMessage } from "ethers/lib/utils"; import { getContext } from "../../sequencer"; import { MessageType, HTTPMessage } from "../../lib/entities"; @@ -40,6 +46,40 @@ export const bindServer = async (queueName: string, channel: Broker.Channel): Pr server.get("/supportedBidVersion", (_, res) => api.get.supportedBidVersion(res)); + server.get<{ + Params: { router: string }; + Reply: RouterStatusApiResponse | SequencerApiErrorResponse; + }>( + "/router-status", + { + schema: { + response: { + 200: RouterStatusApiResponseSchema, + 500: SequencerApiErrorResponseSchema, + }, + }, + }, + async (request, response) => { + const { requestContext, methodContext } = createLoggingContext("GET /router-status/:router endpoint"); + + try { + const { router } = request.params; + const lastActiveTimestamp = await cache.routers.getLastActive(router); + const lastBidTimestamp = await cache.routers.getLastBidTime(router); + + return response.status(200).send({ + lastActiveTimestamp, + lastBidTimestamp: lastBidTimestamp ?? ({} as any), + }); + } catch (error: unknown) { + logger.debug(`Router Status by Router Get Error`, requestContext, methodContext, jsonifyError(error as Error)); + return response + .code(500) + .send({ message: `Router Status by Router Get Error`, error: jsonifyError(error as Error) }); + } + }, + ); + server.get<{ Params: { transferId: string }; Reply: ExecuteFastApiGetExecStatusResponse | SequencerApiErrorResponse; @@ -188,6 +228,21 @@ export const bindServer = async (queueName: string, channel: Broker.Channel): Pr async (req, res) => api.auth.admin(req.body, res, api.post.clearCache), ); + server.post<{ Body: RouterPingRequest }>( + "/router-ping", + { schema: { body: RouterPingRequestSchema } }, + async (req, res) => { + const { router, timestamp, signed } = req.body; + const signerAddress = verifyMessage(`${RouterPingMessage}-${timestamp}`, signed); + if (router.toLowerCase() == signerAddress.toLowerCase()) { + await cache.routers.setLastActive(router); + return res.status(200).send({ message: "OK" }); + } else { + return res.code(500).send({ message: "Invalid signature" }); + } + }, + ); + const address = await server.listen({ port: config.server.http.port, host: config.server.http.host }); logger.info(`Server listening at ${address}`); return server; diff --git a/packages/agents/sequencer/src/lib/operations/execute/fastpath.ts b/packages/agents/sequencer/src/lib/operations/execute/fastpath.ts index 7c9ffadb41..85efb87618 100644 --- a/packages/agents/sequencer/src/lib/operations/execute/fastpath.ts +++ b/packages/agents/sequencer/src/lib/operations/execute/fastpath.ts @@ -364,6 +364,7 @@ export const executeFastPathData = async ( }), }, }); + // Send the relayer request based on chosen bids. const { taskId: _taskId } = await sendExecuteFastToRelayer( roundIdInNum, @@ -381,6 +382,17 @@ export const executeFastPathData = async ( destination, }); + // Update the last bid time for a given router. + await Promise.all( + randomCombination.map((bid) => + cache.routers.setLastBidTime(bid.router, { + originDomain: transfer!.xparams.originDomain, + destinationDomain: transfer!.xparams.destinationDomain, + asset: transfer!.origin.assets.transacting.asset, + }), + ), + ); + // Update router liquidity record to reflect spending. for (const router of routerLiquidityMap.keys()) { const routerLiquidity = routerLiquidityMap.get(router)!.sub(assignedAmount); diff --git a/packages/deployments/contracts/package.json b/packages/deployments/contracts/package.json index bbc22f1ab5..5cdf973b99 100644 --- a/packages/deployments/contracts/package.json +++ b/packages/deployments/contracts/package.json @@ -1,6 +1,6 @@ { "name": "@connext/smart-contracts", - "version": "2.3.0-alpha.1", + "version": "2.3.0-alpha.5", "description": "", "scripts": { "test": "yarn forge test --no-match-path '*/fork/**.sol'", diff --git a/packages/utils/package.json b/packages/utils/package.json index 5351bec587..05f53af03f 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,6 +1,6 @@ { "name": "@connext/nxtp-utils", - "version": "2.3.0-alpha.0", + "version": "2.3.0-alpha.5", "description": "Common utilities for use within the @connext/nxtp-* packages", "author": "Connext", "license": "MIT", diff --git a/packages/utils/src/peripherals/gelato.ts b/packages/utils/src/peripherals/gelato.ts index fa96418e7d..71840afdc1 100644 --- a/packages/utils/src/peripherals/gelato.ts +++ b/packages/utils/src/peripherals/gelato.ts @@ -68,6 +68,7 @@ const EquivalentChainsForTestnetEstimate: Record = { const EquivalentChainsForGelato: Record = { // MAINNETS 59140: 42161, // linea + 34443: 42161, // mode // LOCALNETS 1337: 1, // local chain diff --git a/packages/utils/src/types/api.ts b/packages/utils/src/types/api.ts index b75a4adfe6..b9b14307c4 100644 --- a/packages/utils/src/types/api.ts +++ b/packages/utils/src/types/api.ts @@ -24,6 +24,14 @@ export type AdminRequest = Static; export const ClearCacheRequestSchema = AdminSchema; export type ClearCacheRequest = Static; +export const RouterPingMessage = "ROUTER_PING"; +export const RouterPingRequestSchema = Type.Object({ + router: Type.String(), + timestamp: Type.String(), + signed: Type.String(), +}); +export type RouterPingRequest = Static; + /// MARK - Sequencer API ---------------------------------------------------------------------------- export const SequencerApiErrorResponseSchema = Type.Object({ @@ -57,6 +65,12 @@ export const ExecuteFastApiGetAuctionsStatusResponseSchema = Type.Object({ export type ExecuteFastApiGetExecStatusResponse = Static; +export const RouterStatusApiResponseSchema = Type.Object({ + lastActiveTimestamp: Type.Number(), + lastBidTimestamp: Type.Record(Type.String(), Type.String()), +}); +export type RouterStatusApiResponse = Static; + export const ExecuteFastApiGetQueuedResponseSchema = Type.Object({ queued: Type.Array(Type.String()), }); diff --git a/yarn.lock b/yarn.lock index 94cb577205..487214e48d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -15502,7 +15502,7 @@ __metadata: es6-iterator: ^2.0.3 es6-symbol: ^3.1.3 next-tick: ^1.1.0 - checksum: 721bcd16406872c934c54dcfc870a18544ff88d0633a271aab15235d06148d6c0eff8b5230a8e21556692c555ac7d64fb94439fcdbd99690eaaec84c80a19ad0 + checksum: 25f42f6068cfc6e393cf670bc5bba249132c5f5ec2dd0ed6e200e6274aca2fed8e9aec8a31c76031744c78ca283c57f0b41c7e737804c6328c7b8d3fbcba7983 languageName: node linkType: hard