From 9e48c61c5abe902e4ae1dc53f52f17e561755165 Mon Sep 17 00:00:00 2001 From: Neptune Date: Thu, 17 Oct 2024 15:27:03 -0500 Subject: [PATCH 01/13] Create send_at and send_batch_at functions receiving a timestamp --- docs/api/sql/functions.md | 67 +++++++++++++++++++ pgmq-extension/sql/pgmq.sql | 46 +++++++++++++ .../tembo_pgmq_python/async_queue.py | 44 ++++++++++++ tembo-pgmq-python/tembo_pgmq_python/queue.py | 17 +++++ 4 files changed, 174 insertions(+) diff --git a/docs/api/sql/functions.md b/docs/api/sql/functions.md index f5612293..9176b500 100644 --- a/docs/api/sql/functions.md +++ b/docs/api/sql/functions.md @@ -35,6 +35,39 @@ select * from pgmq.send('my_queue', '{"hello": "world"}'); --- +### send_at + +Send a single message to a queue with delay as a timestamp. + +```text +pgmq.send_at( + queue_name text, + msg jsonb, + delay timestamp +) + +RETURNS SETOF bigint +``` + +**Parameters:** + +| Parameter | Type | Description | +| :--- | :---- | :--- | +| queue_name | text | The name of the queue | +| msg | jsonb | The message to send to the queue | +| delay | timestamp | Timestamp until the message becomes visible. | + +Example: + +```sql +select * from pgmq.send_at('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP); + send_at +--------- + 4 +``` + +--- + ### send_batch Send 1 or more messages to a queue. @@ -68,6 +101,40 @@ select * from pgmq.send_batch('my_queue', ARRAY[ --- +### send_batch_at + +Send 1 or more messages to a queue with delay as a timestamp. + +```text +pgmq.send_batch( + queue_name text, + msgs jsonb[], + delay timestamp +) +RETURNS SETOF bigint +``` +**Parameters:** + +| Parameter | Type | Description | +| :--- | :---- | :--- | +| queue_name | text | The name of the queue | +| msgs | jsonb[] | Array of messages to send to the queue | +| delay | timestamp | Timestamp until the messages become visible. | + +```sql +select * from pgmq.send_batch_at('my_queue', ARRAY[ + '{"hello": "world_0"}'::jsonb, + '{"hello": "world_1"}'::jsonb], + CURRENT_TIMESTAMP +); + send_batch_at +--------------- + 1 + 2 +``` + +--- + ## Reading Messages ### read diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 751a22f8..8150b0b6 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -282,6 +282,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- send_at +-- sends a message to a queue, with a delay as a timestamp +CREATE FUNCTION pgmq.send_at( + queue_name TEXT, + msg JSONB, + delay TIMESTAMP +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message) + VALUES ((clock_timestamp() + %L), $1) + RETURNING msg_id; + $QUERY$, + qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP)) + ); + RETURN QUERY EXECUTE sql USING msg; +END; +$$ LANGUAGE plpgsql; + -- send_batch -- sends an array of list of messages to a queue, optionally with a delay CREATE FUNCTION pgmq.send_batch( @@ -305,6 +328,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- send_batch_at +-- sends an array of list of messages to a queue, with a delay as a timestamp +CREATE FUNCTION pgmq.send_batch_at( + queue_name TEXT, + msgs JSONB[], + delay TIMESTAMP +) RETURNS SETOF BIGINT AS $$ +DECLARE + sql TEXT; + qtable TEXT := pgmq.format_table_name(queue_name, 'q'); +BEGIN + sql := FORMAT( + $QUERY$ + INSERT INTO pgmq.%I (vt, message) + SELECT clock_timestamp() + %L, unnest($1) + RETURNING msg_id; + $QUERY$, + qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP)) + ); + RETURN QUERY EXECUTE sql USING msgs; +END; +$$ LANGUAGE plpgsql; + -- returned by pgmq.metrics() and pgmq.metrics_all CREATE TYPE pgmq.metrics_result AS ( queue_name text, diff --git a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py index f3137820..731a2fd2 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py @@ -175,6 +175,27 @@ async def _send_internal(self, queue, message, delay, conn): self.logger.debug(f"Message sent with msg_id={result[0]}") return result[0] + @transaction + async def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int: + """Send a message to a queue with timestamp.""" + self.logger.debug(f"send_at called with queue='{queue}', message={message}, delay={delay}, conn={conn}") + if conn is None: + async with self.pool.acquire() as conn: + return await self._send_at_internal(queue, message, delay, conn) + else: + return await self._send_at_internal(queue, message, delay, conn) + + async def _send_at_internal(self, queue, message, delay, conn): + self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}") + result = await conn.fetchrow( + "SELECT * FROM pgmq.send_at($1, $2::jsonb, $3);", + queue, + dumps(message).decode("utf-8"), + delay, + ) + self.logger.debug(f"Message sent with msg_id={result[0]}") + return result[0] + @transaction async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: """Send a batch of messages to a queue.""" @@ -198,6 +219,29 @@ async def _send_batch_internal(self, queue, messages, delay, conn): self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}") return msg_ids + @transaction + async def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]: + """Send a batch of messages to a queue with timestamp.""" + self.logger.debug(f"send_batch_at called with queue='{queue}', messages={messages}, delay={delay}, conn={conn}") + if conn is None: + async with self.pool.acquire() as conn: + return await self._send_batch_at_internal(queue, messages, delay, conn) + else: + return await self._send_batch_at_internal(queue, messages, delay, conn) + + async def _send_batch_at_internal(self, queue, messages, delay, conn): + self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}") + jsonb_array = [dumps(message).decode("utf-8") for message in messages] + result = await conn.fetch( + "SELECT * FROM pgmq.send_batch_at($1, $2::jsonb[], $3);", + queue, + jsonb_array, + delay, + ) + msg_ids = [message[0] for message in result] + self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}") + return msg_ids + @transaction async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]: """Read a message from a queue.""" diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index f4a56e83..2c61a950 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -119,6 +119,14 @@ def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int: result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) return result[0][0] + @transaction + def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int: + """Send a message to a queue with timestamp.""" + self.logger.debug(f"send called with conn: {conn}") + query = "select * from pgmq.send_at(%s, %s, %s);" + result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) + return result[0][0] + @transaction def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: """Send a batch of messages to a queue.""" @@ -128,6 +136,15 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None result = self._execute_query_with_result(query, params, conn=conn) return [message[0] for message in result] + @transaction + def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]: + """Send a batch of messages to a queue with timestamp.""" + self.logger.debug(f"send_batch called with conn: {conn}") + query = "select * from pgmq.send_batch_at(%s, %s, %s);" + params = [queue, [Jsonb(message) for message in messages], delay] + result = self._execute_query_with_result(query, params, conn=conn) + return [message[0] for message in result] + @transaction def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]: """Read a message from a queue.""" From 5c77ffc88c66efc520cc1061c0613036c35dc867 Mon Sep 17 00:00:00 2001 From: Neptune Date: Fri, 18 Oct 2024 14:38:07 -0500 Subject: [PATCH 02/13] Add test for send_at --- pgmq-extension/test/expected/base.out | 26 ++++++++++++++++++++++++++ pgmq-extension/test/sql/base.sql | 11 +++++++++++ 2 files changed, 37 insertions(+) diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index c527f5da..294c5de1 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -107,6 +107,32 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); t (1 row) +SELECT pgmq.create('test_default_queue_vt'); + create +-------- + +(1 row) + +-- send message with timestamp +SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); + send_at +--------- + 1 +(1 row) + +-- read, assert no messages because we set timestamp to the future +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); + ?column? +---------- +(0 rows) + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue_vt', 10, 1, 10); + ?column? +---------- + t +(1 row) + -- send a batch of 2 messages SELECT pgmq.create('batch_queue'); create diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index fe546038..20defbe9 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -45,6 +45,17 @@ SELECT msg_id = :msg_id FROM pgmq.set_vt('test_default_queue', :msg_id, 0); -- read again, should have msg_id 1 again SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); +SELECT pgmq.create('test_default_queue_vt'); + +-- send message with timestamp +SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); + +-- read, assert no messages because we set timestamp to the future +SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); + +-- read again, now using poll to block until message is ready +SELECT msg_id = :msg_id FROM pgmq.read_with_poll('test_default_queue_vt', 10, 1, 10); + -- send a batch of 2 messages SELECT pgmq.create('batch_queue'); SELECT ARRAY( SELECT pgmq.send_batch( From 26ba62c166178122b943b808e7121650b300b359 Mon Sep 17 00:00:00 2001 From: Neptune Date: Fri, 18 Oct 2024 14:45:08 -0500 Subject: [PATCH 03/13] Add tests for send_batch_at --- pgmq-extension/test/expected/base.out | 23 +++++++++++++++++++++-- pgmq-extension/test/sql/base.sql | 12 ++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 294c5de1..5043b46d 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -114,7 +114,7 @@ SELECT pgmq.create('test_default_queue_vt'); (1 row) -- send message with timestamp -SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); +SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp)); send_at --------- 1 @@ -149,6 +149,23 @@ SELECT ARRAY( SELECT pgmq.send_batch( t (1 row) +-- send a batch of 2 messages with timestamp +SELECT pgmq.create('batch_queue_vt'); + create +-------- + +(1 row) + +SELECT ARRAY( SELECT pgmq.send_batch_at( + 'batch_queue_vt', + ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], + CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp) +)) = ARRAY[1, 2]::BIGINT[]; + ?column? +---------- + t +(1 row) + -- CREATE with 5 seconds per partition, 10 seconds retention SELECT pgmq.create_partitioned('test_duration_queue', '5 seconds', '10 seconds'); create_partitioned @@ -195,7 +212,9 @@ SELECT pgmq.drop_queue(q.queue_name, true) t t t -(4 rows) + t + t +(6 rows) SELECT queue_name FROM pgmq.list_queues(); queue_name diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index 20defbe9..2d503638 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -48,7 +48,7 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); SELECT pgmq.create('test_default_queue_vt'); -- send message with timestamp -SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); +SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp)); -- read, assert no messages because we set timestamp to the future SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); @@ -63,6 +63,14 @@ SELECT ARRAY( SELECT pgmq.send_batch( ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[] )) = ARRAY[1, 2]::BIGINT[]; +-- send a batch of 2 messages with timestamp +SELECT pgmq.create('batch_queue_vt'); +SELECT ARRAY( SELECT pgmq.send_batch_at( + 'batch_queue_vt', + ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], + CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp) +)) = ARRAY[1, 2]::BIGINT[]; + -- CREATE with 5 seconds per partition, 10 seconds retention SELECT pgmq.create_partitioned('test_duration_queue', '5 seconds', '10 seconds'); @@ -314,4 +322,4 @@ SELECT pgmq.format_table_name($$single'quote-fail$$, 'a'); --Cleanup tests DROP EXTENSION pgmq CASCADE; -DROP EXTENSION pg_partman CASCADE; \ No newline at end of file +DROP EXTENSION pg_partman CASCADE; From 11d740b6f2a35193647483e3fe173d32934630a1 Mon Sep 17 00:00:00 2001 From: Neptune Date: Tue, 22 Oct 2024 20:49:36 -0500 Subject: [PATCH 04/13] Update expected tests for latest changes --- pgmq-extension/test/expected/base.out | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 3dc0fae3..9fd16dde 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -217,6 +217,8 @@ SELECT pgmq.drop_queue(q.queue_name, true) WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead drop_queue ------------ From 6a9be8bf56bd11b5c05a503be92532d102c93fe8 Mon Sep 17 00:00:00 2001 From: Neptune Date: Tue, 22 Oct 2024 20:56:30 -0500 Subject: [PATCH 05/13] Fix typo in send_batch_at docs --- docs/api/sql/functions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/api/sql/functions.md b/docs/api/sql/functions.md index 9176b500..967fa011 100644 --- a/docs/api/sql/functions.md +++ b/docs/api/sql/functions.md @@ -106,7 +106,7 @@ select * from pgmq.send_batch('my_queue', ARRAY[ Send 1 or more messages to a queue with delay as a timestamp. ```text -pgmq.send_batch( +pgmq.send_batch_at( queue_name text, msgs jsonb[], delay timestamp From 26f25d2aa47464f761fee88fcc82a9950691b8d7 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 27 Oct 2024 10:39:12 -0300 Subject: [PATCH 06/13] Use overloading; Refactor; --- pgmq-extension/sql/pgmq.sql | 46 ++++++++++--------------------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 12e2f440..644877f3 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -266,28 +266,17 @@ CREATE FUNCTION pgmq.send( msg JSONB, delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ -DECLARE - sql TEXT; - qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN - sql := FORMAT( - $QUERY$ - INSERT INTO pgmq.%I (vt, message) - VALUES ((clock_timestamp() + %L), $1) - RETURNING msg_id; - $QUERY$, - qtable, make_interval(secs => delay) - ); - RETURN QUERY EXECUTE sql USING msg; + RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay)); END; $$ LANGUAGE plpgsql; -- send_at -- sends a message to a queue, with a delay as a timestamp -CREATE FUNCTION pgmq.send_at( +CREATE FUNCTION pgmq.send( queue_name TEXT, msg JSONB, - delay TIMESTAMP + delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; @@ -296,12 +285,12 @@ BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message) - VALUES ((clock_timestamp() + %L), $1) + VALUES ($2, $1) RETURNING msg_id; $QUERY$, - qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP)) + qtable ); - RETURN QUERY EXECUTE sql USING msg; + RETURN QUERY EXECUTE sql USING msg, delay; END; $$ LANGUAGE plpgsql; @@ -312,28 +301,17 @@ CREATE FUNCTION pgmq.send_batch( msgs JSONB[], delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ -DECLARE - sql TEXT; - qtable TEXT := pgmq.format_table_name(queue_name, 'q'); BEGIN - sql := FORMAT( - $QUERY$ - INSERT INTO pgmq.%I (vt, message) - SELECT clock_timestamp() + %L, unnest($1) - RETURNING msg_id; - $QUERY$, - qtable, make_interval(secs => delay) - ); - RETURN QUERY EXECUTE sql USING msgs; + RETURN QUERY EXECUTE pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); END; $$ LANGUAGE plpgsql; -- send_batch_at -- sends an array of list of messages to a queue, with a delay as a timestamp -CREATE FUNCTION pgmq.send_batch_at( +CREATE FUNCTION pgmq.send_batch( queue_name TEXT, msgs JSONB[], - delay TIMESTAMP + delay TIMESTAMP WITH TIME ZONE ) RETURNS SETOF BIGINT AS $$ DECLARE sql TEXT; @@ -342,12 +320,12 @@ BEGIN sql := FORMAT( $QUERY$ INSERT INTO pgmq.%I (vt, message) - SELECT clock_timestamp() + %L, unnest($1) + SELECT $2, unnest($1) RETURNING msg_id; $QUERY$, - qtable, AGE(date_trunc('second', delay), date_trunc('second', CURRENT_TIMESTAMP)) + qtable ); - RETURN QUERY EXECUTE sql USING msgs; + RETURN QUERY EXECUTE sql USING msgs, delay; END; $$ LANGUAGE plpgsql; From 9bc6be04bc1dab69057c0021d8991dbe0a02d258 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 27 Oct 2024 10:48:46 -0300 Subject: [PATCH 07/13] Fix tests --- pgmq-extension/test/expected/base.out | 4 ++-- pgmq-extension/test/sql/base.sql | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 9fd16dde..01ca2f7d 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -114,8 +114,8 @@ SELECT pgmq.create('test_default_queue_vt'); (1 row) -- send message with timestamp -SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp)); - send_at +SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); + send --------- 1 (1 row) diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index f40a915e..2d78c989 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -48,7 +48,7 @@ SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue', 2, 1); SELECT pgmq.create('test_default_queue_vt'); -- send message with timestamp -SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp)); +SELECT * from pgmq.send('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); -- read, assert no messages because we set timestamp to the future SELECT msg_id = :msg_id FROM pgmq.read('test_default_queue_vt', 2, 1); From 70244a0b7866acdaaf11bce95b6ef5db774dd746 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 27 Oct 2024 11:02:48 -0300 Subject: [PATCH 08/13] fix tests --- pgmq-extension/sql/pgmq.sql | 2 +- pgmq-extension/test/expected/base.out | 19 +++++++++++-------- pgmq-extension/test/sql/base.sql | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pgmq-extension/sql/pgmq.sql b/pgmq-extension/sql/pgmq.sql index 091060d1..303b2b76 100644 --- a/pgmq-extension/sql/pgmq.sql +++ b/pgmq-extension/sql/pgmq.sql @@ -310,7 +310,7 @@ CREATE FUNCTION pgmq.send_batch( delay INTEGER DEFAULT 0 ) RETURNS SETOF BIGINT AS $$ BEGIN - RETURN QUERY EXECUTE pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); + RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay)); END; $$ LANGUAGE plpgsql; diff --git a/pgmq-extension/test/expected/base.out b/pgmq-extension/test/expected/base.out index 8ed95464..34ae005a 100644 --- a/pgmq-extension/test/expected/base.out +++ b/pgmq-extension/test/expected/base.out @@ -136,10 +136,10 @@ SELECT pgmq.create('test_default_queue_vt'); (1 row) -- send message with timestamp -SELECT * from pgmq.send_at('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); +SELECT * from pgmq.send('test_default_queue_vt', '{"hello": "world"}', CURRENT_TIMESTAMP + '5 seconds'::interval); send ---------- - 1 +------ + 1 (1 row) -- read, assert no messages because we set timestamp to the future @@ -178,10 +178,10 @@ SELECT pgmq.create('batch_queue_vt'); (1 row) -SELECT ARRAY( SELECT pgmq.send_batch_at( +SELECT ARRAY( SELECT pgmq.send_batch( 'batch_queue_vt', ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], - CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp) + CURRENT_TIMESTAMP + '5 seconds'::interval )) = ARRAY[1, 2]::BIGINT[]; ?column? ---------- @@ -244,7 +244,7 @@ SELECT queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_m SELECT COUNT(1) from pgmq.metrics_all(); count ------- - 7 + 9 (1 row) -- delete all the queues @@ -273,7 +273,8 @@ WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead - +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead +WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed in PGMQ v2.0. Use drop_queue(queue_name) instead drop_queue ------------ t @@ -281,7 +282,9 @@ WARNING: drop_queue(queue_name, partitioned) is deprecated and will be removed t t t -(5 rows) + t + t +(7 rows) SELECT queue_name FROM pgmq.list_queues(); queue_name diff --git a/pgmq-extension/test/sql/base.sql b/pgmq-extension/test/sql/base.sql index 9020743c..61e1ae5f 100644 --- a/pgmq-extension/test/sql/base.sql +++ b/pgmq-extension/test/sql/base.sql @@ -75,10 +75,10 @@ SELECT ARRAY( SELECT pgmq.send_batch( -- send a batch of 2 messages with timestamp SELECT pgmq.create('batch_queue_vt'); -SELECT ARRAY( SELECT pgmq.send_batch_at( +SELECT ARRAY( SELECT pgmq.send_batch( 'batch_queue_vt', ARRAY['{"hello": "world_0"}', '{"hello": "world_1"}']::jsonb[], - CAST(CURRENT_TIMESTAMP + '5 seconds'::interval AS timestamp) + CURRENT_TIMESTAMP + '5 seconds'::interval )) = ARRAY[1, 2]::BIGINT[]; -- CREATE with 5 seconds per partition, 10 seconds retention From bd0078ce42c1e1daecda0e03227e661294356b1f Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Sun, 27 Oct 2024 11:34:55 -0300 Subject: [PATCH 09/13] Fix documentation/remove dead functions --- docs/api/sql/functions.md | 157 ++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 92 deletions(-) diff --git a/docs/api/sql/functions.md b/docs/api/sql/functions.md index 8464d845..8819d174 100644 --- a/docs/api/sql/functions.md +++ b/docs/api/sql/functions.md @@ -22,48 +22,27 @@ RETURNS SETOF bigint | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msg | jsonb | The message to send to the queue | -| delay | integer | Time in seconds before the message becomes visible. Defaults to 0. | +| delay | integer/timestampz | Time in seconds before the message becomes visible, or a timestamp of when it becomes visible. Defaults to 0. | Example: ```sql select * from pgmq.send('my_queue', '{"hello": "world"}'); - send + send ------ 4 -``` - ---- - -### send_at - -Send a single message to a queue with delay as a timestamp. - -```text -pgmq.send_at( - queue_name text, - msg jsonb, - delay timestamp -) - -RETURNS SETOF bigint -``` - -**Parameters:** - -| Parameter | Type | Description | -| :--- | :---- | :--- | -| queue_name | text | The name of the queue | -| msg | jsonb | The message to send to the queue | -| delay | timestamp | Timestamp until the message becomes visible. | -Example: +-- Message with a delay of 5 seconds +select * from pgmq.send('my_queue', '{"hello": "world"}', 5); + send +------ + 5 -```sql -select * from pgmq.send_at('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP); - send_at ---------- - 4 +-- Message readable from tomorrow +select * from pgmq.send('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP + INTERVAL '1 day'); + send +------ + 6 ``` --- @@ -86,51 +65,45 @@ RETURNS SETOF bigint | :--- | :---- | :--- | | queue_name | text | The name of the queue | | msgs | jsonb[] | Array of messages to send to the queue | -| delay | integer | Time in seconds before the messages becomes visible. Defaults to 0. | +| delay | integer/timestampz | Time in seconds before the messages becomes visible, or a timestamp of when it becomes visible. Defaults to 0. | ```sql -select * from pgmq.send_batch('my_queue', ARRAY[ - '{"hello": "world_0"}'::jsonb, - '{"hello": "world_1"}'::jsonb] +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[] ); - send_batch + send_batch ------------ 1 2 -``` - ---- - -### send_batch_at - -Send 1 or more messages to a queue with delay as a timestamp. - -```text -pgmq.send_batch_at( - queue_name text, - msgs jsonb[], - delay timestamp -) -RETURNS SETOF bigint -``` -**Parameters:** - -| Parameter | Type | Description | -| :--- | :---- | :--- | -| queue_name | text | The name of the queue | -| msgs | jsonb[] | Array of messages to send to the queue | -| delay | timestamp | Timestamp until the messages become visible. | -```sql -select * from pgmq.send_batch_at('my_queue', ARRAY[ - '{"hello": "world_0"}'::jsonb, - '{"hello": "world_1"}'::jsonb], - CURRENT_TIMESTAMP +-- Message with a delay of 5 seconds +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[], + 5 ); - send_batch_at ---------------- - 1 - 2 + send_batch +------------ + 3 + 4 + +-- Message readable from tomorrow +select * from pgmq.send_batch('my_queue', + ARRAY[ + '{"hello": "world_0"}', + '{"hello": "world_1"}' + ]::jsonb[], + 5 +); + send_batch +------------ + 6 + 7 ``` --- @@ -168,7 +141,7 @@ Read messages from a queue ```sql select * from pgmq.read('my_queue', 10, 2); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"} 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} @@ -179,7 +152,7 @@ Read a message from a queue with message filtering ```sql select * from pgmq.read('my_queue', 10, 2, '{"hello": "world_1"}'); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"} (1 row) @@ -222,7 +195,7 @@ Example: ```sql select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` @@ -252,7 +225,7 @@ Example: ```sql pgmq=# select * from pgmq.pop('my_queue'); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+-------------------- 1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"} ``` @@ -281,7 +254,7 @@ Example: ```sql select pgmq.delete('my_queue', 5); - delete + delete -------- t ``` @@ -310,7 +283,7 @@ Delete two messages that exist. ```sql select * from pgmq.delete('my_queue', ARRAY[2, 3]); - delete + delete -------- 2 3 @@ -320,7 +293,7 @@ Delete two messages, one that exists and one that does not. Message `999` does n ```sql select * from pgmq.delete('my_queue', ARRAY[6, 999]); - delete + delete -------- 6 ``` @@ -347,8 +320,8 @@ Example: Purge the queue when it contains 8 messages; ```sql -select * from pgmq.purge_queue('my_queue'); - purge_queue +select * from pgmq.purge_queue('my_queue'); + purge_queue ------------- 8 ``` @@ -378,7 +351,7 @@ Example; remove message with ID 1 from queue `my_queue` and archive it: ```sql SELECT * FROM pgmq.archive('my_queue', 1); - archive + archive --------- t ``` @@ -408,7 +381,7 @@ Delete messages with ID 1 and 2 from queue `my_queue` and move to the archive. ```sql SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]); - archive + archive --------- 1 2 @@ -418,7 +391,7 @@ Delete messages 4, which exists and 999, which does not exist. ```sql select * from pgmq.archive('my_queue', ARRAY[4, 999]); - archive + archive --------- 4 ``` @@ -446,7 +419,7 @@ Example: ```sql select from pgmq.create('my_queue'); - create + create -------- ``` @@ -483,7 +456,7 @@ select from pgmq.create_partitioned( '100000', '10000000' ); - create_partitioned + create_partitioned -------------------- ``` @@ -491,7 +464,7 @@ select from pgmq.create_partitioned( ### create_unlogged -Creates an unlogged table. This is useful when write throughput is more important that durability. +Creates an unlogged table. This is useful when write throughput is more important that durability. See Postgres documentation for [unlogged tables](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED) for more information. ```text @@ -509,7 +482,7 @@ Example: ```sql select pgmq.create_unlogged('my_unlogged'); - create_unlogged + create_unlogged ----------------- ``` @@ -534,7 +507,7 @@ Example: ```sql select * from pgmq.detach_archive('my_queue'); - detach_archive + detach_archive ---------------- ``` @@ -559,7 +532,7 @@ Example: ```sql select * from pgmq.drop_queue('my_unlogged'); - drop_queue + drop_queue ------------ t ``` @@ -593,7 +566,7 @@ Set the visibility timeout of message 1 to 30 seconds from now. ```sql select * from pgmq.set_vt('my_queue', 11, 30); - msg_id | read_ct | enqueued_at | vt | message + msg_id | read_ct | enqueued_at | vt | message --------+---------+-------------------------------+-------------------------------+---------------------- 1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"} ``` @@ -618,7 +591,7 @@ Example: ```sql select * from pgmq.list_queues(); - queue_name | created_at | is_partitioned | is_unlogged + queue_name | created_at | is_partitioned | is_unlogged ----------------------+-------------------------------+----------------+------------- my_queue | 2023-10-28 14:13:17.092576-05 | f | f my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f @@ -664,7 +637,7 @@ Example: ```sql select * from pgmq.metrics('my_queue'); - queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time + queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05 ``` @@ -700,7 +673,7 @@ RETURNS TABLE( ```sql select * from pgmq.metrics_all(); - queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time + queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time ----------------------+--------------+--------------------+--------------------+----------------+------------------------------- my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05 my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05 From 4ccc4213753d06b0b2622eff5e666009ead5a7ed Mon Sep 17 00:00:00 2001 From: Neptune Date: Sun, 27 Oct 2024 18:16:27 -0500 Subject: [PATCH 10/13] Use overloaded functions in Python extension --- tembo-pgmq-python/tembo_pgmq_python/queue.py | 49 ++++++++++---------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index 2c61a950..9d34b67b 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -112,37 +112,38 @@ def list_queues(self, conn=None) -> List[str]: return [row[0] for row in rows] @transaction - def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int: + def send(self, queue: str, message: dict, delay: int = 0, timestamp: str = None, conn=None) -> int: """Send a message to a queue.""" self.logger.debug(f"send called with conn: {conn}") - query = "select * from pgmq.send(%s, %s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) - return result[0][0] - - @transaction - def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int: - """Send a message to a queue with timestamp.""" - self.logger.debug(f"send called with conn: {conn}") - query = "select * from pgmq.send_at(%s, %s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) + result = None + if delay: + query = "select * from pgmq.send(%s, %s, %s);" + result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) + elif timestamp: + query = "select * from pgmq.send(%s, %s, %s);" + result = self._execute_query_with_result(query, [queue, Jsonb(message), timestamp], conn=conn) + else: + query = "select * from pgmq.send(%s, %s);" + result = self._execute_query_with_result(query, [queue, Jsonb(message)], conn=conn) return result[0][0] @transaction - def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: + def send_batch(self, queue: str, messages: List[dict], delay: int = 0, timestamp: str = None, conn=None) -> List[int]: """Send a batch of messages to a queue.""" self.logger.debug(f"send_batch called with conn: {conn}") - query = "select * from pgmq.send_batch(%s, %s, %s);" - params = [queue, [Jsonb(message) for message in messages], delay] - result = self._execute_query_with_result(query, params, conn=conn) - return [message[0] for message in result] - - @transaction - def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]: - """Send a batch of messages to a queue with timestamp.""" - self.logger.debug(f"send_batch called with conn: {conn}") - query = "select * from pgmq.send_batch_at(%s, %s, %s);" - params = [queue, [Jsonb(message) for message in messages], delay] - result = self._execute_query_with_result(query, params, conn=conn) + result = None + if delay: + query = "select * from pgmq.send_batch(%s, %s, %s);" + params = [queue, [Jsonb(message) for message in messages], delay] + result = self._execute_query_with_result(query, params, conn=conn) + elif timestamp: + query = "select * from pgmq.send_batch(%s, %s, %s);" + params = [queue, [Jsonb(message) for message in messages], timestamp] + result = self._execute_query_with_result(query, params, conn=conn) + else: + query = "select * from pgmq.send_batch(%s, %s);" + params = [queue, [Jsonb(message) for message in messages]] + result = self._execute_query_with_result(query, params, conn=conn) return [message[0] for message in result] @transaction From 2f05a02f66e46352a799b9bda682ecf6dc89c43f Mon Sep 17 00:00:00 2001 From: Neptune Date: Sun, 27 Oct 2024 18:24:34 -0500 Subject: [PATCH 11/13] Change timestamp to tz --- tembo-pgmq-python/tembo_pgmq_python/queue.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index 9d34b67b..b53fdcf1 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -112,23 +112,23 @@ def list_queues(self, conn=None) -> List[str]: return [row[0] for row in rows] @transaction - def send(self, queue: str, message: dict, delay: int = 0, timestamp: str = None, conn=None) -> int: + def send(self, queue: str, message: dict, delay: int = 0, tz: str = None, conn=None) -> int: """Send a message to a queue.""" self.logger.debug(f"send called with conn: {conn}") result = None if delay: query = "select * from pgmq.send(%s, %s, %s);" result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) - elif timestamp: + elif tz: query = "select * from pgmq.send(%s, %s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message), timestamp], conn=conn) + result = self._execute_query_with_result(query, [queue, Jsonb(message), tz], conn=conn) else: query = "select * from pgmq.send(%s, %s);" result = self._execute_query_with_result(query, [queue, Jsonb(message)], conn=conn) return result[0][0] @transaction - def send_batch(self, queue: str, messages: List[dict], delay: int = 0, timestamp: str = None, conn=None) -> List[int]: + def send_batch(self, queue: str, messages: List[dict], delay: int = 0, tz: str = None, conn=None) -> List[int]: """Send a batch of messages to a queue.""" self.logger.debug(f"send_batch called with conn: {conn}") result = None @@ -136,9 +136,9 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, timestamp query = "select * from pgmq.send_batch(%s, %s, %s);" params = [queue, [Jsonb(message) for message in messages], delay] result = self._execute_query_with_result(query, params, conn=conn) - elif timestamp: + elif tz: query = "select * from pgmq.send_batch(%s, %s, %s);" - params = [queue, [Jsonb(message) for message in messages], timestamp] + params = [queue, [Jsonb(message) for message in messages], tz] result = self._execute_query_with_result(query, params, conn=conn) else: query = "select * from pgmq.send_batch(%s, %s);" From 7ff6cbc7401a02e0c6753c0285b6a3c154c105c7 Mon Sep 17 00:00:00 2001 From: Neptune Date: Sun, 27 Oct 2024 18:37:50 -0500 Subject: [PATCH 12/13] Use overloaded functions in async Python extension --- .../tembo_pgmq_python/async_queue.py | 124 ++++++++---------- 1 file changed, 57 insertions(+), 67 deletions(-) diff --git a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py index 731a2fd2..e78a0827 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py @@ -155,89 +155,79 @@ async def _list_queues_internal(self, conn): return queues @transaction - async def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int: + async def send(self, queue: str, message: dict, delay: int = 0, tz: str = None, conn=None) -> int: """Send a message to a queue.""" - self.logger.debug(f"send called with queue='{queue}', message={message}, delay={delay}, conn={conn}") + self.logger.debug(f"send called with queue='{queue}', message={message}, delay={delay}, tz={tz}, conn={conn}") if conn is None: async with self.pool.acquire() as conn: - return await self._send_internal(queue, message, delay, conn) + return await self._send_internal(queue, message, delay, tz, conn) else: - return await self._send_internal(queue, message, delay, conn) - - async def _send_internal(self, queue, message, delay, conn): - self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}") - result = await conn.fetchrow( - "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", - queue, - dumps(message).decode("utf-8"), - delay, - ) - self.logger.debug(f"Message sent with msg_id={result[0]}") - return result[0] - - @transaction - async def send_at(self, queue: str, message: dict, delay: str, conn=None) -> int: - """Send a message to a queue with timestamp.""" - self.logger.debug(f"send_at called with queue='{queue}', message={message}, delay={delay}, conn={conn}") - if conn is None: - async with self.pool.acquire() as conn: - return await self._send_at_internal(queue, message, delay, conn) + return await self._send_internal(queue, message, delay, tz, conn) + + async def _send_internal(self, queue, message, delay, tz, conn): + self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}, tz={tz}") + result = None + if delay: + result = await conn.fetchrow( + "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", + queue, + dumps(message).decode("utf-8"), + delay, + ) + elif tz: + result = await conn.fetchrow( + "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", + queue, + dumps(message).decode("utf-8"), + tz, + ) else: - return await self._send_at_internal(queue, message, delay, conn) - - async def _send_at_internal(self, queue, message, delay, conn): - self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}") - result = await conn.fetchrow( - "SELECT * FROM pgmq.send_at($1, $2::jsonb, $3);", - queue, - dumps(message).decode("utf-8"), - delay, - ) + result = await conn.fetchrow( + "SELECT * FROM pgmq.send($1, $2::jsonb);", + queue, + dumps(message).decode("utf-8"), + ) self.logger.debug(f"Message sent with msg_id={result[0]}") return result[0] @transaction - async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: + async def send_batch( + self, queue: str, messages: List[dict], delay: int = 0, tz: str = None, conn=None + ) -> List[int]: """Send a batch of messages to a queue.""" - self.logger.debug(f"send_batch called with queue='{queue}', messages={messages}, delay={delay}, conn={conn}") - if conn is None: - async with self.pool.acquire() as conn: - return await self._send_batch_internal(queue, messages, delay, conn) - else: - return await self._send_batch_internal(queue, messages, delay, conn) - - async def _send_batch_internal(self, queue, messages, delay, conn): - self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}") - jsonb_array = [dumps(message).decode("utf-8") for message in messages] - result = await conn.fetch( - "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", - queue, - jsonb_array, - delay, + self.logger.debug( + f"send_batch called with queue='{queue}', messages={messages}, delay={delay}, tz={tz}, conn={conn}" ) - msg_ids = [message[0] for message in result] - self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}") - return msg_ids - - @transaction - async def send_batch_at(self, queue: str, messages: List[dict], delay: str, conn=None) -> List[int]: - """Send a batch of messages to a queue with timestamp.""" - self.logger.debug(f"send_batch_at called with queue='{queue}', messages={messages}, delay={delay}, conn={conn}") if conn is None: async with self.pool.acquire() as conn: - return await self._send_batch_at_internal(queue, messages, delay, conn) + return await self._send_batch_internal(queue, messages, delay, tz, conn) else: - return await self._send_batch_at_internal(queue, messages, delay, conn) + return await self._send_batch_internal(queue, messages, delay, tz, conn) - async def _send_batch_at_internal(self, queue, messages, delay, conn): - self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}") + async def _send_batch_internal(self, queue, messages, delay, tz, conn): + self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}, tz={tz}") jsonb_array = [dumps(message).decode("utf-8") for message in messages] - result = await conn.fetch( - "SELECT * FROM pgmq.send_batch_at($1, $2::jsonb[], $3);", - queue, - jsonb_array, - delay, - ) + result = None + if delay: + result = await conn.fetch( + "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", + queue, + jsonb_array, + delay, + ) + elif tz: + result = await conn.fetch( + "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", + queue, + jsonb_array, + tz, + ) + else: + result = await conn.fetch( + "SELECT * FROM pgmq.send_batch($1, $2::jsonb[]);", + queue, + jsonb_array, + ) msg_ids = [message[0] for message in result] self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}") return msg_ids From ac0443c5fc90a97deb911e77b98b26b21c79e42a Mon Sep 17 00:00:00 2001 From: Neptune Date: Mon, 28 Oct 2024 14:28:05 -0500 Subject: [PATCH 13/13] Restore Python file changes --- .../tembo_pgmq_python/async_queue.py | 84 ++++++------------- tembo-pgmq-python/tembo_pgmq_python/queue.py | 32 ++----- 2 files changed, 32 insertions(+), 84 deletions(-) diff --git a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py index e78a0827..f3137820 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/async_queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/async_queue.py @@ -155,79 +155,45 @@ async def _list_queues_internal(self, conn): return queues @transaction - async def send(self, queue: str, message: dict, delay: int = 0, tz: str = None, conn=None) -> int: + async def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int: """Send a message to a queue.""" - self.logger.debug(f"send called with queue='{queue}', message={message}, delay={delay}, tz={tz}, conn={conn}") + self.logger.debug(f"send called with queue='{queue}', message={message}, delay={delay}, conn={conn}") if conn is None: async with self.pool.acquire() as conn: - return await self._send_internal(queue, message, delay, tz, conn) + return await self._send_internal(queue, message, delay, conn) else: - return await self._send_internal(queue, message, delay, tz, conn) - - async def _send_internal(self, queue, message, delay, tz, conn): - self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}, tz={tz}") - result = None - if delay: - result = await conn.fetchrow( - "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", - queue, - dumps(message).decode("utf-8"), - delay, - ) - elif tz: - result = await conn.fetchrow( - "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", - queue, - dumps(message).decode("utf-8"), - tz, - ) - else: - result = await conn.fetchrow( - "SELECT * FROM pgmq.send($1, $2::jsonb);", - queue, - dumps(message).decode("utf-8"), - ) + return await self._send_internal(queue, message, delay, conn) + + async def _send_internal(self, queue, message, delay, conn): + self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}") + result = await conn.fetchrow( + "SELECT * FROM pgmq.send($1, $2::jsonb, $3);", + queue, + dumps(message).decode("utf-8"), + delay, + ) self.logger.debug(f"Message sent with msg_id={result[0]}") return result[0] @transaction - async def send_batch( - self, queue: str, messages: List[dict], delay: int = 0, tz: str = None, conn=None - ) -> List[int]: + async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: """Send a batch of messages to a queue.""" - self.logger.debug( - f"send_batch called with queue='{queue}', messages={messages}, delay={delay}, tz={tz}, conn={conn}" - ) + self.logger.debug(f"send_batch called with queue='{queue}', messages={messages}, delay={delay}, conn={conn}") if conn is None: async with self.pool.acquire() as conn: - return await self._send_batch_internal(queue, messages, delay, tz, conn) + return await self._send_batch_internal(queue, messages, delay, conn) else: - return await self._send_batch_internal(queue, messages, delay, tz, conn) + return await self._send_batch_internal(queue, messages, delay, conn) - async def _send_batch_internal(self, queue, messages, delay, tz, conn): - self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}, tz={tz}") + async def _send_batch_internal(self, queue, messages, delay, conn): + self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}") jsonb_array = [dumps(message).decode("utf-8") for message in messages] - result = None - if delay: - result = await conn.fetch( - "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", - queue, - jsonb_array, - delay, - ) - elif tz: - result = await conn.fetch( - "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", - queue, - jsonb_array, - tz, - ) - else: - result = await conn.fetch( - "SELECT * FROM pgmq.send_batch($1, $2::jsonb[]);", - queue, - jsonb_array, - ) + result = await conn.fetch( + "SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);", + queue, + jsonb_array, + delay, + ) msg_ids = [message[0] for message in result] self.logger.debug(f"Batch messages sent with msg_ids={msg_ids}") return msg_ids diff --git a/tembo-pgmq-python/tembo_pgmq_python/queue.py b/tembo-pgmq-python/tembo_pgmq_python/queue.py index b53fdcf1..f4a56e83 100644 --- a/tembo-pgmq-python/tembo_pgmq_python/queue.py +++ b/tembo-pgmq-python/tembo_pgmq_python/queue.py @@ -112,38 +112,20 @@ def list_queues(self, conn=None) -> List[str]: return [row[0] for row in rows] @transaction - def send(self, queue: str, message: dict, delay: int = 0, tz: str = None, conn=None) -> int: + def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int: """Send a message to a queue.""" self.logger.debug(f"send called with conn: {conn}") - result = None - if delay: - query = "select * from pgmq.send(%s, %s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) - elif tz: - query = "select * from pgmq.send(%s, %s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message), tz], conn=conn) - else: - query = "select * from pgmq.send(%s, %s);" - result = self._execute_query_with_result(query, [queue, Jsonb(message)], conn=conn) + query = "select * from pgmq.send(%s, %s, %s);" + result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn) return result[0][0] @transaction - def send_batch(self, queue: str, messages: List[dict], delay: int = 0, tz: str = None, conn=None) -> List[int]: + def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]: """Send a batch of messages to a queue.""" self.logger.debug(f"send_batch called with conn: {conn}") - result = None - if delay: - query = "select * from pgmq.send_batch(%s, %s, %s);" - params = [queue, [Jsonb(message) for message in messages], delay] - result = self._execute_query_with_result(query, params, conn=conn) - elif tz: - query = "select * from pgmq.send_batch(%s, %s, %s);" - params = [queue, [Jsonb(message) for message in messages], tz] - result = self._execute_query_with_result(query, params, conn=conn) - else: - query = "select * from pgmq.send_batch(%s, %s);" - params = [queue, [Jsonb(message) for message in messages]] - result = self._execute_query_with_result(query, params, conn=conn) + query = "select * from pgmq.send_batch(%s, %s, %s);" + params = [queue, [Jsonb(message) for message in messages], delay] + result = self._execute_query_with_result(query, params, conn=conn) return [message[0] for message in result] @transaction