From 07ff78360c3db30db11ec2f457517c285cf66bb3 Mon Sep 17 00:00:00 2001 From: Nimrod Date: Thu, 10 Oct 2024 22:36:49 +0300 Subject: [PATCH] Add doc strings --- src/classes/pgmq.ts | 68 ++++++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/src/classes/pgmq.ts b/src/classes/pgmq.ts index 98182bf..d9c673d 100644 --- a/src/classes/pgmq.ts +++ b/src/classes/pgmq.ts @@ -13,6 +13,8 @@ interface DbMessage { message: string } +/** This is the central class this library exports + * @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/ export class Pgmq { private pool: Pool @@ -26,6 +28,10 @@ export class Pgmq { await connection.query(query) } + /** + * Creates a queue and a matching archive if does not exist. If queue or archive already exist does not throw error + * @param name - the name of the queue + * **/ public async createQueue(name: string) { const connection = await this.pool.connect() const query = ` @@ -50,6 +56,10 @@ export class Pgmq { connection.release() } + /** + * Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error + * @param name - the name of the queue + * **/ public async deleteQueue(name: string) { const connection = await this.pool.connect() const query = ` @@ -59,11 +69,16 @@ export class Pgmq { connection.release() } - public async sendMessage( - queue: string, - message: T, - vt = 0 - ): Promise { + /** + * Write a message to the queue. + * If queue doesn't exist, will throw error. See [createQueue]{@link createQueue} + * @param queue - the name of the queue to send the message to. + * @param message - the object to put as the payload of the message. + * @param vt - the visibility timeout of the message. The visibility timeout + * defines the time a message will stay hidden after being saved. + * @return the id of the message that was created + * **/ + public async sendMessage(queue: string, message: T, vt = 0) { const connection = await this.pool.connect() const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message) @@ -74,27 +89,37 @@ export class Pgmq { return parseInt(res.rows[0].msg_id) } - public async readMessage( - queue: string, - vt: number - ): Promise | null> { + /** + * Read a message from the queue + * @param queue - the name of the queue + * @param vt - the visibility timeout of the message. The visibility timeout + * defines the time a message will stay hidden after being retrieved, allowing other + * consumers to process it later if it was not removed from the queue + * @return the whole [message]{@link Message}, including the id, read count and the actual message within + */ + public async readMessage(queue: string, vt: number) { const connection = await this.pool.connect() const query = `WITH cte AS - (SELECT msg_id - FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} - ORDER BY msg_id ASC - LIMIT 1 FOR UPDATE SKIP LOCKED) - UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t - SET vt = now() + interval '${vt} seconds', - read_ct = read_ct + 1 - FROM cte - WHERE t.msg_id = cte.msg_id - RETURNING *;` + (SELECT msg_id + FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} + ORDER BY msg_id + LIMIT 1 FOR UPDATE SKIP LOCKED) + UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t + SET vt = now() + interval '${vt} seconds', + read_ct = read_ct + 1 + FROM cte + WHERE t.msg_id = cte.msg_id + RETURNING *;` const msg = await connection.query(query) connection.release() return this.parseDbMessage(msg.rows[0]) } + /** + * Delete a message from the queue + * @param queue - the name of the queue + * @param id - the id of the message to delete + */ public async deleteMessage(queue: string, id: number) { const query = `DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} @@ -106,6 +131,11 @@ export class Pgmq { return parseInt(msg.rows[0].msg_id) } + /** + * Archives a message from the queue to its matching archive + * @param queue - the name of the queue / archive + * @param id - the id of the message to delete + */ public async archiveMessage(queue: string, id: number): Promise { const query = `WITH archived AS ( DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}