Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

testnet-prod <- main Sync #5807

Merged
merged 23 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
57e73a3
feat: cache methods
wanglonghong Feb 6, 2024
81d051f
chore: apis
wanglonghong Feb 7, 2024
61cdeb8
feat: bunch of router stuff
wanglonghong Feb 27, 2024
d9caeb4
chore: progress
wanglonghong Feb 28, 2024
e0a0a57
chore: refactor
wanglonghong Feb 28, 2024
23cf164
Mode: bump version for sdk and contracts (#5780)
prathmeshkhandelwal1 Feb 28, 2024
59b4aae
fix: postgres docker updated
prathmeshkhandelwal1 Feb 29, 2024
79c29cb
Merge pull request #5790 from connext/hotfix-postgres-update
wanglonghong Feb 29, 2024
b3143c6
fix: utils updated
prathmeshkhandelwal1 Feb 29, 2024
64a2225
Merge pull request #5792 from connext/fix/sdk-mode
wanglonghong Feb 29, 2024
b62df94
fix: utils
prathmeshkhandelwal1 Feb 29, 2024
9b1d2c9
Merge pull request #5796 from connext/feat/sdk-utils-2
wanglonghong Feb 29, 2024
1e4975d
fix: retry mq conn on restart
preethamr Mar 1, 2024
395b170
chore: more
wanglonghong Mar 1, 2024
f05577a
feat: use arbitrum as equivalent
wanglonghong Mar 1, 2024
cd39e5d
Merge pull request #5801 from connext/use-mainnet-for-mode-conversion…
wanglonghong Mar 1, 2024
54e56b5
fix: bump
prathmeshkhandelwal1 Mar 1, 2024
b70a927
Merge pull request #5802 from connext/hotfix-mode
wanglonghong Mar 1, 2024
2285a69
ci: adding coverage
wanglonghong Mar 1, 2024
8b4747c
ci: lint
wanglonghong Mar 1, 2024
af8d5c8
fix: better restart with retries than reconnect
preethamr Mar 1, 2024
4b0a647
Merge pull request #5675 from connext/feat/router-monitor
preethamr Mar 1, 2024
9b6806c
Merge pull request #5799 from connext/router_retry
preethamr Mar 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.devnet-services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ services:
container_name: cartographer-api
depends_on:
- cartographer-database
image: postgrest/postgrest:v9.0.0.20220107
image: postgrest/postgrest:v10.0.0
ports:
- "3000:3000"
environment:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ services:
container_name: cartographer-api
depends_on:
- cartographer-database
image: postgrest/postgrest:v9.0.0.20220107
image: postgrest/postgrest:v10.0.0
ports:
- "3000:3000"
environment:
Expand Down
80 changes: 79 additions & 1 deletion packages/adapters/cache/src/lib/caches/routers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ export class RoutersCache extends Cache {
// TODO: Implement configurable expiry times per domain.
// Default expiry time (in seconds) after which liquidity data is considered stale.
public static readonly DEFAULT_LIQUIDITY_EXPIRY = 30; // 30 seconds.
public static readonly DEFAULT_APPROVAL_EXPIRY = 24 * 60 * 60; // 24 hours.
private readonly prefix = "routers";

/**
Expand Down Expand Up @@ -59,4 +58,83 @@ export class RoutersCache extends Cache {
}),
);
}

/**
* Set last active time for a given router.
* @param router - Router address.
*/
public async setLastActive(router: string): Promise<void> {
const key = `${this.prefix}:active`;
await this.data.hset(key, router, getNtpTimeSeconds().toString());
}

/**
* Get the recorded last active time for a given router.
* @param router - Router address.
* @returns The timestamp if recorded, 0 if not found.
*/
public async getLastActive(router: string): Promise<number> {
const key = `${this.prefix}:active`;
const res = await this.data.hget(key, router);
const lastActiveTime = res ? +res : 0;
return lastActiveTime;
}

/**
* Set the last bid time for a given router.
* @param router - Router address.
* @param bid - The bid instance.
*/
public async setLastBidTime(
router: string,
bid: { originDomain: string; destinationDomain: string; asset: string },
): Promise<void> {
const key = `${this.prefix}:bid`;
const bidKey = `${bid.originDomain}:${bid.destinationDomain}:${bid.asset}`;
let lastBids: Record<string, string> = {};
const currentTimestamp = getNtpTimeSeconds().toString();
const res = await this.data.hget(key, router);
if (res) {
lastBids = JSON.parse(res);
}
lastBids[bidKey] = currentTimestamp;
await this.data.hset(key, router, JSON.stringify(lastBids));
}

/**
* Get the recorded last bid time for a given router.
* @param router - Router address.
* @returns A record of transfer path, undefined if not found
*/
public async getLastBidTime(router: string): Promise<Record<string, string> | undefined> {
const key = `${this.prefix}:bid`;
const res = await this.data.hget(key, router);
return res ? JSON.parse(res) : undefined;
}

/**
* Add a router to the list.
* @param router - Router address.
*/
public async addRouter(router: string): Promise<void> {
const activeKey = `${this.prefix}:active`;
const addressKey = `${this.prefix}:address`;
const res = await this.data.hget(activeKey, router);
if (!res) {
await this.data.hset(activeKey, router, getNtpTimeSeconds().toString());
await this.data.rpush(addressKey, router);
}
}

/**
* Get the recorded router addresses.
* @param offset - The start index.
* @param limit - The number of items to fetch.
* @returns The list of router address.
*/
public async getRouters(offset = 0, limit = 100): Promise<string[]> {
const addressKey = `${this.prefix}:address`;
const routers = await this.data.lrange(addressKey, offset, offset + limit - 1);
return routers;
}
}
77 changes: 52 additions & 25 deletions packages/adapters/cache/test/lib/caches/routers.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Logger, expect, mock, getNtpTimeSeconds } from "@connext/nxtp-utils";
import { Logger, expect, mock, getNtpTimeSeconds, mkAddress } from "@connext/nxtp-utils";

