diff --git a/package.json b/package.json index 4dc4957..8e32c96 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,7 @@ "version": "0.1.1", "main": "index.ts", "scripts": { + "prepare": "pnpm run lint && pnpm run format:fix", "lint": "eslint src --cache", "format:fix": "prettier --write \"**/*.{ts,tsx}\"", "format:check": "prettier --check \"**/*.{ts,tsx}\"", diff --git a/src/classes/message.ts b/src/classes/message.ts index cc59b0a..128902b 100644 --- a/src/classes/message.ts +++ b/src/classes/message.ts @@ -5,3 +5,22 @@ export interface Message { vt: Date message: T } + +interface DbMessage { + msg_id: string + read_ct: string + enqueued_at: string + vt: string + message: string +} + +export function parseDbMessage(msg: DbMessage): Message { + if (msg == null) return msg + return { + msgId: parseInt(msg.msg_id), + readCount: parseInt(msg.read_ct), + enqueuedAt: new Date(msg.enqueued_at), + vt: new Date(msg.vt), + message: msg.message as T, + } +} diff --git a/src/classes/pgmq.ts b/src/classes/pgmq.ts index 33445b5..b833b9f 100644 --- a/src/classes/pgmq.ts +++ b/src/classes/pgmq.ts @@ -1,23 +1,21 @@ import { Pool } from "pg" -import { Message } from "./message" - -const PGMQ_SCHEMA = "pgmq" -const QUEUE_PREFIX = "q" -const ARCHIVE_PREFIX = "a" +import { parseDbMessage } from "./message" +import { + archiveQuery, + createQueueQuery, + deleteQuery, + deleteQueueQuery, + PGMQ_SCHEMA, + readQuery, + sendQuery, +} from "./queries" +import { Queue } from "./queue" // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS const NAMELEN = 64 const BIGGEST_CONCAT = "archived_at_idx_" const MAX_PGMQ_QUEUE_LEN = NAMELEN - 1 - BIGGEST_CONCAT.length -interface DbMessage { - msg_id: string - read_ct: string - enqueued_at: string - vt: string - message: string -} - /** This is the central class this library exports * @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/ export class Pgmq { @@ -46,37 +44,22 @@ export class Pgmq { public async createQueue(name: string) { validateQueueName(name) const connection = await this.pool.connect() - const query = ` - CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name} - ( - msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - read_ct INT DEFAULT 0 NOT NULL, - enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, - vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB - ); - CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name} - ( - msg_id BIGINT PRIMARY KEY, - read_ct INT DEFAULT 0 NOT NULL, - enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, - archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, - vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB - );` + const query = createQueueQuery(name) await connection.query(query) connection.release() } + public getQueue(name: string) { + return new Queue(this.pool, name) + } + /** * Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error * @param name - the name of the queue * **/ public async deleteQueue(name: string) { const connection = await this.pool.connect() - const query = ` - DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}; - DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};` + const query = deleteQueueQuery(name) await connection.query(query) connection.release() } @@ -93,9 +76,7 @@ export class Pgmq { public async sendMessage(queue: string, message: T, vt = 0) { const connection = await this.pool.connect() - const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message) - VALUES ((now() + interval '${vt} seconds'), $1::jsonb) - RETURNING msg_id;` + const query = sendQuery(queue, vt) const res = await connection.query(query, [JSON.stringify(message)]) connection.release() return parseInt(res.rows[0].msg_id) @@ -111,20 +92,10 @@ export class Pgmq { */ public async readMessage(queue: string, vt: number) { const connection = await this.pool.connect() - const query = `WITH cte AS - (SELECT msg_id - FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} - ORDER BY msg_id - LIMIT 1 FOR UPDATE SKIP LOCKED) - UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t - SET vt = now() + interval '${vt} seconds', - read_ct = read_ct + 1 - FROM cte - WHERE t.msg_id = cte.msg_id - RETURNING *;` + const query = readQuery(queue, vt) const msg = await connection.query(query) connection.release() - return this.parseDbMessage(msg.rows[0]) + return parseDbMessage(msg.rows[0]) } /** @@ -134,10 +105,7 @@ export class Pgmq { * @return the id of the message that was created */ public async deleteMessage(queue: string, id: number) { - const query = `DELETE - FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} - WHERE msg_id = ${id} - RETURNING msg_id;` + const query = deleteQuery(queue, id) const connection = await this.pool.connect() const msg = await connection.query(query) connection.release() @@ -151,31 +119,12 @@ export class Pgmq { * @return the id of the message that was created */ public async archiveMessage(queue: string, id: number): Promise { - const query = `WITH archived AS ( - DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} - WHERE msg_id = ${id} - RETURNING msg_id, vt, read_ct, enqueued_at, message) - INSERT - INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message) - SELECT msg_id, vt, read_ct, enqueued_at, message - FROM archived - RETURNING msg_id;` + const query = archiveQuery(queue, id) const connection = await this.pool.connect() const msg = await connection.query(query) connection.release() return parseInt(msg.rows[0].msg_id) } - - parseDbMessage(msg: DbMessage): Message { - if (msg == null) return msg - return { - msgId: parseInt(msg.msg_id), - readCount: parseInt(msg.read_ct), - enqueuedAt: new Date(msg.enqueued_at), - vt: new Date(msg.vt), - message: msg.message as T, - } - } } const validateQueueName = (name: string) => { diff --git a/src/classes/queries.ts b/src/classes/queries.ts new file mode 100644 index 0000000..1dd2fc5 --- /dev/null +++ b/src/classes/queries.ts @@ -0,0 +1,69 @@ +export const PGMQ_SCHEMA = "pgmq" +const QUEUE_PREFIX = "q" +const ARCHIVE_PREFIX = "a" + +export function createQueueQuery(name: string) { + return ` + CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name} + ( + msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + read_ct INT DEFAULT 0 NOT NULL, + enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + vt TIMESTAMP WITH TIME ZONE NOT NULL, + message JSONB + ); + CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name} + ( + msg_id BIGINT PRIMARY KEY, + read_ct INT DEFAULT 0 NOT NULL, + enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + vt TIMESTAMP WITH TIME ZONE NOT NULL, + message JSONB + );` +} + +export function deleteQueueQuery(name: string) { + return ` + DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}; + DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};` +} + +export function sendQuery(queue: string, vt: number) { + return `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message) + VALUES ((now() + interval '${vt} seconds'), $1::jsonb) + RETURNING msg_id;` +} + +export function readQuery(queue: string, vt: number) { + return `WITH cte AS + (SELECT msg_id + FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} + ORDER BY msg_id + LIMIT 1 FOR UPDATE SKIP LOCKED) + UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t + SET vt = now() + interval '${vt} seconds', + read_ct = read_ct + 1 + FROM cte + WHERE t.msg_id = cte.msg_id + RETURNING *;` +} + +export function archiveQuery(queue: string, id: number) { + return `WITH archived AS ( + DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} + WHERE msg_id = ${id} + RETURNING msg_id, vt, read_ct, enqueued_at, message) + INSERT + INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message) + SELECT msg_id, vt, read_ct, enqueued_at, message + FROM archived + RETURNING msg_id;` +} + +export function deleteQuery(queue: string, id: number) { + return `DELETE + FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} + WHERE msg_id = ${id} + RETURNING msg_id;` +} diff --git a/src/classes/queue.ts b/src/classes/queue.ts new file mode 100644 index 0000000..1cf88f3 --- /dev/null +++ b/src/classes/queue.ts @@ -0,0 +1,46 @@ +import { Pool } from "pg" +import { archiveQuery, deleteQuery, readQuery } from "./queries" +import { parseDbMessage } from "./message" + +export class Queue { + private pool: Pool + private readonly name: string + + constructor(connectionPool: Pool, name: string) { + this.pool = connectionPool + this.name = name + } + + public async readMessage(vt = 0) { + const query = readQuery(this.name, vt) + const conn = await this.pool.connect() + const result = await conn.query(query) + return parseDbMessage(result.rows[0]) + } + + /** + * Delete a message from the queue + * @param id - the id of the message to delete + * @return the id of the message that was created + */ + public async deleteMessage(id: number) { + const query = deleteQuery(this.name, id) + const connection = await this.pool.connect() + const msg = await connection.query(query) + connection.release() + return parseInt(msg.rows[0].msg_id) + } + + /** + * Archives a message from the queue to its matching archive + * @param id - the id of the message to delete + * @return the id of the message that was created + */ + public async archiveMessage(id: number): Promise { + const query = archiveQuery(this.name, id) + const connection = await this.pool.connect() + const msg = await connection.query(query) + connection.release() + return parseInt(msg.rows[0].msg_id) + } +} diff --git a/test/integration.spec.ts b/test/integration.spec.ts index 6e2ba48..793275f 100644 --- a/test/integration.spec.ts +++ b/test/integration.spec.ts @@ -83,4 +83,34 @@ describe("Integration test", () => { const res = await pgmq.deleteMessage(QUEUE, id) expect(res).to.eq(id) }) + + describe("Test Queue contracts", () => { + const queue = pgmq.getQueue(QUEUE) + it("should read message", async () => { + const msg = await queue.readMessage(60) + expect(msg?.message?.org).to.eq("acme") + }) + + it("should archive message", async () => { + const msg = await queue.readMessage(60) + const id = msg?.msgId + if (!id) { + assert.fail("Expected id to be a number") + } + expect(msg?.readCount).to.be.gt(0) + const res = await queue.archiveMessage(id) + expect(res).to.eq(id) + }) + + it("should delete message", async () => { + const msg = await queue.readMessage(60) + const id = msg?.msgId + if (!id) { + assert.fail("Expected id to be a number") + } + expect(msg?.readCount).to.be.gt(0) + const res = await queue.deleteMessage(id) + expect(res).to.eq(id) + }) + }) })