Skip to content

Commit

Permalink
fix(nats): Improve message handling.
Browse files Browse the repository at this point in the history
- Removed the setTimeout line that was causing a fixed delay of 5 seconds.
- Ensured consistent error handling by adding message.ack() in the try block after processing a message and moving it up from the processMessage function.
- Introduced a processing variable to control the message processing loop. When the server shuts down, the processing variable is set to false, causing the loop to exit.
- Updated the onClose hook to properly terminate the done function by setting processing to false, draining the subscription, and unsubscribing from the subscription.
- Remove unnecessary console log.
- Move handlers into dictionary.

Closes #28
  • Loading branch information
Mango Habanero committed Apr 28, 2023
1 parent b9dad51 commit 554ffd9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
33 changes: 16 additions & 17 deletions src/lib/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,22 @@ const handleRegistration = createHandler(
);

export async function processMessage(db: PostgresDb, graphql: GraphQLClient, message: JsMsg, provider: Provider, redis: RedisClient) {
if (message.subject === `${config.NATS.STREAM_NAME}.register`) {
try {
await handleRegistration(db, graphql, message, provider, redis);
message.ack()
} catch (error: any) {
throw new SystemError(`Error handling registration: ${error.message}`);
}
} else if (message.subject === `${config.NATS.STREAM_NAME}.transfer`) {
try {
console.log('Handling transfer')
await handleTransfer(db, graphql, message, provider, redis);
message.ack()
} catch (error: any) {
throw new SystemError(`Error handling transfer: ${error.message}`);
const subjectHandlers = {
[`${config.NATS.STREAM_NAME}.register`]: handleRegistration,
[`${config.NATS.STREAM_NAME}.transfer`]: handleTransfer,
};

const handler = subjectHandlers[message.subject];

try {
if (handler) {
await handler(db, graphql, message, provider, redis);
} else {
logger.debug(`Unsupported subject: ${message.subject}`);
message.ack();
}
} else {
logger.debug(`Unsupported subject: ${message.subject}`);
message.ack()
} catch (error: any) {
throw new SystemError(`Error handling ${message.subject}: ${error.message}`);
}
}

34 changes: 18 additions & 16 deletions src/plugins/nats.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { FastifyPluginAsync } from 'fastify';
import fp from 'fastify-plugin';
import { AckPolicy, connect, consumerOpts, DeliverPolicy, JsMsg } from 'nats';
import { AckPolicy, connect, consumerOpts, DeliverPolicy } from 'nats';
import { processMessage } from '@lib/nats';
import { config } from '@/config';

Expand Down Expand Up @@ -31,31 +31,33 @@ const natsPlugin: FastifyPluginAsync<NatsPluginOptions> = async (fastify, option
opts.bind(options.streamName, options.durableName)

const subscription = await jetStreamClient.subscribe(`${options.streamName}.${options.subject}`, opts)
let processing = true;

const done = async () => {
for await (const message of subscription) {
if(message){
try {
await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis);
// Add delay before processing the next message
await new Promise((resolve) => setTimeout(resolve, 5000));
} catch (error: any) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
// requeue message after 50 seconds
message.nak(50000);
}
for await (const message of subscription) {
if (!processing) break;

try {
await processMessage(fastify.pg, fastify.graphql, message, fastify.provider, fastify.p_redis);
message.ack();
} catch (error: any) {
fastify.log.error(`Error processing NATS message: ${error.message}`);
message.nak(50000);
}
}
}
};

done().catch((err) => {
fastify.log.error(`Error processing NATS message: ${err.message}`);
});

fastify.addHook("onClose", async (_) => {
fastify.addHook('onClose', async (_) => {
processing = false;
await subscription.drain();
subscription.unsubscribe();
await natsConnection.drain();
await natsConnection.close();
})

});
};

export default fp(natsPlugin, {
Expand Down

0 comments on commit 554ffd9

Please sign in to comment.