Skip to content

Commit

Permalink
feat: Add queue name validation (#3)
Browse files Browse the repository at this point in the history
* chore: Add queue name validation

* chore: Add queue name validation
  • Loading branch information
nimrodkor authored Oct 10, 2024
1 parent 20bdad6 commit 5421c96
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"main": "index.ts",
"scripts": {
"lint": "eslint src --cache",
"format:write": "prettier --write \"**/*.{ts,tsx}\"",
"format:fix": "prettier --write \"**/*.{ts,tsx}\"",
"format:check": "prettier --check \"**/*.{ts,tsx}\"",
"test": "mocha --require ts-node/register test/**/*.spec.ts --exit"
},
Expand Down
42 changes: 30 additions & 12 deletions src/classes/pgmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ const PGMQ_SCHEMA = "pgmq"
const QUEUE_PREFIX = "q"
const ARCHIVE_PREFIX = "a"

// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
const NAMELEN = 64
const BIGGEST_CONCAT = "archived_at_idx_"
const MAX_PGMQ_QUEUE_LEN = NAMELEN - 1 - BIGGEST_CONCAT.length

interface DbMessage {
msg_id: string
read_ct: string
Expand Down Expand Up @@ -39,6 +44,7 @@ export class Pgmq {
* @param name - the name of the queue
* **/
public async createQueue(name: string) {
validateQueueName(name)
const connection = await this.pool.connect()
const query = `
CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name}
Expand Down Expand Up @@ -88,8 +94,8 @@ export class Pgmq {
const connection = await this.pool.connect()

const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message)
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
VALUES ((now() + interval '${vt} seconds'), $1::jsonb)
RETURNING msg_id;`
const res = await connection.query(query, [JSON.stringify(message)])
connection.release()
return parseInt(res.rows[0].msg_id)
Expand All @@ -106,16 +112,16 @@ export class Pgmq {
public async readMessage<T>(queue: string, vt: number) {
const connection = await this.pool.connect()
const query = `WITH cte AS
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
(SELECT msg_id
FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue}
ORDER BY msg_id
LIMIT 1 FOR UPDATE SKIP LOCKED)
UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t
SET vt = now() + interval '${vt} seconds',
read_ct = read_ct + 1
FROM cte
WHERE t.msg_id = cte.msg_id
RETURNING *;`
const msg = await connection.query(query)
connection.release()
return this.parseDbMessage<T>(msg.rows[0])
Expand Down Expand Up @@ -171,3 +177,15 @@ export class Pgmq {
}
}
}

const validateQueueName = (name: string) => {
if (name.length > MAX_PGMQ_QUEUE_LEN) {
throw new Error("Queue name is too long")
}
const alphanumericRegex = /^[a-zA-Z0-9_]+$/
if (!alphanumericRegex.test(name)) {
throw new Error(
`Queue name must be made of only alphanumeric characters and the '_' character`
)
}
}
1 change: 0 additions & 1 deletion test/integration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ describe("Integration test", () => {

after(async () => {
await pgmq.deleteQueue(QUEUE)
await pgmq.deleteSchema()
})

beforeEach(async () => {
Expand Down

0 comments on commit 5421c96

Please sign in to comment.