import { RoutersCache } from "../../../src/index";
import { TimestampedCacheValue } from "../../../src/lib/entities";
Expand All @@ -8,7 +8,7 @@ const redis = new RedisMock();

describe("RoutersCache", () => {
const prefix = "routers";
// Helpers for accessing mock cache directly and altering state.
// // Helpers for accessing mock cache directly and altering state.
const mockRedisHelpers = {
setLiquidity: async (domain: string, router: string, asset: string, amount: string, timestamp?: number) =>
await redis.hset(
Expand Down Expand Up @@ -60,10 +60,10 @@ describe("RoutersCache", () => {
const asset = mock.asset.A.address;
const amount = "1234567890";

await mockRedisHelpers.setLiquidity(domain, router, asset, amount);
await cache.setLiquidity(domain, router, asset, amount);
const res = await cache.getLiquidity(domain, router, asset);

expect(res.toString()).to.be.eq(amount);
expect(res!.toString()).to.be.eq(amount);
});

it("sad: should return undefined if liquidity data does not exist", async () => {
Expand All @@ -80,17 +80,14 @@ describe("RoutersCache", () => {
const domain = mock.domain.A;
const router = mock.address.router;
const asset = mock.asset.A.address;

await mockRedisHelpers.setLiquidity(
domain,
router,
asset,
"123",
// Subtract another 10 secs to be safe.
"123", // Subtract another 10 secs to be safe.
getNtpTimeSeconds() - RoutersCache.DEFAULT_LIQUIDITY_EXPIRY - 10,
);
const res = await cache.getLiquidity(domain, router, asset);

expect(res).to.be.undefined;
});
});
Expand All @@ -101,33 +98,63 @@ describe("RoutersCache", () => {
const router = mock.address.router;
const asset = mock.asset.A.address;
const amount = "1234567890";
const currentTime = getNtpTimeSeconds();

await cache.setLiquidity(domain, router, asset, amount);
const res = await mockRedisHelpers.getLiquidity(domain, router, asset);
const res = await cache.getLiquidity(domain, router, asset);

expect(res.value).to.be.eq(amount);
expect(res.timestamp).to.be.a("number");
expect(res.timestamp).to.be.gte(currentTime);
expect(res!.toString()).to.be.eq(amount);
});

it("happy: should update existing liquidity amount, along with timestamp", async () => {
const domain = mock.domain.A;
const router = mock.address.router;
const asset = mock.asset.A.address;
const originalAmount = "1234567890";
const originalTimestamp = 123;
const newAmount = "9876543210";
const currentTime = getNtpTimeSeconds();

await mockRedisHelpers.setLiquidity(domain, router, asset, originalAmount, originalTimestamp);
await cache.setLiquidity(domain, router, asset, newAmount);

const res = await mockRedisHelpers.getLiquidity(domain, router, asset);
expect(res.value).to.be.eq(newAmount);
expect(res.timestamp).to.be.a("number");
expect(res.timestamp).to.not.be.eq(originalTimestamp);
expect(res.timestamp).to.be.gte(currentTime);

await cache.setLiquidity(domain, router, asset, originalAmount);
const res = await cache.getLiquidity(domain, router, asset);
expect(res?.toString()).to.be.eq(originalAmount);
});
});

describe("#setLastActive/getLastActive", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
const curTimestamp = getNtpTimeSeconds();
await cache.setLastActive(mockRouter1);
const lastActiveTimestamp1 = await cache.getLastActive(mockRouter1);
const lastActiveTimestamp2 = await cache.getLastActive(mockRouter2);
expect(+lastActiveTimestamp1).to.be.greaterThanOrEqual(curTimestamp);
expect(lastActiveTimestamp2).to.be.eq(0);
});
});

