Skip to content

Commit

Permalink
Add timestamp support in send and send_batch (#320)
Browse files Browse the repository at this point in the history

Co-authored-by: v0idpwn <[email protected]>
Co-authored-by: felipe stival <[email protected]>
  • Loading branch information
3 people authored Oct 28, 2024
1 parent 762b068 commit d2a0021
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 37 deletions.
98 changes: 69 additions & 29 deletions docs/api/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,27 @@ RETURNS SETOF bigint
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msg | jsonb | The message to send to the queue |
| delay | integer | Time in seconds before the message becomes visible. Defaults to 0. |
| delay | integer/timestampz | Time in seconds before the message becomes visible, or a timestamp of when it becomes visible. Defaults to 0. |

Example:

```sql
select * from pgmq.send('my_queue', '{"hello": "world"}');
send
send
------
4

-- Message with a delay of 5 seconds
select * from pgmq.send('my_queue', '{"hello": "world"}', 5);
send
------
5

-- Message readable from tomorrow
select * from pgmq.send('my_queue', '{"hello": "world"}', CURRENT_TIMESTAMP + INTERVAL '1 day');
send
------
6
```

---
Expand All @@ -53,17 +65,45 @@ RETURNS SETOF bigint
| :--- | :---- | :--- |
| queue_name | text | The name of the queue |
| msgs | jsonb[] | Array of messages to send to the queue |
| delay | integer | Time in seconds before the messages becomes visible. Defaults to 0. |
| delay | integer/timestampz | Time in seconds before the messages becomes visible, or a timestamp of when it becomes visible. Defaults to 0. |

```sql
select * from pgmq.send_batch('my_queue', ARRAY[
'{"hello": "world_0"}'::jsonb,
'{"hello": "world_1"}'::jsonb]
select * from pgmq.send_batch('my_queue',
ARRAY[
'{"hello": "world_0"}',
'{"hello": "world_1"}'
]::jsonb[]
);
send_batch
send_batch
------------
1
2

-- Message with a delay of 5 seconds
select * from pgmq.send_batch('my_queue',
ARRAY[
'{"hello": "world_0"}',
'{"hello": "world_1"}'
]::jsonb[],
5
);
send_batch
------------
3
4

-- Message readable from tomorrow
select * from pgmq.send_batch('my_queue',
ARRAY[
'{"hello": "world_0"}',
'{"hello": "world_1"}'
]::jsonb[],
5
);
send_batch
------------
6
7
```

---
Expand Down Expand Up @@ -101,7 +141,7 @@ Read messages from a queue

```sql
select * from pgmq.read('my_queue', 10, 2);
msg_id | read_ct | enqueued_at | vt | message
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
1 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608922-05 | {"hello": "world_0"}
2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
Expand All @@ -112,7 +152,7 @@ Read a message from a queue with message filtering

```sql
select * from pgmq.read('my_queue', 10, 2, '{"hello": "world_1"}');
msg_id | read_ct | enqueued_at | vt | message
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
2 | 1 | 2023-10-28 19:14:47.356595-05 | 2023-10-28 19:17:08.608974-05 | {"hello": "world_1"}
(1 row)
Expand Down Expand Up @@ -155,7 +195,7 @@ Example:

```sql
select * from pgmq.read_with_poll('my_queue', 1, 1, 5, 100);
msg_id | read_ct | enqueued_at | vt | message
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+--------------------
1 | 1 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
```
Expand Down Expand Up @@ -185,7 +225,7 @@ Example:

```sql
pgmq=# select * from pgmq.pop('my_queue');
msg_id | read_ct | enqueued_at | vt | message
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+--------------------
1 | 2 | 2023-10-28 19:09:09.177756-05 | 2023-10-28 19:27:00.337929-05 | {"hello": "world"}
```
Expand Down Expand Up @@ -214,7 +254,7 @@ Example:

```sql
select pgmq.delete('my_queue', 5);
delete
delete
--------
t
```
Expand Down Expand Up @@ -243,7 +283,7 @@ Delete two messages that exist.

```sql
select * from pgmq.delete('my_queue', ARRAY[2, 3]);
delete
delete
--------
2
3
Expand All @@ -253,7 +293,7 @@ Delete two messages, one that exists and one that does not. Message `999` does n

```sql
select * from pgmq.delete('my_queue', ARRAY[6, 999]);
delete
delete
--------
6
```
Expand All @@ -280,8 +320,8 @@ Example:
Purge the queue when it contains 8 messages;

```sql
select * from pgmq.purge_queue('my_queue');
purge_queue
select * from pgmq.purge_queue('my_queue');
purge_queue
-------------
8
```
Expand Down Expand Up @@ -311,7 +351,7 @@ Example; remove message with ID 1 from queue `my_queue` and archive it:

```sql
SELECT * FROM pgmq.archive('my_queue', 1);
archive
archive
---------
t
```
Expand Down Expand Up @@ -341,7 +381,7 @@ Delete messages with ID 1 and 2 from queue `my_queue` and move to the archive.

```sql
SELECT * FROM pgmq.archive('my_queue', ARRAY[1, 2]);
archive
archive
---------
1
2
Expand All @@ -351,7 +391,7 @@ Delete messages 4, which exists and 999, which does not exist.

```sql
select * from pgmq.archive('my_queue', ARRAY[4, 999]);
archive
archive
---------
4
```
Expand Down Expand Up @@ -379,7 +419,7 @@ Example:

```sql
select from pgmq.create('my_queue');
create
create
--------
```

Expand Down Expand Up @@ -416,15 +456,15 @@ select from pgmq.create_partitioned(
'100000',
'10000000'
);
create_partitioned
create_partitioned
--------------------
```

---

### create_unlogged

Creates an unlogged table. This is useful when write throughput is more important that durability.
Creates an unlogged table. This is useful when write throughput is more important that durability.
See Postgres documentation for [unlogged tables](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED) for more information.

```text
Expand All @@ -442,7 +482,7 @@ Example:

```sql
select pgmq.create_unlogged('my_unlogged');
create_unlogged
create_unlogged
-----------------
```

Expand All @@ -467,7 +507,7 @@ Example:

```sql
select * from pgmq.detach_archive('my_queue');
detach_archive
detach_archive
----------------
```

Expand All @@ -492,7 +532,7 @@ Example:

```sql
select * from pgmq.drop_queue('my_unlogged');
drop_queue
drop_queue
------------
t
```
Expand Down Expand Up @@ -526,7 +566,7 @@ Set the visibility timeout of message 1 to 30 seconds from now.

```sql
select * from pgmq.set_vt('my_queue', 11, 30);
msg_id | read_ct | enqueued_at | vt | message
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+----------------------
1 | 0 | 2023-10-28 19:42:21.778741-05 | 2023-10-28 19:59:34.286462-05 | {"hello": "world_0"}
```
Expand All @@ -551,7 +591,7 @@ Example:

```sql
select * from pgmq.list_queues();
queue_name | created_at | is_partitioned | is_unlogged
queue_name | created_at | is_partitioned | is_unlogged
----------------------+-------------------------------+----------------+-------------
my_queue | 2023-10-28 14:13:17.092576-05 | f | f
my_partitioned_queue | 2023-10-28 19:47:37.098692-05 | t | f
Expand Down Expand Up @@ -597,7 +637,7 @@ Example:

```sql
select * from pgmq.metrics('my_queue');
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
------------+--------------+--------------------+--------------------+----------------+-------------------------------
my_queue | 16 | 2445 | 2447 | 35 | 2023-10-28 20:23:08.406259-05
```
Expand Down Expand Up @@ -633,7 +673,7 @@ RETURNS TABLE(

```sql
select * from pgmq.metrics_all();
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
queue_name | queue_length | newest_msg_age_sec | oldest_msg_age_sec | total_messages | scrape_time
----------------------+--------------+--------------------+--------------------+----------------+-------------------------------
my_queue | 16 | 2563 | 2565 | 35 | 2023-10-28 20:25:07.016413-05
my_partitioned_queue | 1 | 11 | 11 | 1 | 2023-10-28 20:25:07.016413-05
Expand Down
36 changes: 30 additions & 6 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,31 @@ CREATE FUNCTION pgmq.send(
msg JSONB,
delay INTEGER DEFAULT 0
) RETURNS SETOF BIGINT AS $$
BEGIN
RETURN QUERY SELECT * FROM pgmq.send(queue_name, msg, clock_timestamp() + make_interval(secs => delay));
END;
$$ LANGUAGE plpgsql;
-- send_at
-- sends a message to a queue, with a delay as a timestamp
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
VALUES ((clock_timestamp() + %L), $1)
VALUES ($2, $1)
RETURNING msg_id;
$QUERY$,
qtable, make_interval(secs => delay)
qtable
);
RETURN QUERY EXECUTE sql USING msg;
RETURN QUERY EXECUTE sql USING msg, delay;
END;
$$ LANGUAGE plpgsql;
Expand All @@ -297,19 +309,31 @@ CREATE FUNCTION pgmq.send_batch(
msgs JSONB[],
delay INTEGER DEFAULT 0
) RETURNS SETOF BIGINT AS $$
BEGIN
RETURN QUERY SELECT * FROM pgmq.send_batch(queue_name, msgs, clock_timestamp() + make_interval(secs => delay));
END;
$$ LANGUAGE plpgsql;
-- send_batch_at
-- sends an array of list of messages to a queue, with a delay as a timestamp
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message)
SELECT clock_timestamp() + %L, unnest($1)
SELECT $2, unnest($1)
RETURNING msg_id;
$QUERY$,
qtable, make_interval(secs => delay)
qtable
);
RETURN QUERY EXECUTE sql USING msgs;
RETURN QUERY EXECUTE sql USING msgs, delay;
END;
$$ LANGUAGE plpgsql;
Expand Down
Loading

0 comments on commit d2a0021

Please sign in to comment.