diff --git a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql index 11cb1479..07f29c9a 100644 --- a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql +++ b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql @@ -1,3 +1,4 @@ +-- Add conditional read -- read -- reads a number of messages from a queue, setting a visibility timeout on them DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER); @@ -191,6 +192,7 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- Add queue visible length to metrics ALTER TYPE pgmq.metrics_result ADD ATTRIBUTE queue_visible_length bigint; DROP FUNCTION pgmq.metrics(queue_name TEXT); @@ -235,3 +237,229 @@ BEGIN RETURN result_row; END; $$ LANGUAGE plpgsql; + +-- Headers +-- Update types +ALTER TYPE pgmq.message_record ADD ATTRIBUTE headers JSONB; + +-- Update functions +DROP FUNCTION pgmq.send(TEXT, JSONB); +DROP FUNCTION pgmq.send(TEXT, JSONB, TIMESTAMP WITH TIME ZONE); +DROP FUNCTION pgmq.send(TEXT, JSONB, INTEGER); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[]); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[], TIMESTAMP WITH TIME ZONE); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[], INTEGER); +DROP FUNCTION pgmq.archive(TEXT, BIGINT); +DROP FUNCTION pgmq.archive(TEXT, BIGINT[]); + +-- send: 2 args, no delay or headers +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with headers +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: 3 args with timestamp +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, delay); +$$ LANGUAGE sql; + +-- send: 4 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: actual implementation +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message, headers) + VALUES ($2, $1, $3) + RETURNING msg_id; + $QUERY$, + qtable + ); + RETURN QUERY EXECUTE sql USING msg, delay, headers; +END; +$$ LANGUAGE plpgsql; + +-- send batch: 2 args +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with headers +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send batch: 3 args with timestamp +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay); +$$ LANGUAGE sql; + +-- send_batch: 4 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[], + delay INTEGER DEFAULT 0 +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send_batch: actual implementation +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message, headers) + SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) + RETURNING msg_id; + $QUERY$, + qtable + ); + RETURN QUERY EXECUTE sql USING msgs, delay, headers; +END; +$$ LANGUAGE plpgsql; + +-- archive +CREATE FUNCTION pgmq.archive( + queue_name TEXT, + msg_ids BIGINT[] +) +RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + atable TEXT := pgmq.format_table_name(queue_name, 'a'); +BEGIN + sql := FORMAT( + $QUERY$ + WITH archived AS ( + DELETE FROM pgmq.%I + WHERE msg_id = ANY($1) + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers + ) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message + FROM archived + RETURNING msg_id; + $QUERY$, + qtable, atable + ); + RETURN QUERY EXECUTE sql USING msg_ids; +END; +$$ LANGUAGE plpgsql; + +-- archive +CREATE FUNCTION pgmq.archive( + queue_name TEXT, + msg_id BIGINT +) +RETURNS BOOLEAN AS $$ +DECLARE + sql TEXT; + result BIGINT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + atable TEXT := pgmq.format_table_name(queue_name, 'a'); +BEGIN + sql := FORMAT( + $QUERY$ + WITH archived AS ( + DELETE FROM pgmq.%I + WHERE msg_id = $1 + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers + ) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message, headers + FROM archived + RETURNING msg_id; + $QUERY$, + qtable, atable + ); + EXECUTE sql USING msg_id INTO result; + RETURN NOT (result IS NULL); +END; +$$ LANGUAGE plpgsql; + +-- Update existing queues +FOR queue_record IN SELECT queue_name FROM pgmq.meta LOOP + qtable := pgmq.format_table_name(queue_record.queue_name, 'q'); + atable := pgmq.format_table_name(queue_record.queue_name, 'a'); + + EXECUTE format( + 'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB', + qtable + ); + + EXECUTE format( + 'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB', + atable + ); +END LOOP; diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 303b2b76..b335dc96 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -26,7 +26,8 @@ CREATE TYPE pgmq.message_record AS ( read_ct INTEGER, enqueued_at TIMESTAMP WITH TIME ZONE, vt TIMESTAMP WITH TIME ZONE, - message JSONB + message JSONB, + headers JSONB ); CREATE TYPE pgmq.queue_record AS ( @@ -85,7 +86,7 @@ BEGIN read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id - RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); @@ -136,7 +137,7 @@ BEGIN read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id - RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); @@ -174,10 +175,10 @@ BEGIN WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = $1 - RETURNING msg_id, vt, read_ct, enqueued_at, message + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) - INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message) - SELECT msg_id, vt, read_ct, enqueued_at, message + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message, headers FROM archived RETURNING msg_id; $QUERY$, @@ -206,9 +207,9 @@ BEGIN WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = ANY($1) - RETURNING msg_id, vt, read_ct, enqueued_at, message + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) - INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) SELECT msg_id, vt, read_ct, enqueued_at, message FROM archived RETURNING msg_id; @@ -267,23 +268,56 @@ BEGIN END; $$ LANGUAGE plpgsql; --- send --- sends a message to a queue, optionally with a delay +-- send: 2 args, no delay or headers +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with headers CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, - delay INTEGER DEFAULT 0 + headers JSONB ) RETURNS SETOF BIGINT AS $$ -BEGIN - RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay)); -END; -$$ LANGUAGE plpgsql; + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: 3 args with timestamp +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, delay); +$$ LANGUAGE sql; + +-- send: 4 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; --- send_at --- sends a message to a queue, with a delay as a timestamp +-- send: actual implementation CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, + headers JSONB, delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE @@ -292,33 +326,66 @@ DECLARE BEGIN sql := FORMAT( $QUERY$ - INSERT INTO pgmq.%I (vt, message) - VALUES ($2, $1) + INSERT INTO pgmq.%I (vt, message, headers) + VALUES ($2, $1, $3) RETURNING msg_id; $QUERY$, qtable ); - RETURN QUERY EXECUTE sql USING msg, delay; + RETURN QUERY EXECUTE sql USING msg, delay, headers; END; $$ LANGUAGE plpgsql; --- send_batch --- sends an array of list of messages to a queue, optionally with a delay +-- send batch: 2 args +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with headers +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send batch: 3 args with timestamp +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay); +$$ LANGUAGE sql; + +-- send_batch: 4 args with integer delay CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], + headers JSONB[], delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ -BEGIN - RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); -END; -$$ LANGUAGE plpgsql; + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; --- send_batch_at --- sends an array of list of messages to a queue, with a delay as a timestamp +-- send_batch: actual implementation CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], + headers JSONB[], delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE @@ -327,13 +394,13 @@ DECLARE BEGIN sql := FORMAT( $QUERY$ - INSERT INTO pgmq.%I (vt, message) - SELECT $2, unnest($1) + INSERT INTO pgmq.%I (vt, message, headers) + SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) RETURNING msg_id; $QUERY$, qtable ); - RETURN QUERY EXECUTE sql USING msgs, delay; + RETURN QUERY EXECUTE sql USING msgs, delay, headers; END; $$ LANGUAGE plpgsql; @@ -631,7 +698,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) $QUERY$, qtable @@ -645,7 +713,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ); $QUERY$, atable @@ -699,7 +768,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) $QUERY$, qtable @@ -713,7 +783,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ); $QUERY$, atable @@ -819,7 +890,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) PARTITION BY RANGE (%I) $QUERY$, qtable, partition_col @@ -895,7 +967,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) PARTITION BY RANGE (%I); $QUERY$, atable, a_partition_col @@ -1034,6 +1107,5 @@ BEGIN retention_interval, qualified_a_table_name ); - END; $$ LANGUAGE plpgsql; diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 34ae005a..52dfea5d 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -503,8 +503,8 @@ SELECT pgmq.create('test_pop_queue'); (1 row) SELECT * FROM pgmq.pop('test_pop_queue'); - msg_id | read_ct | enqueued_at | vt | message ---------+---------+-------------+----+--------- + msg_id | read_ct | enqueued_at | vt | message | headers +--------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_pop_queue', '0') \gset @@ -534,8 +534,8 @@ SELECT pgmq.create('test_set_vt_queue'); (1 row) SELECT * FROM pgmq.set_vt('test_set_vt_queue', 9999, 0); - msg_id | read_ct | enqueued_at | vt | message ---------+---------+-------------+----+--------- + msg_id | read_ct | enqueued_at | vt | message | headers +--------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_set_vt_queue', '0') \gset