Skip to content

Commit

Permalink
Add doc strings
Browse files Browse the repository at this point in the history
  • Loading branch information
nimrodkor committed Oct 10, 2024
1 parent e4800b4 commit 07ff783
Showing 1 changed file with 49 additions and 19 deletions.
68 changes: 49 additions & 19 deletions src/classes/pgmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ interface DbMessage {
message: string
}

/** This is the central class this library exports
* @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/
export class Pgmq {
private pool: Pool

Expand All @@ -26,6 +28,10 @@ export class Pgmq {
await connection.query(query)
}

/**
* Creates a queue and a matching archive if does not exist. If queue or archive already exist does not throw error
* @param name - the name of the queue
* **/
public async createQueue(name: string) {
const connection = await this.pool.connect()
const query = `
Expand All @@ -50,6 +56,10 @@ export class Pgmq {
connection.release()
}

/**
* Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error
* @param name - the name of the queue
* **/
public async deleteQueue(name: string) {
const connection = await this.pool.connect()
const query = `
Expand All @@ -59,11 +69,16 @@ export class Pgmq {
connection.release()
}

public async sendMessage<T>(
queue: string,
message: T,
vt = 0
): Promise<number> {
/**
* Write a message to the queue.
* If queue doesn't exist, will throw error. See [createQueue]{@link createQueue}
* @param queue - the name of the queue to send the message to.
* @param message - the object to put as the payload of the message.
* @param vt - the visibility timeout of the message. The visibility timeout
* defines the time a message will stay hidden after being saved.
* @return the id of the message that was created
* **/
public async sendMessage<T>(queue: string, message: T, vt = 0) {
const connection = await this.pool.connect()

const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
Expand All @@ -74,27 +89,37 @@ export class Pgmq {
return parseInt(res.rows[0].msg_id)
}

public async readMessage<T>(
queue: string,
vt: number
): Promise<Message<T> | null> {
/**
* Read a message from the queue
* @param queue - the name of the queue
* @param vt - the visibility timeout of the message. The visibility timeout
* defines the time a message will stay hidden after being retrieved, allowing other
* consumers to process it later if it was not removed from the queue
* @return the whole [message]{@link Message}, including the id, read count and the actual message within
*/
public async readMessage<T>(queue: string, vt: number) {
const connection = await this.pool.connect()
const query = `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id ASC
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
const msg = await connection.query(query)
connection.release()
return this.parseDbMessage<T>(msg.rows[0])
}

/**
* Delete a message from the queue
* @param queue - the name of the queue
* @param id - the id of the message to delete
*/
public async deleteMessage(queue: string, id: number) {
const query = `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
Expand All @@ -106,6 +131,11 @@ export class Pgmq {
return parseInt(msg.rows[0].msg_id)
}

/**
* Archives a message from the queue to its matching archive
* @param queue - the name of the queue / archive
* @param id - the id of the message to delete
*/
public async archiveMessage(queue: string, id: number): Promise<number> {
const query = `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
Expand Down

0 comments on commit 07ff783

Please sign in to comment.