diff --git a/package.json b/package.json index d1cd971..b2130c8 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "main": "index.ts", "scripts": { "lint": "eslint src --cache", - "format:write": "prettier --write \"**/*.{ts,tsx}\"", + "format:fix": "prettier --write \"**/*.{ts,tsx}\"", "format:check": "prettier --check \"**/*.{ts,tsx}\"", "test": "mocha --require ts-node/register test/**/*.spec.ts --exit" }, diff --git a/src/classes/pgmq.ts b/src/classes/pgmq.ts index 5bd58ed..33445b5 100644 --- a/src/classes/pgmq.ts +++ b/src/classes/pgmq.ts @@ -5,6 +5,11 @@ const PGMQ_SCHEMA = "pgmq" const QUEUE_PREFIX = "q" const ARCHIVE_PREFIX = "a" +// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS +const NAMELEN = 64 +const BIGGEST_CONCAT = "archived_at_idx_" +const MAX_PGMQ_QUEUE_LEN = NAMELEN - 1 - BIGGEST_CONCAT.length + interface DbMessage { msg_id: string read_ct: string @@ -39,6 +44,7 @@ export class Pgmq { * @param name - the name of the queue * **/ public async createQueue(name: string) { + validateQueueName(name) const connection = await this.pool.connect() const query = ` CREATE TABLE IF NOT EXISTS ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${name} @@ -88,8 +94,8 @@ export class Pgmq { const connection = await this.pool.connect() const query = `INSERT INTO ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} (vt, message) - VALUES ((now() + interval '${vt} seconds'), $1::jsonb) - RETURNING msg_id;` + VALUES ((now() + interval '${vt} seconds'), $1::jsonb) + RETURNING msg_id;` const res = await connection.query(query, [JSON.stringify(message)]) connection.release() return parseInt(res.rows[0].msg_id) @@ -106,16 +112,16 @@ export class Pgmq { public async readMessage(queue: string, vt: number) { const connection = await this.pool.connect() const query = `WITH cte AS - (SELECT msg_id - FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} - ORDER BY msg_id - LIMIT 1 FOR UPDATE SKIP LOCKED) - UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t - SET vt = now() + interval '${vt} seconds', - read_ct = read_ct + 1 - FROM cte - WHERE t.msg_id = cte.msg_id - RETURNING *;` + (SELECT msg_id + FROM ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} + ORDER BY msg_id + LIMIT 1 FOR UPDATE SKIP LOCKED) + UPDATE ${PGMQ_SCHEMA}.${QUEUE_PREFIX}_${queue} t + SET vt = now() + interval '${vt} seconds', + read_ct = read_ct + 1 + FROM cte + WHERE t.msg_id = cte.msg_id + RETURNING *;` const msg = await connection.query(query) connection.release() return this.parseDbMessage(msg.rows[0]) @@ -171,3 +177,15 @@ export class Pgmq { } } } + +const validateQueueName = (name: string) => { + if (name.length > MAX_PGMQ_QUEUE_LEN) { + throw new Error("Queue name is too long") + } + const alphanumericRegex = /^[a-zA-Z0-9_]+$/ + if (!alphanumericRegex.test(name)) { + throw new Error( + `Queue name must be made of only alphanumeric characters and the '_' character` + ) + } +} diff --git a/test/integration.spec.ts b/test/integration.spec.ts index 3eb8108..6e2ba48 100644 --- a/test/integration.spec.ts +++ b/test/integration.spec.ts @@ -41,7 +41,6 @@ describe("Integration test", () => { after(async () => { await pgmq.deleteQueue(QUEUE) - await pgmq.deleteSchema() }) beforeEach(async () => {