Skip to content

Commit

Permalink
feat: Add Queue struct and expose functionality (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
nimrodkor authored Oct 11, 2024
1 parent 311f0b1 commit 6000ff0
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 73 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "0.1.1",
"main": "index.ts",
"scripts": {
"prepare": "pnpm run lint && pnpm run format:fix",
"lint": "eslint src --cache",
"format:fix": "prettier --write \"**/*.{ts,tsx}\"",
"format:check": "prettier --check \"**/*.{ts,tsx}\"",
Expand Down
19 changes: 19 additions & 0 deletions src/classes/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,22 @@ export interface Message<T> {
vt: Date
message: T
}

interface DbMessage {
msg_id: string
read_ct: string
enqueued_at: string
vt: string
message: string
}

export function parseDbMessage<T>(msg: DbMessage): Message<T> {
if (msg == null) return msg
return {
msgId: parseInt(msg.msg_id),
readCount: parseInt(msg.read_ct),
enqueuedAt: new Date(msg.enqueued_at),
vt: new Date(msg.vt),
message: msg.message as T,
}
}
95 changes: 22 additions & 73 deletions src/classes/pgmq.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import { Pool } from "pg"
import { Message } from "./message"

const PGMQ_SCHEMA = "pgmq"
const QUEUE_PREFIX = "q"
const ARCHIVE_PREFIX = "a"
import { parseDbMessage } from "./message"
import {
archiveQuery,
createQueueQuery,
deleteQuery,
deleteQueueQuery,
PGMQ_SCHEMA,
readQuery,
sendQuery,
} from "./queries"
import { Queue } from "./queue"

// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
const NAMELEN = 64
const BIGGEST_CONCAT = "archived_at_idx_"
const MAX_PGMQ_QUEUE_LEN = NAMELEN - 1 - BIGGEST_CONCAT.length

interface DbMessage {
msg_id: string
read_ct: string
enqueued_at: string
vt: string
message: string
}

/** This is the central class this library exports
* @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/
export class Pgmq {
Expand Down Expand Up @@ -46,37 +44,22 @@ export class Pgmq {
public async createQueue(name: string) {
validateQueueName(name)
const connection = await this.pool.connect()
const query = `
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);`
const query = createQueueQuery(name)
await connection.query(query)
connection.release()
}

public getQueue(name: string) {
return new Queue(this.pool, name)
}

/**
* Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error
* @param name - the name of the queue
* **/
public async deleteQueue(name: string) {
const connection = await this.pool.connect()
const query = `
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name};
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};`
const query = deleteQueueQuery(name)
await connection.query(query)
connection.release()
}
Expand All @@ -93,9 +76,7 @@ export class Pgmq {
public async sendMessage<T>(queue: string, message: T, vt = 0) {
const connection = await this.pool.connect()

const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
const query = sendQuery(queue, vt)
const res = await connection.query(query, [JSON.stringify(message)])
connection.release()
return parseInt(res.rows[0].msg_id)
Expand All @@ -111,20 +92,10 @@ export class Pgmq {
*/
public async readMessage<T>(queue: string, vt: number) {
const connection = await this.pool.connect()
const query = `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
const query = readQuery(queue, vt)
const msg = await connection.query(query)
connection.release()
return this.parseDbMessage<T>(msg.rows[0])
return parseDbMessage<T>(msg.rows[0])
}

/**
Expand All @@ -134,10 +105,7 @@ export class Pgmq {
* @return the id of the message that was created
*/
public async deleteMessage(queue: string, id: number) {
const query = `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id;`
const query = deleteQuery(queue, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
Expand All @@ -151,31 +119,12 @@ export class Pgmq {
* @return the id of the message that was created
*/
public async archiveMessage(queue: string, id: number): Promise<number> {
const query = `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id, vt, read_ct, enqueued_at, message)
INSERT
INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;`
const query = archiveQuery(queue, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}

parseDbMessage<T>(msg: DbMessage): Message<T> {
if (msg == null) return msg
return {
msgId: parseInt(msg.msg_id),
readCount: parseInt(msg.read_ct),
enqueuedAt: new Date(msg.enqueued_at),
vt: new Date(msg.vt),
message: msg.message as T,
}
}
}

const validateQueueName = (name: string) => {
Expand Down
69 changes: 69 additions & 0 deletions src/classes/queries.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
export const PGMQ_SCHEMA = "pgmq"
const QUEUE_PREFIX = "q"
const ARCHIVE_PREFIX = "a"

export function createQueueQuery(name: string) {
return `
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);`
}

export function deleteQueueQuery(name: string) {
return `
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name};
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};`
}

export function sendQuery(queue: string, vt: number) {
return `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
}

export function readQuery(queue: string, vt: number) {
return `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
}

export function archiveQuery(queue: string, id: number) {
return `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id, vt, read_ct, enqueued_at, message)
INSERT
INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;`
}

export function deleteQuery(queue: string, id: number) {
return `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id;`
}
46 changes: 46 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Pool } from "pg"
import { archiveQuery, deleteQuery, readQuery } from "./queries"
import { parseDbMessage } from "./message"

export class Queue {
private pool: Pool
private readonly name: string

constructor(connectionPool: Pool, name: string) {
this.pool = connectionPool
this.name = name
}

public async readMessage<T>(vt = 0) {
const query = readQuery(this.name, vt)
const conn = await this.pool.connect()
const result = await conn.query(query)
return parseDbMessage<T>(result.rows[0])
}

/**
* Delete a message from the queue
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async deleteMessage(id: number) {
const query = deleteQuery(this.name, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}

/**
* Archives a message from the queue to its matching archive
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async archiveMessage(id: number): Promise<number> {
const query = archiveQuery(this.name, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}
}
30 changes: 30 additions & 0 deletions test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,34 @@ describe("Integration test", () => {
const res = await pgmq.deleteMessage(QUEUE, id)
expect(res).to.eq(id)
})

describe("Test Queue contracts", () => {
const queue = pgmq.getQueue(QUEUE)
it("should read message", async () => {
const msg = await queue.readMessage<TestMessage>(60)
expect(msg?.message?.org).to.eq("acme")
})

it("should archive message", async () => {
const msg = await queue.readMessage(60)
const id = msg?.msgId
if (!id) {
assert.fail("Expected id to be a number")
}
expect(msg?.readCount).to.be.gt(0)
const res = await queue.archiveMessage(id)
expect(res).to.eq(id)
})

it("should delete message", async () => {
const msg = await queue.readMessage(60)
const id = msg?.msgId
if (!id) {
assert.fail("Expected id to be a number")
}
expect(msg?.readCount).to.be.gt(0)
const res = await queue.deleteMessage(id)
expect(res).to.eq(id)
})
})
})

0 comments on commit 6000ff0

Please sign in to comment.