Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Queue struct and expose functionality #5

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "0.1.1",
"main": "index.ts",
"scripts": {
"prepare": "pnpm run lint && pnpm run format:fix",
"lint": "eslint src --cache",
"format:fix": "prettier --write \"**/*.{ts,tsx}\"",
"format:check": "prettier --check \"**/*.{ts,tsx}\"",
Expand Down
19 changes: 19 additions & 0 deletions src/classes/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,22 @@ export interface Message<T> {
vt: Date
message: T
}

interface DbMessage {
msg_id: string
read_ct: string
enqueued_at: string
vt: string
message: string
}

export function parseDbMessage<T>(msg: DbMessage): Message<T> {
if (msg == null) return msg
return {
msgId: parseInt(msg.msg_id),
readCount: parseInt(msg.read_ct),
enqueuedAt: new Date(msg.enqueued_at),
vt: new Date(msg.vt),
message: msg.message as T,
}
}
95 changes: 22 additions & 73 deletions src/classes/pgmq.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import { Pool } from "pg"
import { Message } from "./message"

const PGMQ_SCHEMA = "pgmq"
const QUEUE_PREFIX = "q"
const ARCHIVE_PREFIX = "a"
import { parseDbMessage } from "./message"
import {
archiveQuery,
createQueueQuery,
deleteQuery,
deleteQueueQuery,
PGMQ_SCHEMA,
readQuery,
sendQuery,
} from "./queries"
import { Queue } from "./queue"

// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
const NAMELEN = 64
const BIGGEST_CONCAT = "archived_at_idx_"
const MAX_PGMQ_QUEUE_LEN = NAMELEN - 1 - BIGGEST_CONCAT.length

interface DbMessage {
msg_id: string
read_ct: string
enqueued_at: string
vt: string
message: string
}

/** This is the central class this library exports
* @constructor requires a valid PG connection string. Example: postgresql://user:password@localhost:5432/pgmq **/
export class Pgmq {
Expand Down Expand Up @@ -46,37 +44,22 @@ export class Pgmq {
public async createQueue(name: string) {
validateQueueName(name)
const connection = await this.pool.connect()
const query = `
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);`
const query = createQueueQuery(name)
await connection.query(query)
connection.release()
}

public getQueue(name: string) {
return new Queue(this.pool, name)
}

/**
* Deletes a queue and its matching archive if exists. If queue or archive do not exist does not throw error
* @param name - the name of the queue
* **/
public async deleteQueue(name: string) {
const connection = await this.pool.connect()
const query = `
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name};
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};`
const query = deleteQueueQuery(name)
await connection.query(query)
connection.release()
}
Expand All @@ -93,9 +76,7 @@ export class Pgmq {
public async sendMessage<T>(queue: string, message: T, vt = 0) {
const connection = await this.pool.connect()

const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
const query = sendQuery(queue, vt)
const res = await connection.query(query, [JSON.stringify(message)])
connection.release()
return parseInt(res.rows[0].msg_id)
Expand All @@ -111,20 +92,10 @@ export class Pgmq {
*/
public async readMessage<T>(queue: string, vt: number) {
const connection = await this.pool.connect()
const query = `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
const query = readQuery(queue, vt)
const msg = await connection.query(query)
connection.release()
return this.parseDbMessage<T>(msg.rows[0])
return parseDbMessage<T>(msg.rows[0])
}

/**
Expand All @@ -134,10 +105,7 @@ export class Pgmq {
* @return the id of the message that was created
*/
public async deleteMessage(queue: string, id: number) {
const query = `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id;`
const query = deleteQuery(queue, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
Expand All @@ -151,31 +119,12 @@ export class Pgmq {
* @return the id of the message that was created
*/
public async archiveMessage(queue: string, id: number): Promise<number> {
const query = `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id, vt, read_ct, enqueued_at, message)
INSERT
INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;`
const query = archiveQuery(queue, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}

parseDbMessage<T>(msg: DbMessage): Message<T> {
if (msg == null) return msg
return {
msgId: parseInt(msg.msg_id),
readCount: parseInt(msg.read_ct),
enqueuedAt: new Date(msg.enqueued_at),
vt: new Date(msg.vt),
message: msg.message as T,
}
}
}

const validateQueueName = (name: string) => {
Expand Down
69 changes: 69 additions & 0 deletions src/classes/queries.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
export const PGMQ_SCHEMA = "pgmq"
const QUEUE_PREFIX = "q"
const ARCHIVE_PREFIX = "a"

export function createQueueQuery(name: string) {
return `
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name}
(
msg_id BIGINT PRIMARY KEY,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);`
}

export function deleteQueueQuery(name: string) {
return `
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name};
DROP TABLE IF EXISTS ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${name};`
}

export function sendQuery(queue: string, vt: number) {
return `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
}

export function readQuery(queue: string, vt: number) {
return `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
}

export function archiveQuery(queue: string, id: number) {
return `WITH archived AS (
DELETE FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id, vt, read_ct, enqueued_at, message)
INSERT
INTO ${PGMQ_SCHEMA}.${ARCHIVE_PREFIX}_${queue} (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived
RETURNING msg_id;`
}

export function deleteQuery(queue: string, id: number) {
return `DELETE
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
WHERE msg_id = ${id}
RETURNING msg_id;`
}
46 changes: 46 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Pool } from "pg"
import { archiveQuery, deleteQuery, readQuery } from "./queries"
import { parseDbMessage } from "./message"

export class Queue {
private pool: Pool
private readonly name: string

constructor(connectionPool: Pool, name: string) {
this.pool = connectionPool
this.name = name
}

public async readMessage<T>(vt = 0) {
const query = readQuery(this.name, vt)
const conn = await this.pool.connect()
const result = await conn.query(query)
return parseDbMessage<T>(result.rows[0])
}

/**
* Delete a message from the queue
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async deleteMessage(id: number) {
const query = deleteQuery(this.name, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}

/**
* Archives a message from the queue to its matching archive
* @param id - the id of the message to delete
* @return the id of the message that was created
*/
public async archiveMessage(id: number): Promise<number> {
const query = archiveQuery(this.name, id)
const connection = await this.pool.connect()
const msg = await connection.query(query)
connection.release()
return parseInt(msg.rows[0].msg_id)
}
}
30 changes: 30 additions & 0 deletions test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,34 @@ describe("Integration test", () => {
const res = await pgmq.deleteMessage(QUEUE, id)
expect(res).to.eq(id)
})

describe("Test Queue contracts", () => {
const queue = pgmq.getQueue(QUEUE)
it("should read message", async () => {
const msg = await queue.readMessage<TestMessage>(60)
expect(msg?.message?.org).to.eq("acme")
})

it("should archive message", async () => {
const msg = await queue.readMessage(60)
const id = msg?.msgId
if (!id) {
assert.fail("Expected id to be a number")
}
expect(msg?.readCount).to.be.gt(0)
const res = await queue.archiveMessage(id)
expect(res).to.eq(id)
})

it("should delete message", async () => {
const msg = await queue.readMessage(60)
const id = msg?.msgId
if (!id) {
assert.fail("Expected id to be a number")
}
expect(msg?.readCount).to.be.gt(0)
const res = await queue.deleteMessage(id)
expect(res).to.eq(id)
})
})
})