describe("#setLastBidTime/getLastBidTime", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
await cache.setLastBidTime(mockRouter1, { originDomain: "1111", destinationDomain: "2222", asset: "0xabc" });
const lastBidTimeForRouter1 = await cache.getLastBidTime(mockRouter1);
expect(lastBidTimeForRouter1).to.not.undefined;

const lastBidTimeForRouter2 = await cache.getLastBidTime(mockRouter2);
expect(lastBidTimeForRouter2).to.be.undefined;
});
});

describe("#addRouter/getRouters", () => {
it("happy: should set last active timestamp", async () => {
const mockRouter1 = mkAddress("0xrouter1");
const mockRouter2 = mkAddress("0xrouter2");
await cache.addRouter(mockRouter1);
await cache.addRouter(mockRouter2);

// this shouldn't be added
await cache.addRouter(mockRouter1);

const routers = await cache.getRouters();
expect(routers).to.be.deep.eq([mockRouter1, mockRouter2]);
});
});
});
2 changes: 1 addition & 1 deletion packages/adapters/txservice/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@connext/nxtp-txservice",
"version": "2.3.0-alpha.1",
"version": "2.3.0-alpha.5",
"description": "Robust transaction sending service for a wallet configured across multiple chains. Will bump gas and reattempt transactions as needed",
"author": "Connext",
"license": "MIT",
Expand Down
4 changes: 2 additions & 2 deletions packages/agents/router/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ const DEFAULT_CACHE_POLL_INTERVAL = 20_000;
const DEFAULT_AUCTION_ROUND_DEPTH = 3;

//Router MQ limits
export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 20;
export const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 10;
const DEFAULT_ROUTER_MQ_HEARTBEAT_LIMIT = 10;
const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10;
export const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10;

