Update message table schema for efficient poller and w/o potential message loss #1015
Changes from 6 commits
9d264b3
fba8347
74fdc2d
7fd1b56
65bea38
5e2efd4
e3402b9
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,23 @@ | ||
--- | ||
title: Messaging | ||
weight: 18 | ||
aliases: ['/docs/advanced/messaging/','/docs/reference/messaging/'] | ||
title: Vitess Messaging | ||
--- | ||
|
||
Vitess messaging gives the application an easy way to schedule and manage work | ||
that needs to be performed asynchronously. Under the covers, messages are | ||
stored in a traditional MySQL table and therefore enjoy the following | ||
properties: | ||
|
||
* **Scalable**: Because of vitess's sharding abilities, messages can scale to | ||
* **Scalable**: Because of Vitess's sharding abilities, messages can scale to | ||
very large QPS or sizes. | ||
* **Guaranteed delivery**: A message will be indefinitely retried until a | ||
successful ack is received. | ||
* **Non-blocking**: If the sending is backlogged, new messages continue to be | ||
accepted for eventual delivery. | ||
* **Adaptive**: Messages that fail delivery are backed off exponentially. | ||
* **Analytics**: The retention period for messages is dictated by the | ||
application. One could potentially choose to never delete any messages and | ||
use the data for performing analytics. | ||
* **Adaptive**: Messages that fail delivery are backed off exponentially with | ||
jitter to prevent thundering herds. | ||
* **Analytics**: Acknowledged messages are retained for a period of time — dictated | ||
by the `time_acked` value for the row and the `vt_purge_after` (seconds) value | ||
provided for the table — and can be used for analytics. | ||
* **Transactional**: Messages can be created or acked as part of an existing | ||
transaction. The action will complete only if the commit succeeds. | ||
|
||
|
@@ -34,102 +33,102 @@ Messages are good for: | |
* Handing off work to another system. | ||
* Recording potentially time-consuming work that needs to be done | ||
asynchronously. | ||
* Scheduling for future delivery. | ||
* Accumulating work that could be done during off-peak hours. | ||
|
||
Messages are not a good fit for the following use cases: | ||
|
||
* Broadcasting of events to multiple subscribers. | ||
* Ordered delivery. | ||
* Real-time delivery. | ||
* Broadcasting each event to multiple subscribers. | ||
* Ordered delivery is required. | ||
* Real-time delivery properties are required. | ||
|
||
## Creating a message table | ||
|
||
The current implementation requires a fixed schema. This will be made more | ||
flexible in the future. There will also be a custom DDL syntax. For now, a | ||
message table must be created like this: | ||
The current implementation requires a fixed base schema, with properties defined | ||
using Vitess-specific table `COMMENT` directives. The message table format is as | ||
follows: | ||
|
||
```sql | ||
create table my_message( | ||
time_scheduled bigint, | ||
id bigint, | ||
time_next bigint, | ||
epoch bigint, | ||
time_created bigint, | ||
time_acked bigint, | ||
message varchar(128), | ||
priority tinyint NOT NULL DEFAULT '0', | ||
primary key(time_scheduled, id), | ||
unique index id_idx(id), | ||
index next_idx(priority asc, time_next desc) | ||
) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30' | ||
# required columns | ||
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence', | ||
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first', | ||
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends the message; it is used for incremental backoff doubling', | ||
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set', | ||
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set', | ||
|
||
# add as many custom fields here as required | ||
# optional - these are suggestions | ||
tenant_id bigint COMMENT 'offers a nice way to segment your messages', | ||
message json, | ||
|
||
# required indexes | ||
primary key(id), | ||
index poller_idx(time_acked, priority, time_next desc) | ||
|
||
# add any secondary indexes or foreign keys - no restrictions | ||
) comment 'vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30' | ||
``` | ||
|
||
The application-related columns are as follows: | ||
|
||
* `id`: can be any type. Must be unique. | ||
* `id`: can be any type. Must be unique (for sharded message tables, this will typically be your primary vindex column). | ||
* `message`: can be any type. | ||
* `time_scheduled`: must be a bigint. It will be used to store unix time in | ||
nanoseconds. If unspecified, the `Now` value is inserted. | ||
* `priority`: messages with a lower priority will be processed first. | ||
|
||
The above indexes are recommended for optimum performance. However, some | ||
The noted indexes are recommended for optimum performance. However, some | ||
variation can be allowed to achieve different performance trade-offs. | ||
|
||
The comment section specifies additional configuration parameters. The fields | ||
are as follows: | ||
|
||
* `vitess_message`: Indicates that this is a message table. | ||
* `vt_ack_wait=30`: Wait for 30s for the first message ack. If one is not | ||
received, resend. | ||
* `vt_min_backoff=30`, `vt_max_backoff=3600`: Set bounds, in seconds, on exponential | ||
backoff for message retries. | ||
|
||
* `vt_ack_wait=30`: Wait for 30 seconds for the *first* message send to be acked. | ||
If one is not received within this time frame, the message will be resent. | ||
* `vt_purge_after=86400`: Purge acked messages that are older than 86400 | ||
seconds (1 day). | ||
* `vt_batch_size=10`: Send up to 10 messages per RPC packet. | ||
* `vt_cache_size=10000`: Store up to 10000 messages in the cache. If the demand | ||
* `vt_batch_size=10`: Send up to 10 messages per gRPC packet. | ||
* `vt_cache_size=10000`: Store up to 10,000 messages in the cache. If the demand | ||
is higher, the rest of the items will have to wait for the next poller cycle. | ||
* `vt_poller_interval=30`: Poll every 30s for messages that are due to be sent. | ||
* `vt_poller_interval=30`: Poll every 30 seconds for messages that should be | ||
[re]sent. | ||
|
||
If any of the above fields are missing, vitess will fail to load the table. No | ||
If any of the above fields are missing, Vitess will fail to load the table. No | ||
operation will be allowed on a table that has failed to load. | ||
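To make the directive format concrete, here is a minimal Go sketch that splits a table comment like the one above into its settings and rejects a comment with a missing field. It is not Vitess's actual loader: `parseMessageComment` and `requiredKeys` are hypothetical names, and treating every field from the example as required simply follows the sentence above (the real implementation may treat some as optional).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// requiredKeys lists the numeric directives shown in the example comment.
// Assumption: all of them are mandatory, per the doc text above.
var requiredKeys = []string{
	"vt_min_backoff", "vt_max_backoff", "vt_ack_wait", "vt_purge_after",
	"vt_batch_size", "vt_cache_size", "vt_poller_interval",
}

// parseMessageComment is a hypothetical helper: it checks for the
// vitess_message marker and parses the key=value directives that follow.
func parseMessageComment(comment string) (map[string]int, error) {
	parts := strings.Split(comment, ",")
	if strings.TrimSpace(parts[0]) != "vitess_message" {
		return nil, fmt.Errorf("not a message table: missing vitess_message marker")
	}
	settings := make(map[string]int)
	for _, part := range parts[1:] {
		key, value, ok := strings.Cut(strings.TrimSpace(part), "=")
		if !ok {
			return nil, fmt.Errorf("malformed directive %q", part)
		}
		n, err := strconv.Atoi(value)
		if err != nil {
			return nil, fmt.Errorf("directive %q: %w", key, err)
		}
		settings[key] = n
	}
	// Fail if any expected directive is absent.
	for _, key := range requiredKeys {
		if _, ok := settings[key]; !ok {
			return nil, fmt.Errorf("missing required directive %q", key)
		}
	}
	return settings, nil
}

func main() {
	comment := "vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30," +
		"vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30"
	settings, err := parseMessageComment(comment)
	if err != nil {
		fmt.Println("table would fail to load:", err)
		return
	}
	fmt.Println("purge after (seconds):", settings["vt_purge_after"])
}
```

A check like this could run in a schema-review step before the DDL is applied, so that a missing directive is caught before the table fails to load.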
|
||
## Enqueuing messages | ||
|
||
The application can enqueue messages using an insert statement: | ||
The application can enqueue messages using a standard `INSERT` statement, for example: | ||
|
||
```sql | ||
insert into my_message(id, message) values(1, 'hello world') | ||
insert into my_message(id, message) values(1, '{"message": "hello world"}') | ||
``` | ||
|
||
These inserts can be part of a regular transaction. Multiple messages can be | ||
inserted to different tables. Avoid accumulating too many big messages within a | ||
inserted into different tables. Avoid accumulating too many big messages within a | ||
transaction as it consumes memory on the VTTablet side. At the time of commit, | ||
memory permitting, all messages are instantly enqueued to be sent. | ||
|
||
Messages can also be created to be sent in the future: | ||
|
||
```sql | ||
insert into my_message(id, message, time_scheduled) values(1, 'hello world', :future_time) | ||
``` | ||
|
||
`future_time` must be the unix time expressed in nanoseconds. | ||
|
||
|
||
## Receiving messages | ||
|
||
Processes can subscribe to receive messages by sending a `MessageStream` | ||
request to VTGate. If there are multiple subscribers, the messages will be | ||
delivered in a round-robin fashion. Note that this is not a broadcast; Each | ||
gRPC request to a `VTGate` or using the `stream * from <table>` SQL statement | ||
(if using the interactive mysql command-line client you must also pass the | ||
`-q`/`--quick` option). If there are multiple subscribers, the messages will be | ||
delivered in a round-robin fashion. Note that *this is not a broadcast*; each | ||
message will be sent to at most one subscriber. | ||
|
||
The format for messages is the same as a vitess `Result`. This means that | ||
standard database tools that understand query results can also be message | ||
recipients. Currently, there is no SQL format for subscribing to messages, but | ||
one will be provided soon. | ||
The format for messages is the same as a standard Vitess `Result` received from | ||
a `VTGate`. This means that standard database tools that understand query results | ||
can also be message receivers. | ||
|
||
### Subsetting | ||
|
||
It's possible that you may want to subscribe to specific shards or groups of | ||
shards while requesting messages. This is useful for partitioning or load | ||
balancing. The `MessageStream` API allows you to specify these constraints. The | ||
request parameters are as follows: | ||
balancing. The `MessageStream` gRPC API call allows you to specify these | ||
constraints. The request parameters are as follows: | ||
|
||
* `Name`: Name of the message table. | ||
* `Keyspace`: Keyspace where the message table is present. | ||
|
@@ -141,8 +140,9 @@ request parameters are as follows: | |
|
||
## Acknowledging messages | ||
|
||
A received (or processed) message can be acknowledged using the `MessageAck` | ||
API call. This call accepts the following parameters: | ||
A message that has been received and processed (that is, you've completed some | ||
meaningful work based on its contents) can be acknowledged using the `MessageAck` | ||
gRPC API call. This call accepts the following parameters: | ||
|
||
* `Name`: Name of the message table. | ||
* `Keyspace`: Keyspace where the message table is present. This field can be | ||
|
@@ -153,9 +153,10 @@ Once a message is successfully acked, it will never be resent. | |
|
||
## Exponential backoff | ||
|
||
A message that was successfully sent will wait for the specified ack wait time. | ||
If no ack is received by then, it will be resent. The next attempt will be 2x | ||
the previous wait, and this delay is doubled for every attempt. | ||
After a message is successfully sent, Vitess waits for the specified `vt_ack_wait` | ||
time. If no ack is received by then, the message is resent. The next attempt waits 2x | ||
the previous wait, and this delay is doubled for every attempt (with some added | ||
jitter to avoid thundering herds). | ||
Should probably add details for
Also, maybe we should specify that "some added jitter" is "up to 33% jitter"
Not sure if we want to get this technical, but here's a first stab at giving actual details Basic backoff equation:
backoff query if
|
||
|
||
## Purging | ||
|
||
|
@@ -164,50 +165,38 @@ exceeds the time period specified by `vt_purge_after`. | |
|
||
## Advanced usage | ||
|
||
The `MessageAck` functionality is currently an API call and cannot be used | ||
inside a transaction. However, you can ack messages using a regular DML. It | ||
should look like this: | ||
The `MessageAck` functionality is currently a gRPC API call and cannot be used | ||
from the SQL interface. However, you can manually ack messages using a regular | ||
DML query like this: | ||
|
||
```sql | ||
update my_message set time_acked = :time_acked, time_next = null where id in ::ids and time_acked is null | ||
``` | ||
|
||
You can manually change the schedule of existing messages with a statement like | ||
You can also manually change the schedule of existing messages with a statement like | ||
this: | ||
|
||
```sql | ||
update my_message set time_next = :time_next, epoch = :epoch where id in ::ids and time_acked is null | ||
update my_message set priority = :priority, time_next = :time_next, epoch = :epoch where id in ::ids and time_acked is null | ||
``` | ||
|
||
This comes in handy if a bunch of messages had chronic failures and got | ||
postponed to the distant future. If the root cause of the problem was fixed, | ||
the application could reschedule them to be delivered immediately. You can also | ||
optionally change the epoch. Lower epoch values increase the priority of the | ||
message and the back-off is less aggressive. | ||
the application could reschedule them to be delivered as soon as possible. You can | ||
also optionally change the priority and/or epoch. Lower priority and epoch values | ||
|
||
both increase the relative priority of the message and make the back-off less | ||
aggressive. | ||
|
||
You can also view messages using regular `select` queries. | ||
|
||
## Undocumented features | ||
|
||
These are features that were previously known limitations, but have since been supported | ||
and are awaiting further documentation. | ||
|
||
* Flexible columns: Allow any number of application defined columns to be in | ||
the message table. | ||
* No ACL check for receivers: To be added. | ||
* Monitoring support: To be added. | ||
* Dropped tables: The message engine does not currently detect dropped tables. | ||
You can also view messages using regular `SELECT` queries against the message table. | ||
|
||
## Known limitations | ||
|
||
The message feature is currently in alpha, and can be improved. Here is the | ||
list of possible limitations/improvements: | ||
Here is a short list of possible limitations/improvements: | ||
|
||
* Proactive scheduling: Upcoming messages can be proactively scheduled for | ||
timely delivery instead of waiting for the next polling cycle. | ||
* Changed properties: Although the engine detects new message tables, it does | ||
not refresh properties of an existing table. | ||
* A `SELECT` style syntax for subscribing to messages. | ||
* No rate limiting. | ||
|
||
* Usage of partitions for efficient purging. | ||
not refresh the properties (such as `vt_ack_wait`) of an existing table. | ||
Comment on lines 209 to +210: I don't think this is true anymore, but I'm not sure |
||
* Usage of MySQL partitioning for more efficient purging. | ||
|
||
|
Should we say "shard" instead of "segment"?
I feel like it could still be useful when not sharded. Some receivers could only process and ack messages for a given tenant. No?
We have lots of unsharded queues - tenant id just feels like the most common sharding id, though I understand your point. My first reaction on reading it was to wonder if "segment" was a specific concept for messaging. I don't feel super strongly about it.
Once we're done with this, I want to blog about and/or add an opinionated design to the docs, with the specific fields and queries we use, so people don't have to architect from scratch.
I don't have a strong opinion here either. We can always revisit.
That would be awesome! ❤️