Slow subscriber might block publishes #60

Open
jjj-vtm opened this issue Sep 4, 2024 · 6 comments

jjj-vtm commented Sep 4, 2024

Hi,

I wrote a client which basically simulates an overloaded (not consuming) subscriber.

use std::time::Duration;
// (imports of KuksaClient and kuksa_common from the kuksa client library omitted)

#[tokio::main]
async fn main() {
    let paths = vec!["Vehicle.Speed"];
    let mut client: KuksaClient =
        KuksaClient::new(kuksa_common::to_uri("127.0.0.1:55556").unwrap());

    let mut stream = client.subscribe(paths).await.unwrap();

    // Read exactly one notification, then stop consuming the stream.
    match stream.message().await {
        Ok(msg) => {
            println!("Got {:?}, not consuming any more messages.", msg);
        }
        Err(_) => todo!(),
    }
    // Keep the connection open without ever draining the subscription.
    tokio::time::sleep(Duration::MAX).await;
}

To trigger the bug faster I also modified the HTTP/2 initial connection window size and initial stream window size:

async fn try_create_channel(&mut self) -> Result<&Channel, ClientError> {
    #[cfg(feature = "tls")]
    let mut builder = tonic::transport::Channel::builder(self.uri.clone());
    builder = builder.initial_connection_window_size(2048);
    builder = builder.initial_stream_window_size(2048);
    // ...

After around 15-20 publishes via the CLI

kuksa.val.v1 > publish Vehicle.Speed 123
[publish] OK

no response comes back, and publishing a different value via another CLI session
kuksa.val.v1 > publish Vehicle.Width 3

will also hang. I guess that this send

match self.sender.send(notifications).await {

will wait until the receiver reads the data, and since a read lock on the database is held at that point, subsequent publishes wait on the lock. But even with a different database implementation, one slow consumer might still delay value propagation for other subscribers on the same path, since the send above is called in a loop over all subscribers.

Maybe it would be better to use try_send instead of send.
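For illustration, a minimal standalone sketch (not databroker code) of the difference between send and try_send on a bounded tokio mpsc channel; the buffer size of 1 and the u32 payload are arbitrary:

use tokio::sync::mpsc::{self, error::TrySendError};

#[tokio::main]
async fn main() {
    // Bounded channel with room for a single notification.
    let (tx, _rx) = mpsc::channel::<u32>(1);

    // The first value fits into the buffer.
    tx.send(1).await.unwrap();

    // `send` would now wait until the (slow) receiver frees capacity,
    // which is what stalls the publish path while the database lock is held.
    // `try_send` returns immediately instead:
    match tx.try_send(2) {
        Ok(()) => println!("queued"),
        Err(TrySendError::Full(v)) => println!("subscriber is slow, dropped {v}"),
        Err(TrySendError::Closed(v)) => println!("subscriber is gone, dropped {v}"),
    }
}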

Cheers,

Jan


argerus commented Sep 5, 2024

Nice catch!

I've actually had this issue in the back of my mind for some time now, but haven't gotten around to fixing / documenting it.

Maybe it would be better to use try_send instead of send.

That's one option. An alternative solution is to use a broadcast::channel instead of the mpsc::channel currently used to notify subscribers. The semantics of a broadcast channel are that sending always succeeds and never blocks, and if a subscriber cannot keep up, it will receive a RecvError::Lagged which can then be handled downstream by that subscriber.
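For illustration, a minimal standalone sketch (not databroker code) of those semantics with tokio::sync::broadcast; the capacity of 2 and the u32 payload are arbitrary:

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Each subscriber gets its own Receiver; capacity 2 keeps the example short.
    let (tx, mut rx) = broadcast::channel::<u32>(2);

    // Sending never blocks, even though the subscriber is not reading yet.
    for value in 0..5 {
        let _ = tx.send(value);
    }
    drop(tx); // close the channel so the loop below terminates

    // The lagging subscriber is told how many values it missed,
    // then continues with the oldest value still in the buffer.
    loop {
        match rx.recv().await {
            Ok(value) => println!("got {value}"),
            Err(RecvError::Lagged(missed)) => println!("lagged, missed {missed} values"),
            Err(RecvError::Closed) => break,
        }
    }
}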


jjj-vtm commented Sep 6, 2024

broadcast::channel sounds like a good solution and might be used on something like a "per path" basis, which would be a good way to only send messages to subscribers that are really interested in them. The current solution, I guess, has quite some overhead if there are many subscribers to lots of different topics/paths, since each change is propagated to all of them and the subscribers have to filter them out.

lukasmittag (Contributor) commented

@jjj-vtm If you have time you can join our community meeting taking place tomorrow and we can discuss it further. Here is the link to it: https://eclipse.zoom.us/j/87644929505?pwd=cTRpYklVaS9xYjlhMXRtbS9IN0FCQT09 (it's planned for 1 pm).


jjj-vtm commented Oct 4, 2024

Hi,
I found some time to look at this issue again, and I would prefer to implement it via try_send and map the "queue full" error to something which should not trigger the cleanup. A sample implementation would be:

https://github.com/jjj-vtm/kuksa-databroker/blob/792ba13abe1edb5c5afcc448568815026e2cf8d6/databroker/src/broker.rs#L763C16-L770C23

and error handling in the update_entries function:

https://github.com/jjj-vtm/kuksa-databroker/blob/792ba13abe1edb5c5afcc448568815026e2cf8d6/databroker/src/broker.rs#L1447C1-L1464C14
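For readers without access to the branch, a rough sketch of the idea; the ChangeNotification and NotificationError names here are made up for illustration, and the linked code may differ:

use tokio::sync::mpsc;

// Hypothetical notification type, only for illustration.
#[derive(Debug, Clone)]
struct ChangeNotification {
    path: String,
    value: f64,
}

#[derive(Debug)]
enum NotificationError {
    // Receiver is gone: the subscription should be cleaned up.
    Closed,
    // Receiver is just slow: drop the notification but keep the subscription.
    Lagging,
}

fn notify(
    sender: &mpsc::Sender<ChangeNotification>,
    notification: ChangeNotification,
) -> Result<(), NotificationError> {
    match sender.try_send(notification) {
        Ok(()) => Ok(()),
        Err(mpsc::error::TrySendError::Full(_)) => Err(NotificationError::Lagging),
        Err(mpsc::error::TrySendError::Closed(_)) => Err(NotificationError::Closed),
    }
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = mpsc::channel(1);
    let n = ChangeNotification { path: "Vehicle.Speed".into(), value: 123.0 };
    // In the update path, only `Closed` would trigger removal of the
    // subscription; `Lagging` would at most be logged.
    println!("{:?}", notify(&tx, n));
}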

What would be nice about the broadcast channel is that it would signal the client that it is lagging behind, but on the other hand, having a broadcast channel with a single receiver only for this semantic seems to be overkill. I thought about having a broadcast channel per ID, so that only subscribers which are interested in changes for that ID are informed, but that would need some refactoring, and I would only see it as necessary if the databroker must support a high number of subscribers (> 10k). With < 1k subscriptions, checking whether a subscription is interested in the change should not cause problems, since the checks are very simple (maybe apart from the HashMap lookup).

In my playground I also added a small performance test:
https://github.com/jjj-vtm/kuksa-databroker/blob/small_set_improvement/databroker/examples/pub_perf.rs

Using this I made some changes to the databroker, which we could discuss:

  1. Changed general HashSets with enums as values to EnumSet, which handles insertion and intersection as bitwise operations (a rough sketch of (1) and (2) follows this list)
  2. Used FxHashMap, since the Rust std HashMap implementation uses a much more expensive hash function
  3. Converted tokio RwLocks to std RwLocks
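For illustration, a rough sketch of (1) and (2), e.g. via the enumset and rustc-hash crates; the Field enum here is made up and the actual types in broker.rs differ:

use enumset::{EnumSet, EnumSetType};
use rustc_hash::FxHashMap;

// Hypothetical enum standing in for the enums stored in HashSets today.
#[derive(EnumSetType, Debug)]
enum Field {
    Value,
    ActuatorTarget,
    Metadata,
}

fn main() {
    // (1) EnumSet stores the variants as bits, so insertion and
    // intersection are single bitwise operations.
    let requested: EnumSet<Field> = Field::Value | Field::Metadata;
    let changed: EnumSet<Field> = EnumSet::only(Field::Value);
    let relevant = requested & changed;
    println!("relevant fields: {:?}", relevant);

    // (2) FxHashMap is a drop-in HashMap with a cheaper (non-cryptographic) hasher.
    let mut subscriptions: FxHashMap<i32, EnumSet<Field>> = FxHashMap::default();
    subscriptions.insert(42, requested);
    println!("subscription 42: {:?}", subscriptions.get(&42));
}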

(3) is quite controversial since a lot of ".await"s are removed and the broker becomes more synchronous, but since there is no IO involved and everything is backed by a simple in-memory datastore, this should be fine. This change increased the performance quite drastically. Using the test above with the standard (main) databroker on my M3 I get:

janjongen@tsssss3 kuksa-databroker-1 % ./target/release/examples/pub_perf
Time to handle 2500000 message: : 20.26343 seconds. Msg per second: 125.000

with my patches:

tsssss3 kuksa-databroker-1 % ./target/release/examples/pub_perf
Time to handle 2500000 message: : 15.60479 seconds. Msg per second: 166.666

Sorry for the long post; I'll try to join the community meeting next time.


argerus commented Oct 4, 2024

Awesome!

I will have a closer look, but I have a couple of initial thoughts.

What would be nice about the broadcast channel is that it would signal the client that it is lagging behind but on the other hand having a broadcast channel with a single receiver only for this semantic seems to be overkill.

You're right that it's probably "overkill" to use one broadcast channel per subscriber, but the other nice semantic of the broadcast channel is that it will drop the oldest value when the subscriber is lagging.

I would prefer to implement it via try_send and map the "Queue full error" to something

The problem with this is that it would effectively mean that the latest value is dropped instead of the oldest. That's probably not the semantics we want, as the oldest value is probably that of least interest if we're starting to drop things.

My other thought is that I'm not sure pub_perf is measuring what we really want. If I'm reading it right, it measures how fast the publishing side can publish. Since we've introduced dropping, I would assume this means it can publish faster, as things are now dropped when queues start filling up.

We would be really happy to improve the performance measuring situation, though. We've recently created kuksa-perf as a first step towards more meaningful performance tests. What it measures is the latency between a provider publishing an update and a subscriber receiving it, and we're working on making it better at simulating realistic workloads, with the ability to fine-tune the amount of traffic generated while still measuring the resulting latency.

This is an area where we need to look further into how we want databroker to behave under excessive load (i.e. when and how it should start dropping stuff).

Anyway, thank you for also looking into this, and I will take a further look into the other performance-enhancing modifications you've made as well.


jjj-vtm commented Oct 4, 2024

The problem with this is that it would effectively mean that the latest value is dropped instead of the oldest. That's probably not the semantics we want, as the oldest value is probably that of least interest if we're starting to drop things.

You are right. I thought about it too, but could not come up with an easy way to implement this without using the broadcast channel. IMHO there are 3 options:

  1. Bound the buffer size of the channel to 1; in this case try_send would have the same semantics as the broadcast channel. I guess that this should not cause any problems, but having the option of buffering seems nice.

  2. Use a broadcast channel per subscriber, which for me feels wrong.

  3. On each subscribe, spawn a task and give it the receiver of a broadcast channel and the sender of the channel which is used for the tonic stream. On each update, propagate the changes via the broadcast channel to the tasks and let them send the notifications out. A slow subscriber would have no effect, and with those tasks one would have control over handling the receive error in case the buffer is full. IMHO this seems to be the right way to go, but it would be much more effort compared to (1) and (2) ;) (a rough sketch follows below)
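For illustration, a rough sketch of option (3) with simplified types; the Notification struct and the channel capacities are made up:

use tokio::sync::{broadcast, mpsc};

#[derive(Debug, Clone)]
struct Notification {
    path: String,
    value: f64,
}

// Called on each subscribe: forwards broadcast updates to the per-subscriber
// mpsc channel that backs the tonic stream, without ever blocking the broker.
fn spawn_forwarder(
    mut updates: broadcast::Receiver<Notification>,
    stream_tx: mpsc::Sender<Notification>,
) {
    tokio::spawn(async move {
        loop {
            match updates.recv().await {
                Ok(notification) => {
                    // If this particular subscriber is slow, only this task waits.
                    if stream_tx.send(notification).await.is_err() {
                        break; // subscriber disconnected
                    }
                }
                Err(broadcast::error::RecvError::Lagged(missed)) => {
                    // Decide here how to report the overload to the client.
                    eprintln!("subscriber lagged, {missed} updates dropped");
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    });
}

#[tokio::main]
async fn main() {
    let (updates_tx, _) = broadcast::channel(16);
    let (stream_tx, mut stream_rx) = mpsc::channel(4);

    spawn_forwarder(updates_tx.subscribe(), stream_tx);

    // The broker side never blocks on a slow subscriber.
    let _ = updates_tx.send(Notification { path: "Vehicle.Speed".into(), value: 1.0 });

    // The tonic stream would consume from stream_rx.
    println!("{:?}", stream_rx.recv().await);
}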

My other thought is that I'm not sure pub_perf is measuring what we really want

It simply tests the throughput of publishes of the databroker and is not a realistic load test. I just wanted to measure the effects of my changes.

https://github.com/eclipse-kuksa/kuksa-perf

Thanks! I must have overlooked that.
