Messages User Guide #2493
Changes from 4 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
# Vitess Messaging | ||
|
||
# Overview | ||
|
||
Vitess messaging gives the application an easy way to schedule and manage work that needs to be performed asynchronously. Under the covers, messages are stored in a traditional MySQL table and therefore enjoy the following properties: | ||
|
||
* **Transactional**: Messages can be created or acked as part of an existing transaction. The action will complete only if the commit succeeds. | ||
Review comment: This one is not the main attraction. I would move it down and keep Scalable as the first one in the list. |
||
* **Scalable**: Because of Vitess's sharding abilities, messages can scale to very large QPS or sizes. | ||
* **Guaranteed delivery**: A message will be indefinitely retried until a successful ack is received. | ||
* **Non-blocking**: If the sending is backlogged, new messages continue to be accepted for eventual delivery. | ||
* **Adaptive**: Messages that fail delivery are backed off exponentially. | ||
* **Analytics**: The retention period for messages is dictated by the application. One could potentially choose to never delete any messages and use the data for performing analytics. | ||
|
||
The properties of a message are chosen by the application. However, every message needs a uniquely identifiable key. If the messages are stored in a sharded table, the key must also be the primary vindex of the table. | ||
|
||
Although messages will generally be delivered in the order they're created, this is not an explicit guarantee of the system. The focus is more on keeping track of the work that needs to be done and ensuring that it was performed. Messages are good for: | ||
|
||
* Handing off work to another system. | ||
* Recording potentially time-consuming work that needs to be done asynchronously. | ||
* Scheduling for future delivery. | ||
* Accumulating work that could be done during off-peak hours. | ||
|
||
Messages are not a good fit for the following use cases: | ||
|
||
* Broadcasting of events to multiple subscribers. | ||
* Ordered delivery. | ||
* Real-time delivery. | ||
|
||
# Creating a message table | ||
|
||
The current implementation requires a fixed schema. This will be made more flexible in the future. There will also be a custom DDL syntax. For now, a message table must be created like this: | ||
Review comment: 1 table ~ 1 queue, so inserts / acks will be limited by your single node performance, correct?
Review comment: The table can be sharded. So, there should be no performance bottleneck. Or did I misunderstand the question? |
||
|
||
``` | ||
create table my_message( | ||
time_scheduled bigint, | ||
id bigint, | ||
time_next bigint, | ||
epoch bigint, | ||
time_created bigint, | ||
time_acked bigint, | ||
message varchar(128), | ||
Review comment: Why
Review comment: As mentioned below, the message can be any MySQL type. |
||
primary key(time_scheduled, id), | ||
unique index id_idx(id), | ||
index next_idx(time_next, epoch) | ||
) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30' | ||
``` | ||
|
||
The application-related columns are as follows: | ||
|
||
* `id`: can be any type. Must be unique. | ||
* `message`: can be any type. | ||
* `time_scheduled`: must be a bigint. It will be used to store unix time in nanoseconds. If unspecified, the `Now` value is inserted. | ||
|
||
The above indexes are recommended for optimum performance. However, some variation can be allowed to achieve different performance trade-offs. | ||
|
||
The comment section specifies additional configuration parameters. The fields are as follows: | ||
|
||
* `vitess_message`: Indicates that this is a message table. | ||
* `vt_ack_wait=30`: Wait for 30s for the first message ack. If one is not received, resend. | ||
* `vt_purge_after=86400`: Purge acked messages that are older than 86400 seconds (1 day). | ||
* `vt_batch_size=10`: Send up to 10 messages per batch. | ||
* `vt_cache_size=10000`: Store up to 10000 messages in the cache. | ||
Review comment: should be
Review comment: What are
Review comment: Batch size is the number of message rows sent per RPC packet. |
||
* `vt_poller_interval=30`: Poll every 30s for messages that are due to be sent. | ||
|
||
If any of the above fields are missing, Vitess will fail to load the table. No operation will be allowed on a table that has failed to load. | ||
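The all-fields-required rule above can be illustrated with a small validation sketch. This is not Vitess code: `parse_message_comment` and `REQUIRED_FIELDS` are hypothetical names, and the sketch assumes the comment is a comma-separated list of `key=value` pairs with integer values, as in the example table.

```python
# Hypothetical helper (not part of Vitess): check a message-table comment
# before running the DDL, mirroring the rule that every field must be present.
REQUIRED_FIELDS = (
    "vt_ack_wait",
    "vt_purge_after",
    "vt_batch_size",
    "vt_cache_size",
    "vt_poller_interval",
)


def parse_message_comment(comment):
    """Return the config fields as a dict of ints, or raise ValueError."""
    parts = [p.strip() for p in comment.split(",") if p.strip()]
    if "vitess_message" not in parts:
        raise ValueError("missing 'vitess_message' marker")
    config = {}
    for part in parts:
        if part == "vitess_message":
            continue
        key, sep, value = part.partition("=")
        if not sep:
            raise ValueError("malformed field: %r" % part)
        config[key] = int(value)  # assumes all values are integers
    missing = [f for f in REQUIRED_FIELDS if f not in config]
    if missing:
        raise ValueError("missing required fields: %s" % ", ".join(missing))
    return config
```

Running a check like this on the comment string before creating the table would catch a misspelled or omitted field early, rather than discovering it when the table fails to load.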
|
||
# Enqueuing messages | ||
|
||
The application can enqueue messages using an insert statement: | ||
|
||
``` | ||
insert into my_message(id, message) values(1, 'hello world') | ||
``` | ||
|
||
These inserts can be part of a regular transaction. Multiple messages can be inserted into different tables. Avoid accumulating too many big messages within a transaction, as this consumes memory on the VTTablet side. At the time of commit, memory permitting, all messages are instantly enqueued to be sent. | ||
Review comment: memoery to memory
Review comment: What does that VTTablet memory limitation look like? It's pretty common for us to add > 1M tasks in a single transaction, with the payload being a ~100 character url.
Review comment: If you have 100 shards, you'll need about 1M per shard. |
||
|
||
Messages can also be created to be sent in the future: | ||
|
||
``` | ||
insert into my_message(id, message, time_scheduled) values(1, 'hello world', :future_time) | ||
``` | ||
|
||
`future_time` must be the unix time expressed in nanoseconds. | ||
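As a minimal sketch of how an application might compute such a value, the helper below adds a delay to the current unix time in nanoseconds. The name `future_time_ns` and its parameters are illustrative only; the result would be bound to `:future_time` in the insert above.

```python
import time

NANOS_PER_SECOND = 1_000_000_000


def future_time_ns(delay_seconds, now_ns=None):
    """Return a unix timestamp in nanoseconds, delay_seconds from now."""
    if now_ns is None:
        now_ns = time.time_ns()  # current unix time in nanoseconds
    return now_ns + int(delay_seconds * NANOS_PER_SECOND)


# Schedule a message one hour from now.
bind_vars = {"future_time": future_time_ns(3600)}
```

Passing seconds or milliseconds by mistake would produce a value far in the past, so the message would be treated as already due.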
|
||
# Receiving messages | ||
|
||
Processes can subscribe to receive messages by sending a `MessageStream` request to VTGate. If there are multiple subscribers, the messages will be delivered in a round-robin fashion. Note that this is not a broadcast: each message will be sent to at most one subscriber. | ||
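The difference between round-robin delivery and a broadcast can be modeled in a few lines. This is only a toy model of the delivery semantics described above, not how VTGate is implemented; `assign_round_robin` is a hypothetical name.

```python
from itertools import cycle


def assign_round_robin(message_ids, subscribers):
    """Map each message id to exactly one subscriber, taking turns in order."""
    if not subscribers:
        raise ValueError("at least one subscriber is required")
    turn = cycle(subscribers)
    return {msg_id: next(turn) for msg_id in message_ids}


assignments = assign_round_robin([1, 2, 3, 4, 5], ["worker-a", "worker-b"])
# worker-a gets messages 1, 3 and 5; worker-b gets 2 and 4.
```

No message id appears under more than one subscriber, which is why this mechanism does not suit use cases where every subscriber needs to see every event.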
|
||
The format for messages is the same as a Vitess `Result`. This means that standard database tools that understand query results can also be message recipients. Currently, there is no SQL format for subscribing to messages, but one will be provided soon. | ||
|
||
## Subsetting | ||
|
||
You may want to subscribe to specific shards or groups of shards while requesting messages. This is useful for partitioning or load balancing. The `MessageStream` API allows you to specify these constraints. The request parameters are as follows: | ||
|
||
* `Name`: Name of the message table. | ||
* `Keyspace`: Keyspace where the message table is present. | ||
* `Shard`: For unsharded keyspaces, this is usually "0". However, an empty shard will also work. For sharded keyspaces, a specific shard name can be specified. | ||
* `KeyRange`: If the keyspace is sharded, streaming will be performed only from the shards that match the range. This must be an exact match. | ||
|
||
# Acknowledging messages | ||
|
||
A received (or processed) message can be acknowledged using the `MessageAck` API call. This call accepts the following parameters: | ||
|
||
* `Name`: Name of the message table. | ||
* `Keyspace`: Keyspace where the message table is present. This field can be empty if the table name is unique across all keyspaces. | ||
* `Ids`: The list of ids that need to be acked. | ||
|
||
Once a message is successfully acked, it will never be resent. | ||
|
||
## Exponential backoff | ||
|
||
A message that was successfully sent will wait for the specified ack wait time. If no ack is received by then, it will be resent. The wait before each subsequent resend is double the previous wait. | ||
Review comment: Would it be possible to set a queue level min/max?
Review comment: We debated not backing off beyond a certain time period. However, that will just lead to hotspots in the future for messages that have chronic failures. It's better to provide for manual control over such messages, so they can be tried only after the root cause is addressed. |
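To make the doubling concrete, the sketch below lists the waits for the first few attempts, assuming the first wait equals `vt_ack_wait` and that there is no upper bound, as the text describes. The function name `ack_wait_schedule` is illustrative and not part of Vitess.

```python
from itertools import accumulate


def ack_wait_schedule(ack_wait_seconds, attempts):
    """Wait (in seconds) after each of the first `attempts` sends."""
    return [ack_wait_seconds * 2 ** n for n in range(attempts)]


waits = ack_wait_schedule(30, 5)
# [30, 60, 120, 240, 480]

# Seconds after the first send at which each resend would happen,
# if no ack ever arrives.
resend_offsets = list(accumulate(waits))
# [30, 90, 210, 450, 930]
```

With `vt_ack_wait=30`, a message that is never acked would reach a wait of over an hour by its eighth attempt (30 × 2^7 = 3840 seconds), which illustrates why chronically failing messages drift far into the future.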
||
|
||
# Purging | ||
|
||
Messages that have been successfully acked will be deleted after their age exceeds the time period specified by `vt_purge_after`. | ||
|
||
# Advanced usage | ||
|
||
The `MessageAck` functionality is currently an API call and cannot be used inside a transaction. However, you can ack a message using a regular DML statement. It should look like this: | ||
|
||
``` | ||
update my_message set time_acked = :time_acked, time_next = null where id in ::ids and time_acked is null | ||
``` | ||
|
||
You can also view messages using regular `select` queries. | ||
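The `time_acked is null` condition in the ack statement means that a message that is already acked is left untouched. The sketch below demonstrates this using Python's built-in `sqlite3` module as a stand-in for a real Vitess connection. That is an assumption for illustration only: the table is reduced to the columns the statement touches, the `::ids` list bind variable is replaced by generated placeholders, and `ack_messages` is a hypothetical helper.

```python
import sqlite3
import time


def ack_messages(conn, ids, time_acked_ns=None):
    """Ack the given ids; return how many rows were newly acked."""
    if not ids:
        return 0
    if time_acked_ns is None:
        time_acked_ns = time.time_ns()
    placeholders = ",".join("?" for _ in ids)
    cur = conn.execute(
        "update my_message set time_acked = ?, time_next = null "
        "where id in (%s) and time_acked is null" % placeholders,
        [time_acked_ns, *ids],
    )
    return cur.rowcount


# In-memory stand-in table holding three unacked messages.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table my_message("
    "id integer primary key, time_next integer, time_acked integer, message text)"
)
conn.executemany(
    "insert into my_message(id, time_next, message) values(?, ?, ?)",
    [(1, 100, "a"), (2, 100, "b"), (3, 100, "c")],
)

first = ack_messages(conn, [1, 2], time_acked_ns=500)   # acks ids 1 and 2
second = ack_messages(conn, [2, 3], time_acked_ns=900)  # only id 3 is new
```

The second call changes only message 3; message 2 keeps its original `time_acked` value, so acking the same id twice is harmless.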
|
||
# Known limitations | ||
|
||
The message feature is currently in alpha. The following are known limitations and possible improvements: | ||
|
||
* Flexible columns: Allow any number of application defined columns to be in the message table. | ||
Review comment: Flexible columns is a close second on the feature list
Review comment: This won't be too hard to add on (after I take care of a few unrelated high-priority items). |
||
* No ACL check for receivers: To be added. | ||
* Proactive scheduling: Upcoming messages can be proactively scheduled for timely delivery instead of waiting for the next polling cycle. | ||
* Monitoring support: To be added. | ||
* Dropped tables: The message engine does not currently detect dropped tables. | ||
* Changed properties: Although the engine detects new message tables, it does not refresh properties of an existing table. | ||
Review comment: Does properties mean the columns or the rows themselves? Is it possible to
Review comment: Right now, it's all properties: columns, types as well as config parameters like cache size, etc.
Review comment: It's actually possible to update the table. I've mentioned this in the Advanced Usage section. I'll add instructions for rescheduling messages in there. |
||
* A `SELECT` style syntax for subscribing to messages. | ||
* No rate limiting. | ||
Review comment: Rate limiting would be my #1 request
Review comment: The rate limiting design needs to be discussed. You could add the ability for each vttablet to rate-limit. However, it's hard to manage because of possible size differences, resharding, etc. A more global solution may need to be brainstormed. |
||
* Usage of partitions for efficient purging. |
Review comment: Can you format this file with < 80 characters per line? My emacs does that for me...