// Sequencer and Cartographer default urls
const SEQUENCER_URLS: Record<string, any> = {
Expand Down
31 changes: 10 additions & 21 deletions packages/agents/router/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import { ChainData, createMethodContext, Logger, RequestContext } from "@connext
import rabbit from "foo-foo-mq";

import { MQConnectionClosed, MQConnectionFailed } from "./errors";
import { DEFAULT_ROUTER_MQ_RETRY_LIMIT } from "./config";

export const XCALL_QUEUE = "xcalls";
export const MQ_EXCHANGE = "router";
export const XCALL_MESSAGE_TYPE = "xcall";

let routerRetryLimit = DEFAULT_ROUTER_MQ_RETRY_LIMIT;

export const setupCache = async (
host: string | undefined,
port: number | undefined,
Expand Down Expand Up @@ -45,40 +42,32 @@ export const setupMq = async (
const methodContext = createMethodContext("setupMq");
// Disable reply queues
const replyQueue = false;

logger.info("Message queue setup in progress...", requestContext, methodContext, { uri });

await rabbit.configure({
connection: { uri, replyQueue, heartbeat, failAfter, retryLimit },
queues: [{ name: XCALL_QUEUE, limit }],
exchanges: [{ name: MQ_EXCHANGE, type: "direct" }],
bindings: [{ exchange: MQ_EXCHANGE, target: XCALL_QUEUE, keys: [XCALL_QUEUE] }],
});

await rabbit.on("closed", function () {
await rabbit.on("closed", async function () {
throw new MQConnectionClosed();
});

await rabbit.on("failed", async function () {
if (routerRetryLimit > 0) {
routerRetryLimit--;
logger.warn("MQ connection failed, retrying", requestContext, methodContext, {
uri,
routerRetryLimit,
});
try {
await rabbit.retry();
} catch (err: unknown) {
throw new MQConnectionFailed(err as Error);
}
} else {
throw new MQConnectionFailed();
}
throw new MQConnectionFailed();
});

await rabbit.on("unreachable", async function () {
// throw new MQConnectionUnreachable();
logger.warn("MQ is unreachable, retrying connection", requestContext, methodContext, {
throw new MQConnectionFailed();
});

await rabbit.on("connected", function () {
logger.info("Connected to MQ!", requestContext, methodContext, {
uri,
});
await rabbit.retry();
});

logger.info("Message queue setup is done!", requestContext, methodContext, {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { createLoggingContext, jsonifyError } from "@connext/nxtp-utils";
import interval from "interval-promise";

import { retryXCalls } from "../../operations";
import { retryXCalls, sendStatusToSequencer } from "../../operations";
import { getMissingXCalls, getXCalls } from "../../operations/getXCalls";
import { getContext } from "../../publisher";

export const bindSubgraph = async (_pollInterval?: number) => {
const { config, logger } = getContext();
const { requestContext, methodContext } = createLoggingContext(bindSubgraph.name);
const pollInterval = _pollInterval ?? config.polling.subgraph;

interval(async (_, stop) => {
if (config.mode.cleanup) {
stop();
Expand Down Expand Up @@ -65,4 +66,22 @@ export const bindSubgraph = async (_pollInterval?: number) => {
}
}
}, pollInterval);

interval(async (_, stop) => {
if (config.mode.cleanup) {
stop();
} else {
try {
// 4. Sends status to sequencer at a regular inverval
await sendStatusToSequencer();
} catch (e: unknown) {
logger.error(
"Error sending status to sequencer, waiting for next loop",
requestContext,
methodContext,
jsonifyError(e as Error),
);
}
}
}, pollInterval);
};
3 changes: 3 additions & 0 deletions packages/agents/router/src/tasks/publisher/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import { StoreManager } from "@connext/nxtp-adapters-cache";
import Rabbit from "foo-foo-mq";

import { NxtpRouterConfig } from "../../config";
import { Web3Signer } from "@connext/nxtp-adapters-web3signer";
import { Wallet } from "ethers";

export type AppContext = {
logger: Logger;
adapters: {
// Stateful interfaces for peripherals.
wallet: Wallet | Web3Signer; // Used for signing metatxs for bids.
cache: StoreManager; // Used to cache important data locally.
subgraph: SubgraphReader; // Aggregates subgraphs in a FallbackSubgraph for each chain.
mqClient: typeof Rabbit;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export { getXCalls } from "./getXCalls";
export { retryXCalls } from "./retryXCalls";
export { sendStatusToSequencer } from "./status";
Loading
Loading