From 755f78c6e2feb89593cf6c8b4c6d1f5938b60647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien?= <3535019+leruaa@users.noreply.github.com> Date: Thu, 25 Jan 2024 21:33:57 +0100 Subject: [PATCH] fix(pubsub): handle subscription response on reconnects (#105) (#107) --- crates/pubsub/src/managers/sub.rs | 2 +- crates/pubsub/src/service.rs | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/pubsub/src/managers/sub.rs b/crates/pubsub/src/managers/sub.rs index 278c9992abf..9cdff2dda28 100644 --- a/crates/pubsub/src/managers/sub.rs +++ b/crates/pubsub/src/managers/sub.rs @@ -47,7 +47,7 @@ impl SubscriptionManager { // If we already know a subscription with the exact params, // we can just update the server_id and get a new listener. - if self.local_to_server.contains_left(&local_id) { + if self.local_to_sub.contains_left(&local_id) { self.change_server_id(local_id, server_id); self.get_rx(local_id).expect("checked existence") } else { diff --git a/crates/pubsub/src/service.rs b/crates/pubsub/src/service.rs index e90629b8f98..1b5bce5d27e 100644 --- a/crates/pubsub/src/service.rs +++ b/crates/pubsub/src/service.rs @@ -91,13 +91,19 @@ where // Drop all server IDs. We'll re-insert them as we get responses. self.subs.drop_server_ids(); + // Dispatch all subscription requests - self.subs - .iter() - .map(|(_, sub)| sub.request().serialized().to_owned()) - .collect::>() - .into_iter() - .try_for_each(|brv| self.dispatch_request(brv))?; + self.subs.iter().try_for_each(|(_, sub)| { + let req = sub.request().to_owned(); + let (in_flight, _) = InFlight::new(req.clone()); + self.in_flights.insert(in_flight); + + self.handle + .to_socket + .send(req.serialized().to_owned()) + .map(drop) + .map_err(|_| TransportErrorKind::backend_gone()) + })?; Ok(()) }