From 819fbf2470579c2122ffa44ab5fc7c2cae9da114 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Thu, 31 Oct 2024 17:54:13 -0300 Subject: [PATCH 1/5] Implement headers Implement message headers --- pgmq-extension/sql/pgmq.sql | 94 ++++++++++++++++++++++++++++--------- 1 file changed, 73 insertions(+), 21 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 303b2b76..219e19e5 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -26,7 +26,8 @@ CREATE TYPE pgmq.message_record AS ( read_ct INTEGER, enqueued_at TIMESTAMP WITH TIME ZONE, vt TIMESTAMP WITH TIME ZONE, - message JSONB + message JSONB, + headers JSONB ); CREATE TYPE pgmq.queue_record AS ( @@ -85,7 +86,7 @@ BEGIN read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id - RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); @@ -136,7 +137,7 @@ BEGIN read_ct = read_ct + 1 FROM cte WHERE m.msg_id = cte.msg_id - RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message; + RETURNING m.msg_id, m.read_ct, m.enqueued_at, m.vt, m.message, m.headers; $QUERY$, qtable, conditional, qtable, make_interval(secs => vt) ); @@ -269,21 +270,44 @@ $$ LANGUAGE plpgsql; -- send -- sends a message to a queue, optionally with a delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); +$$ LANGUAGE sql; + CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, - delay INTEGER DEFAULT 0 + delay_or_headers anyelement ) RETURNS SETOF BIGINT AS $$ BEGIN - RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay)); + IF pg_typeof(delay_or_headers) = 'jsonb'::regtype THEN + RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, delay_or_headers, clock_timestamp()); + ELSIF pg_typeof(delay_or_headers) = 'integer'::regtype OR pg_typeof(delay_or_headers) = 'timestamptz'::regtype THEN + RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, NULL, delay_or_headers); + ELSE + RAISE EXCEPTION 'Invalid delay_or_headers type: %. Expected integer, timestamptz, or jsonb', pg_typeofof(p_input); + END IF; END; $$ LANGUAGE plpgsql; --- send_at --- sends a message to a queue, with a delay as a timestamp +-- sends a message to a queue, with headers and with a delay as a integer +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- sends a message to a queue, with headers and with a delay as a timestamp CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, + headers JSONB, delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE @@ -292,28 +316,51 @@ DECLARE BEGIN sql := FORMAT( $QUERY$ - INSERT INTO pgmq.%I (vt, message) - VALUES ($2, $1) + INSERT INTO pgmq.%I (vt, message, headers) + VALUES ($2, $1, $3) RETURNING msg_id; $QUERY$, qtable ); - RETURN QUERY EXECUTE sql USING msg, delay; + RETURN QUERY EXECUTE sql USING msg, delay, headers; END; $$ LANGUAGE plpgsql; --- send_batch --- sends an array of list of messages to a queue, optionally with a delay CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], - delay INTEGER DEFAULT 0 + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay_or_headers ANYELEMENT ) RETURNS SETOF BIGINT AS $$ BEGIN - RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); + IF pg_typeof(delay_or_headers) = 'jsonb[]'::regtype THEN + + ELSIF pg_typeof(delay_or_headers) = 'integer'::regtype OR pg_typeof(delay_or_headers) = 'timestamptz'::regtype THEN + RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msg, NULL, delay_or_headers); + ELSE + RAISE EXCEPTION 'Invalid delay_or_headers type: %. Expected integer, timestamptz, or jsonb[]', pg_typeofof(p_input); + END IF; END; $$ LANGUAGE plpgsql; +-- send_batch +-- sends an array of list of messages to a queue, optionally with a delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[], + delay INTEGER DEFAULT 0 +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + -- send_batch_at -- sends an array of list of messages to a queue, with a delay as a timestamp CREATE FUNCTION pgmq.send_batch( @@ -631,7 +678,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) $QUERY$, qtable @@ -645,7 +693,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ); $QUERY$, atable @@ -699,7 +748,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) $QUERY$, qtable @@ -713,7 +763,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ); $QUERY$, atable @@ -819,7 +870,8 @@ BEGIN read_ct INT DEFAULT 0 NOT NULL, enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) PARTITION BY RANGE (%I) $QUERY$, qtable, partition_col @@ -895,7 +947,8 @@ BEGIN enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, archived_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, vt TIMESTAMP WITH TIME ZONE NOT NULL, - message JSONB + message JSONB, + headers JSONB ) PARTITION BY RANGE (%I); $QUERY$, atable, a_partition_col @@ -1034,6 +1087,5 @@ BEGIN retention_interval, qualified_a_table_name ); - END; $$ LANGUAGE plpgsql; From e497c8ee783caa9bd80c380fac35b919eb90793b Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sat, 2 Nov 2024 16:03:35 -0300 Subject: [PATCH 2/5] refactoring --- pgmq-extension/sql/pgmq.sql | 88 +++++++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 219e19e5..46f49214 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -268,8 +268,7 @@ BEGIN END; $$ LANGUAGE plpgsql; --- send --- sends a message to a queue, optionally with a delay +-- send: 2 args, no delay or headers CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB @@ -277,23 +276,34 @@ CREATE FUNCTION pgmq.send( SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); $$ LANGUAGE sql; +-- send: 3 args with headers CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, - delay_or_headers anyelement + headers JSONB ) RETURNS SETOF BIGINT AS $$ -BEGIN - IF pg_typeof(delay_or_headers) = 'jsonb'::regtype THEN - RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, delay_or_headers, clock_timestamp()); - ELSIF pg_typeof(delay_or_headers) = 'integer'::regtype OR pg_typeof(delay_or_headers) = 'timestamptz'::regtype THEN - RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, NULL, delay_or_headers); - ELSE - RAISE EXCEPTION 'Invalid delay_or_headers type: %. Expected integer, timestamptz, or jsonb', pg_typeofof(p_input); - END IF; -END; -$$ LANGUAGE plpgsql; + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp()); +$$ LANGUAGE sql; --- sends a message to a queue, with headers and with a delay as a integer +-- send: 3 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: 3 args with timestamp +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, delay); +$$ LANGUAGE sql; + +-- send: 4 args with integer delay CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, @@ -303,7 +313,7 @@ CREATE FUNCTION pgmq.send( SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; --- sends a message to a queue, with headers and with a delay as a timestamp +-- send: actual implementation CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, @@ -326,32 +336,42 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- send batch: 2 args CREATE FUNCTION pgmq.send_batch( queue_name TEXT, - msgs JSONB[], - delay INTEGER + msgs JSONB[] ) RETURNS SETOF BIGINT AS $$ SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); $$ LANGUAGE sql; +-- send batch: 3 args with headers CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], - delay_or_headers ANYELEMENT + headers JSONB[] ) RETURNS SETOF BIGINT AS $$ -BEGIN - IF pg_typeof(delay_or_headers) = 'jsonb[]'::regtype THEN + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()); +$$ LANGUAGE sql; - ELSIF pg_typeof(delay_or_headers) = 'integer'::regtype OR pg_typeof(delay_or_headers) = 'timestamptz'::regtype THEN - RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msg, NULL, delay_or_headers); - ELSE - RAISE EXCEPTION 'Invalid delay_or_headers type: %. Expected integer, timestamptz, or jsonb[]', pg_typeofof(p_input); - END IF; -END; -$$ LANGUAGE plpgsql; +-- send batch: 3 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send batch: 3 args with timestamp +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay); +$$ LANGUAGE sql; --- send_batch --- sends an array of list of messages to a queue, optionally with a delay +-- send_batch: 4 args with integer delay CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], @@ -361,11 +381,11 @@ CREATE FUNCTION pgmq.send_batch( SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); $$ LANGUAGE sql; --- send_batch_at --- sends an array of list of messages to a queue, with a delay as a timestamp +-- send_batch: actual implementation CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], + headers JSONB[], delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE @@ -374,13 +394,13 @@ DECLARE BEGIN sql := FORMAT( $QUERY$ - INSERT INTO pgmq.%I (vt, message) - SELECT $2, unnest($1) + INSERT INTO pgmq.%I (vt, message, headers) + SELECT $2, unnest($1), unnest($2) RETURNING msg_id; $QUERY$, qtable ); - RETURN QUERY EXECUTE sql USING msgs, delay; + RETURN QUERY EXECUTE sql USING msgs, delay, headers; END; $$ LANGUAGE plpgsql; From cbb8f552e7cc2fb87ad9ed682d617f0fa344ca67 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sat, 2 Nov 2024 17:02:37 -0300 Subject: [PATCH 3/5] test fixes --- pgmq-extension/sql/pgmq.sql | 2 +- pgmq-extension/test/expected/base.out | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 46f49214..bf8632df 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -395,7 +395,7 @@ BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message, headers) - SELECT $2, unnest($1), unnest($2) + SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) RETURNING msg_id; $QUERY$, qtable diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 34ae005a..52dfea5d 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -503,8 +503,8 @@ SELECT pgmq.create('test_pop_queue'); (1 row) SELECT * FROM pgmq.pop('test_pop_queue'); - msg_id | read_ct | enqueued_at | vt | message ---------+---------+-------------+----+--------- + msg_id | read_ct | enqueued_at | vt | message | headers +--------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_pop_queue', '0') \gset @@ -534,8 +534,8 @@ SELECT pgmq.create('test_set_vt_queue'); (1 row) SELECT * FROM pgmq.set_vt('test_set_vt_queue', 9999, 0); - msg_id | read_ct | enqueued_at | vt | message ---------+---------+-------------+----+--------- + msg_id | read_ct | enqueued_at | vt | message | headers +--------+---------+-------------+----+---------+--------- (0 rows) SELECT send AS first_msg_id from pgmq.send('test_set_vt_queue', '0') \gset From 686d27820f6bab652bdab49d97ecd720552bfa06 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 24 Nov 2024 21:33:18 -0300 Subject: [PATCH 4/5] Archive headers --- pgmq-extension/sql/pgmq.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index bf8632df..b335dc96 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -175,10 +175,10 @@ BEGIN WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = $1 - RETURNING msg_id, vt, read_ct, enqueued_at, message + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) - INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message) - SELECT msg_id, vt, read_ct, enqueued_at, message + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message, headers FROM archived RETURNING msg_id; $QUERY$, @@ -207,9 +207,9 @@ BEGIN WITH archived AS ( DELETE FROM pgmq.%I WHERE msg_id = ANY($1) - RETURNING msg_id, vt, read_ct, enqueued_at, message + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers ) - INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) SELECT msg_id, vt, read_ct, enqueued_at, message FROM archived RETURNING msg_id; From 9ae6022cb0661935d6b6a59ebbbe614e7accb301 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 24 Nov 2024 22:07:19 -0300 Subject: [PATCH 5/5] Migration --- pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql | 228 ++++++++++++++++++++++ 1 file changed, 228 insertions(+) diff --git a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql index 11cb1479..07f29c9a 100644 --- a/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql +++ b/pgmq-extension/sql/pgmq--1.4.4--1.5.0.sql @@ -1,3 +1,4 @@ +-- Add conditional read -- read -- reads a number of messages from a queue, setting a visibility timeout on them DROP FUNCTION IF EXISTS pgmq.read(TEXT, INTEGER, INTEGER); @@ -191,6 +192,7 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- Add queue visible length to metrics ALTER TYPE pgmq.metrics_result ADD ATTRIBUTE queue_visible_length bigint; DROP FUNCTION pgmq.metrics(queue_name TEXT); @@ -235,3 +237,229 @@ BEGIN RETURN result_row; END; $$ LANGUAGE plpgsql; + +-- Headers +-- Update types +ALTER TYPE pgmq.message_record ADD ATTRIBUTE headers JSONB; + +-- Update functions +DROP FUNCTION pgmq.send(TEXT, JSONB); +DROP FUNCTION pgmq.send(TEXT, JSONB, TIMESTAMP WITH TIME ZONE); +DROP FUNCTION pgmq.send(TEXT, JSONB, INTEGER); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[]); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[], TIMESTAMP WITH TIME ZONE); +DROP FUNCTION pgmq.send_batch(TEXT, JSONB[], INTEGER); +DROP FUNCTION pgmq.archive(TEXT, BIGINT); +DROP FUNCTION pgmq.archive(TEXT, BIGINT[]); + +-- send: 2 args, no delay or headers +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with headers +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send: 3 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: 3 args with timestamp +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, NULL, delay); +$$ LANGUAGE sql; + +-- send: 4 args with integer delay +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send(queue_name, msg, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send: actual implementation +CREATE FUNCTION pgmq.send( + queue_name TEXT, + msg JSONB, + headers JSONB, + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message, headers) + VALUES ($2, $1, $3) + RETURNING msg_id; + $QUERY$, + qtable + ); + RETURN QUERY EXECUTE sql USING msg, delay, headers; +END; +$$ LANGUAGE plpgsql; + +-- send batch: 2 args +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with headers +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[] +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp()); +$$ LANGUAGE sql; + +-- send batch: 3 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay INTEGER +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send batch: 3 args with timestamp +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, NULL, delay); +$$ LANGUAGE sql; + +-- send_batch: 4 args with integer delay +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[], + delay INTEGER DEFAULT 0 +) RETURNS SETOF BIGINT AS $$ + SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay)); +$$ LANGUAGE sql; + +-- send_batch: actual implementation +CREATE FUNCTION pgmq.send_batch( + queue_name TEXT, + msgs JSONB[], + headers JSONB[], + delay TIMESTAMP WITH TIME ZONE +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message, headers) + SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[])) + RETURNING msg_id; + $QUERY$, + qtable + ); + RETURN QUERY EXECUTE sql USING msgs, delay, headers; +END; +$$ LANGUAGE plpgsql; + +-- archive +CREATE FUNCTION pgmq.archive( + queue_name TEXT, + msg_ids BIGINT[] +) +RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + atable TEXT := pgmq.format_table_name(queue_name, 'a'); +BEGIN + sql := FORMAT( + $QUERY$ + WITH archived AS ( + DELETE FROM pgmq.%I + WHERE msg_id = ANY($1) + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers + ) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message + FROM archived + RETURNING msg_id; + $QUERY$, + qtable, atable + ); + RETURN QUERY EXECUTE sql USING msg_ids; +END; +$$ LANGUAGE plpgsql; + +-- archive +CREATE FUNCTION pgmq.archive( + queue_name TEXT, + msg_id BIGINT +) +RETURNS BOOLEAN AS $$ +DECLARE + sql TEXT; + result BIGINT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); + atable TEXT := pgmq.format_table_name(queue_name, 'a'); +BEGIN + sql := FORMAT( + $QUERY$ + WITH archived AS ( + DELETE FROM pgmq.%I + WHERE msg_id = $1 + RETURNING msg_id, vt, read_ct, enqueued_at, message, headers + ) + INSERT INTO pgmq.%I (msg_id, vt, read_ct, enqueued_at, message, headers) + SELECT msg_id, vt, read_ct, enqueued_at, message, headers + FROM archived + RETURNING msg_id; + $QUERY$, + qtable, atable + ); + EXECUTE sql USING msg_id INTO result; + RETURN NOT (result IS NULL); +END; +$$ LANGUAGE plpgsql; + +-- Update existing queues +FOR queue_record IN SELECT queue_name FROM pgmq.meta LOOP + qtable := pgmq.format_table_name(queue_record.queue_name, 'q'); + atable := pgmq.format_table_name(queue_record.queue_name, 'a'); + + EXECUTE format( + 'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB', + qtable + ); + + EXECUTE format( + 'ALTER TABLE pgmq.%I ADD COLUMN headers JSONB', + atable + ); +END LOOP;