refactor: make Nats JetStream scale #18876

Open
2 of 4 tasks
tabVersion opened this issue Oct 11, 2024 · 4 comments
@tabVersion
Contributor

tabVersion commented Oct 11, 2024

Current Impl

  • Our current NATS JetStream implementation tracks progress through each message’s sequence ID and creates an ephemeral consumer upon each startup to guarantee exactly-once message consumption.
  • However, due to NATS API limitations, we are only able to consume messages using a single worker, which significantly limits read efficiency.
  • The ephemeral consumer is memory-bound on the broker, and creating new consumers during failover puts pressure on the broker.
  • Customers have also reported data loss issues, which we need to address by improving the ACK mechanism.
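
For illustration, the flow described above roughly maps to the following sketch using the async-nats crate; the stream name, the persisted `last_seq`, and the pull-based setup are assumptions for the example, not the actual RisingWave source.

```rust
use async_nats::jetstream::{self, consumer::{pull, DeliverPolicy}};
use futures::StreamExt;

// Single worker, ephemeral consumer: progress (the stream sequence ID) is
// tracked by RisingWave, and a fresh consumer is created on every startup.
async fn run_single_worker(last_seq: u64) -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(client);
    let stream = js.get_stream("events").await?; // hypothetical stream name

    let consumer = stream
        .create_consumer(pull::Config {
            durable_name: None, // ephemeral: state lives only in this instance
            deliver_policy: DeliverPolicy::ByStartSequence {
                start_sequence: last_seq + 1, // resume right after the persisted sequence
            },
            ..Default::default()
        })
        .await?;

    let mut messages = consumer.messages().await?;
    while let Some(message) = messages.next().await {
        let message = message?;
        // process message.payload and persist message.info()?.stream_sequence
    }
    Ok(())
}
```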

Our primary goal is to enable multiple workers to consume a single subject (the smallest unit in NATS) or a group of subjects (described using wildcards).

Challenges and Solutions

  • This objective requires the use of durable consumers, as the NATS documentation explicitly states: "Ephemeral consumers are meant to be used by a single instance of an application (e.g., to get its own replay of the messages in the stream)." Therefore, ephemeral consumers cannot share progress across multiple instances. ref
  • Durable consumers operate similarly to a consumer group, so we need to acknowledge each message (or use batch acknowledgment) to prevent the broker from sending the same message to other workers.
    • The broker will dispatch messages across workers without any guarantee of how they are distributed.
  • We will entrust offset management entirely to the broker. By using the durable consumer name, we can retrieve previous progress. This allows us to scale workers horizontally and dynamically align the number of workers (splits) with the operator’s parallelism.
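
As a minimal sketch of this durable-consumer direction (using the async-nats crate, with placeholder stream/consumer names and a pull consumer assumed, not the final implementation): every parallel worker binds to the same durable name, the broker remembers which messages are outstanding, and explicit acks stop it from redelivering them to another worker.

```rust
use async_nats::jetstream::{self, consumer::{pull, AckPolicy, PullConsumer}};
use futures::StreamExt;

// Every worker runs this with the same durable name; the broker keeps the
// shared progress, so there is no per-worker offset to persist in RisingWave.
async fn run_worker(durable_name: &str) -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(client);
    let stream = js.get_stream("events").await?; // hypothetical stream name

    let consumer: PullConsumer = stream
        .get_or_create_consumer(
            durable_name,
            pull::Config {
                durable_name: Some(durable_name.to_string()),
                ack_policy: AckPolicy::Explicit, // ack each message individually
                ..Default::default()
            },
        )
        .await?;

    let mut messages = consumer.messages().await?;
    while let Some(message) = messages.next().await {
        let message = message?;
        // process message.payload ...
        message.ack().await?; // tell the broker not to redeliver this message
    }
    Ok(())
}
```

With AckPolicy::All instead of Explicit, acking one message acknowledges everything delivered before it on that consumer, which is one way to realize the batch acknowledgment mentioned above.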

Drawback

  • Each worker will independently pull data from the broker, and when the number of workers changes, the broker may rebalance, causing data duplication for a short period.
    • This rebalancing process cannot be controlled by RisingWave, so we need to accept a degradation of message consumption semantics from exactly-once to at-least-once.

TODO List

@xxchan
Member

xxchan commented Oct 11, 2024

Thanks for providing the context. Now I can understand #18873 better.
And I had also thought about this for Kafka (relying on the consumer group and using manual commit), and yes, the largest drawback seems to be exactly-once -> at-least-once.

(But there don't seem to be many benefits for Kafka to make this change.)

(And I'm not sure whether group rebalancing causes duplication; I was thinking more that failover will cause duplication, i.e., a crash after checkpoint but before ack.)

But I found there's a "partitioning" mechanism in NATS https://docs.nats.io/nats-concepts/subject_mapping (I haven't delved into the details), which makes me wonder whether NATS's parallelism mechanism is like Pulsar's.

@tabVersion
Contributor Author

Their diagram tells the story. Subject Mapping works inside the broker: you can dynamically route messages into different subjects. RW, on the consumer side, does not need to care much about it; the consumer just needs to know which subject to consume, whether foo or foo.*.
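
To make the consumer-side point concrete, here is a hedged sketch (the durable name is hypothetical): regardless of how the broker maps or partitions subjects internally, only the filter subject of the consumer changes.

```rust
use async_nats::jetstream::consumer::pull;

// Whether the broker routes messages to foo, foo.orders, foo.users, ... via
// Subject Mapping, the consumer only declares which subjects it wants to see.
fn consumer_config(filter: &str) -> pull::Config {
    pull::Config {
        durable_name: Some("rw_nats_source".to_string()), // hypothetical name
        filter_subject: filter.to_string(), // e.g. "foo" or "foo.*"
        ..Default::default()
    }
}
```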

which makes me wonder whether NATS's parallelism mechanism is like Pulsar's

Yes, I have the same feeling. But Pulsar offers an API to list sub-partitions, giving us a way to consume in the Kafka style.

(But there don't seem to be many benefits for Kafka to make this change.)

Yes, I am proposing changes just to NATS JetStream (maybe Pub/Sub later).

(And I'm not sure whether group rebalancing causes duplication; I was thinking more that failover will cause duplication, i.e., a crash after checkpoint but before ack.)

Kafka pauses consumption during rebalancing; it is not designed for rapidly changing parallelism.

@yufansong
Member

I am still confused about one point:

The broker will dispatch messages across workers without any guarantee of how they are distributed.

If we have several workers A, B, and C reading messages from one broker in the future, and B batch-ACKs its messages while A and C crash, what messages will A and C get from NATS JetStream when they recover?

@tabVersion
Contributor Author

I am still confused about one point:

The broker will dispatch messages across workers without any guarantee of how they are distributed.

If we have several workers A, B, and C reading messages from one broker in the future, and B batch-ACKs its messages while A and C crash, what messages will A and C get from NATS JetStream when they recover?

B only acks the messages sent to B; it does not involve the messages sent to A and C. The broker records which consumer each message was sent to.
After A and C recover, the broker delivers the un-acked messages waiting in the queue. It does not guarantee that there is no overlap between A's, B's, and C's messages.
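
For context on the redelivery described above (a hedged sketch; the values are illustrative, not a recommendation): messages that were delivered to A or C but not acked before the crash become eligible for redelivery once the consumer's ack_wait expires, so a recovered or surviving worker can pick them up.

```rust
use std::time::Duration;
use async_nats::jetstream::consumer::{pull, AckPolicy};

// Illustrative settings: if a worker crashes after receiving messages but
// before acking them, the broker redelivers them once ack_wait has elapsed.
fn redelivery_config() -> pull::Config {
    pull::Config {
        durable_name: Some("rw_nats_source".to_string()), // hypothetical name
        ack_policy: AckPolicy::Explicit,
        ack_wait: Duration::from_secs(30), // illustrative redelivery window
        ..Default::default()
    }
}
```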
