-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(rpc module): close subscription task when a subscription is unsubscribed
via the unsubscribe call
#743
Conversation
This reverts commit 366176a.
core/src/server/rpc_module.rs
Outdated
} | ||
}; | ||
|
||
let sub_closed = match self.unsubscribe.take() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see, so this oneshot unsubscribe channel is removed here, which means we can only use pipe_from_try_stream
once.
Is it possible to take a mutable ref to it instead to use below (which may need pinning)? Maybe then this method would still be reusable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it won't work because send
takes a &self
as well.
I changed it to tokio::sync::watch
for ease of use.
The receiver is clonable and it's possible to check whether the sender is still alive
Co-authored-by: David <[email protected]>
Co-authored-by: David <[email protected]>
@@ -108,6 +100,48 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle | |||
}) | |||
.unwrap(); | |||
|
|||
module | |||
.register_subscription("subscribe_5_ints", "n", "unsubscribe_5_ints", |_, pending, _| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we can call pipe_from_stream
more than once, should we also have a test to make sure that we can?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me!
unsubscribed
via the unsubscribe call
unsubscribed
via the unsubscribe call
The reason for this is that the stream passed to
pipe_from_stream
might not produce new items and that can lead to that calls that areunsubscribed
is not terminated until an item for the underlying stream is produced.In worst these tasks will not be terminated until the actual connection is closed which this fixes.
After this PR the subscriptions is terminated once the unsubscribe call is received.