Skip to content

Commit

Permalink
fix(pubsub): handle subscription response on reconnects (#105) (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
leruaa authored Jan 25, 2024
1 parent 5fa63d7 commit 755f78c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion crates/pubsub/src/managers/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl SubscriptionManager {

// If we already know a subscription with the exact params,
// we can just update the server_id and get a new listener.
if self.local_to_server.contains_left(&local_id) {
if self.local_to_sub.contains_left(&local_id) {
self.change_server_id(local_id, server_id);
self.get_rx(local_id).expect("checked existence")
} else {
Expand Down
18 changes: 12 additions & 6 deletions crates/pubsub/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,19 @@ where

// Drop all server IDs. We'll re-insert them as we get responses.
self.subs.drop_server_ids();

// Dispatch all subscription requests
self.subs
.iter()
.map(|(_, sub)| sub.request().serialized().to_owned())
.collect::<Vec<_>>()
.into_iter()
.try_for_each(|brv| self.dispatch_request(brv))?;
self.subs.iter().try_for_each(|(_, sub)| {
let req = sub.request().to_owned();
let (in_flight, _) = InFlight::new(req.clone());
self.in_flights.insert(in_flight);

self.handle
.to_socket
.send(req.serialized().to_owned())
.map(drop)
.map_err(|_| TransportErrorKind::backend_gone())
})?;

Ok(())
}
Expand Down

0 comments on commit 755f78c

Please sign in to comment.