Skip to content

Commit

Permalink
Merge pull request #5747 from connext/5738-mq-connection-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
preethamr authored Feb 23, 2024
2 parents 8a253b8 + bc957d4 commit 9720098
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 6 deletions.
2 changes: 1 addition & 1 deletion packages/agents/router/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"dotenv": "16.0.3",
"ethers": "5.7.2",
"fastify": "4.13.0",
"foo-foo-mq": "7.1.0",
"foo-foo-mq": "9.0.3",
"graphql": "16.6.0",
"prom-client": "14.1.1"
},
Expand Down
20 changes: 20 additions & 0 deletions packages/agents/router/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const MIN_CACHE_POLL_INTERVAL = 2_000;
const DEFAULT_CACHE_POLL_INTERVAL = 20_000;
const DEFAULT_AUCTION_ROUND_DEPTH = 3;

//Router MQ limits
const DEFAULT_ROUTER_MQ_RETRY_LIMIT = 20;
const DEFAULT_ROUTER_MQ_HEARTBEAT_LIMIT = 10;
const DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT = 10;

// Sequencer and Cartographer default urls
const SEQUENCER_URLS: Record<string, any> = {
testnet: {
Expand Down Expand Up @@ -234,6 +239,21 @@ export const getEnvConfig = (
configFile.messageQueue?.uri ||
"amqp://guest:guest@localhost:5672",
limit: process.env.MESSAGE_QUEUE_LIMIT || configJson.messageQueue?.limit || configFile.messageQueue?.limit || 25,
heartbeat:
process.env.ROUTER_MQ_HEARTBEAT_LIMIT ||
configJson.messageQueue?.heartbeat ||
configFile.messageQueue?.hearbeat ||
DEFAULT_ROUTER_MQ_HEARTBEAT_LIMIT,
failAfter:
process.env.ROUTER_MQ_FAILAFTER_LIMIT ||
configJson.messageQueue?.heartbeat ||
configFile.messageQueue?.hearbeat ||
DEFAULT_ROUTER_MQ_FAILAFTER_LIMIT,
retryLimit:
process.env.ROUTER_MQ_RETRY_LIMIT ||
configJson.messageQueue?.heartbeat ||
configFile.messageQueue?.hearbeat ||
DEFAULT_ROUTER_MQ_RETRY_LIMIT,
},
};

Expand Down
15 changes: 11 additions & 4 deletions packages/agents/router/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { SubgraphReader } from "@connext/nxtp-adapters-subgraph";
import { ChainData, createMethodContext, Logger, RequestContext } from "@connext/nxtp-utils";
import rabbit from "foo-foo-mq";

import { MQConnectionClosed, MQConnectionFailed, MQConnectionUnreachable } from "./errors";
import { MQConnectionClosed, MQConnectionFailed } from "./errors";

export const XCALL_QUEUE = "xcalls";
export const MQ_EXCHANGE = "router";
Expand Down Expand Up @@ -33,6 +33,9 @@ export const setupCache = async (
export const setupMq = async (
uri: string,
limit: number,
heartbeat: number,
failAfter: number,
retryLimit: number,
logger: Logger,
requestContext: RequestContext,
): Promise<typeof rabbit> => {
Expand All @@ -41,7 +44,7 @@ export const setupMq = async (
const replyQueue = false;
logger.info("Message queue setup in progress...", requestContext, methodContext, { uri });
await rabbit.configure({
connection: { uri, replyQueue },
connection: { uri, replyQueue, heartbeat, failAfter, retryLimit },
queues: [{ name: XCALL_QUEUE, limit }],
exchanges: [{ name: MQ_EXCHANGE, type: "direct" }],
bindings: [{ exchange: MQ_EXCHANGE, target: XCALL_QUEUE, keys: [XCALL_QUEUE] }],
Expand All @@ -55,8 +58,12 @@ export const setupMq = async (
throw new MQConnectionFailed();
});

await rabbit.on("unreachable", function () {
throw new MQConnectionUnreachable();
await rabbit.on("unreachable", async function () {
// throw new MQConnectionUnreachable();
logger.warn("MQ is unreachable, retrying connection", requestContext, methodContext, {
uri,
});
await rabbit.retry();
});

logger.info("Message queue setup is done!", requestContext, methodContext, {
Expand Down
3 changes: 3 additions & 0 deletions packages/agents/router/src/tasks/publisher/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ export const makePublisher = async (_configOverride?: NxtpRouterConfig) => {
context.adapters.mqClient = await setupMq(
context.config.messageQueue.uri as string,
context.config.messageQueue.limit as number,
context.config.messageQueue.heartbeat as number,
context.config.messageQueue.failAfter as number,
context.config.messageQueue.retryLimit as number,
context.logger,
requestContext,
);
Expand Down
3 changes: 3 additions & 0 deletions packages/agents/router/src/tasks/subscriber/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ export const makeSubscriber = async (_configOverride?: NxtpRouterConfig) => {
context.adapters.mqClient = await setupMq(
context.config.messageQueue.uri as string,
context.config.messageQueue.limit as number,
context.config.messageQueue.heartbeat as number,
context.config.messageQueue.failAfter as number,
context.config.messageQueue.retryLimit as number,
context.logger,
requestContext,
);
Expand Down
3 changes: 3 additions & 0 deletions packages/utils/src/types/primitives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ export const TRequiredPeripheralConfig = Type.Object({
user: Type.Optional(Type.String()),
pass: Type.Optional(Type.String()),
limit: Type.Optional(Type.Integer({ minimum: 1, maximum: 100 })),
heartbeat: Type.Optional(Type.Integer({ minimum: 1, maximum: 100 })),
failAfter: Type.Optional(Type.Integer({ minimum: 1, maximum: 100 })),
retryLimit: Type.Optional(Type.Integer({ minimum: 1, maximum: 100 })),
});
17 changes: 16 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2663,7 +2663,7 @@ __metadata:
eslint: 8.34.0
ethers: 5.7.2
fastify: 4.13.0
foo-foo-mq: 7.1.0
foo-foo-mq: 9.0.3
graphql: 16.6.0
interval-promise: 1.4.0
mocha: 10.2.0
Expand Down Expand Up @@ -17592,6 +17592,21 @@ __metadata:
languageName: node
linkType: hard

"foo-foo-mq@npm:9.0.3":
version: 9.0.3
resolution: "foo-foo-mq@npm:9.0.3"
dependencies:
amqplib: ~0.8.0
bole: ^4.0.0
debug: ^4.1.1
machina: ^4.0.2
node-monologue: ^2.0.0
postal: ^2.0.5
uuid: ^8.2.0
checksum: fa1eee1ac36960977272532267a1ed2f74a4c139a86c710d6516900283f096c6c2b1dfa6e9dcaa81db10d27777f206e8606f87ecec5f2e0198325e6a8dad66cb
languageName: node
linkType: hard

"for-each@npm:^0.3.3, for-each@npm:~0.3.3":
version: 0.3.3
resolution: "for-each@npm:0.3.3"
Expand Down

0 comments on commit 9720098

Please sign in to comment.