Skip to content

Commit

Permalink
Add doc strings (#1)
Browse files Browse the repository at this point in the history
* Add doc strings

* Add doc strings

* Fix tests trigger

* Fix tests

* Fix tests

* Fix tests

* lint

* lint

* Fix test

* Create pgmq database

* Create pgmq database
  • Loading branch information
nimrodkor authored Oct 10, 2024
1 parent f67754f commit 87107c3
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 26 deletions.
13 changes: 7 additions & 6 deletions .github/workflows/pr-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: PR Tests

on:
pull_request:
branches:
- main

jobs:
test:
Expand All @@ -14,6 +12,7 @@ jobs:
image: postgres
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: pgmq
options: >-
--health-cmd pg_isready
--health-interval 2s
Expand All @@ -30,13 +29,15 @@ jobs:
node-version: 22
- name: install pnpm
uses: pnpm/action-setup@v4
with:
version: 9
- name: install dependencies
runs: pnpm install
run: pnpm install
- name: run lint
runs: pnpm run lint
run: pnpm run lint
- name: run prettier
runs: pnpm run format:check
run: pnpm run format:check
- name: run tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost:5432/pgmq
runs: pnpm run tests
run: pnpm run test
70 changes: 51 additions & 19 deletions src/classes/pgmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ interface DbMessage {
message: string
}

/** This is the central class this library exports
* @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/
export class Pgmq {
private pool: Pool

Expand All @@ -26,6 +28,10 @@ export class Pgmq {
await connection.query(query)
}

/**
* Creates a queue and a matching archive if does not exist. If queue or archive already exist does not throw error
* @param name - the name of the queue
* **/
public async createQueue(name: string) {
const connection = await this.pool.connect()
const query = `
Expand All @@ -50,6 +56,10 @@ export class Pgmq {
connection.release()
}

/**
* Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error
* @param name - the name of the queue
* **/
public async deleteQueue(name: string) {
const connection = await this.pool.connect()
const query = `
Expand All @@ -59,11 +69,16 @@ export class Pgmq {
connection.release()
}

public async sendMessage<T>(
queue: string,
message: T,
vt = 0
): Promise<number> {
/**
* Write a message to the queue.
* If queue doesn't exist, will throw error. See [createQueue]{@link createQueue}
* @param queue - the name of the queue to send the message to.
* @param message - the object to put as the payload of the message.
* @param vt - the visibility timeout of the message. The visibility timeout
* defines the time a message will stay hidden after being saved.
* @return the id of the message that was created
* **/
public async sendMessage<T>(queue: string, message: T, vt = 0) {
const connection = await this.pool.connect()

const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
Expand All @@ -74,27 +89,38 @@ export class Pgmq {
return parseInt(res.rows[0].msg_id)
}

public async readMessage<T>(
queue: string,
vt: number
): Promise<Message<T> | null> {
/**
* Read a message from the queue
* @param queue - the name of the queue
* @param vt - the visibility timeout of the message. The visibility timeout
* defines the time a message will stay hidden after being retrieved, allowing other
* consumers to process it later if it was not removed from the queue
* @return the whole [message]{@link Message}, including the id, read count and the actual message within
*/
public async readMessage<T>(queue: string, vt: number) {
const connection = await this.pool.connect()
const query = `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id ASC
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
const msg = await connection.query(query)
connection.release()
return this.parseDbMessage<T>(msg.rows[0])
}

/**
* Delete a message from the queue
* @param queue - the name of the queue
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async deleteMessage(queue: string, id: number) {
const query = `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
Expand All @@ -106,6 +132,12 @@ export class Pgmq {
return parseInt(msg.rows[0].msg_id)
}

/**
* Archives a message from the queue to its matching archive
* @param queue - the name of the queue / archive
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async archiveMessage(queue: string, id: number): Promise<number> {
const query = `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
Expand Down
2 changes: 1 addition & 1 deletion test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ describe("Integration test", () => {
await pgmq.createSchema()
await pgmq.createQueue(QUEUE)
})

const pgmq = new PGMQ(connString)
describe("Send message", () => {
it("should send the message", async () => {
Expand Down

0 comments on commit 87107c3

Please sign in to comment.