-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(rpc module): stream API
for SubscriptionSink
#639
Conversation
add_stream
to subscription sinkstream API
for SubscriptionSink
/// Connection ID | ||
pub conn_id: ConnectionId, | ||
/// Channel to know whether the connection is closed or not. | ||
pub close: async_channel::Receiver<()>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NOTE: this panics if count > usize::MAX / 2
but if we reach that we likely have other problems such as OOM :)
stream API
for SubscriptionSinkstream API
for SubscriptionSink
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks pretty good to me, what is it you want to do more here?
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); | ||
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I felt like I understood the test until I got here. I thought rx
would produce None
after the client was dropped and so it'd be assert!(rx.next().await.is_none())
. What am I missing? :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha, maybe it's more clear as you described it but the test actually sends a message on the channel when the subscription terminated.
the reason why is because the tx
is kept in the RpcModule and can't be dropped in the subscribe callback.
Co-authored-by: David <[email protected]>
It should be complete but the grumbles is maybe the API (naming) and cloning a bunch |
core/src/server/rpc_module.rs
Outdated
let (conn_tx, conn_rx) = oneshot::channel::<()>(); | ||
let c = conn.expect("conn must be Some; this is bug"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it bust be Some
, why don't we restrict it on parameter level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's way better but I think it requires additional callback types right? see my comment below.
thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I had a look at the code to solve this properly at parameter level
one need to lookup at the actual callback to avoid to pass down a bunch of unused parameters.
// server code
let result = match RpcModule::as_callback(&req.method) {
None => {
send_error(sink, msg),
continue
}
Some(MethodKind::Sync(cb)) => (cb)(id, params, sink, conn_id),
Some(MethodKind::Async(cb)) => (cb)(id, params, sink, resources).await,
// should be used for subscriptions...
// servers that don't support subscriptions should throw an error here...
Some(MethodKind::Subscription) => (cb)(id, params, sink, resources, conn_state),
};
// modify RpcModule::register_subscription
pub fn register_subscription<F>(
&mut self,
subscribe_method_name: &'static str,
notif_method_name: &'static str,
unsubscribe_method_name: &'static str,
callback: F,
) -> Result<(), Error> {
....
....
self.methods.mut_callbacks().insert(
subscribe_method_name,
MethodCallback::new_subscription(Arc::new(move |id, params, method_sink, conn| {
...
}
);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one benefit is not to have to clone this "introduced channel connection_state" for every method instead only for subscriptions where it's actually used... not sure I liked how the old code abstracted this away
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I already thought about splitting async calls from subscriptions when doing my refactoring earlier, and for regular method calls returning values instead of passing in the sink as a parameter, I reckon that would make things much more readable and straight forward, and potentially make the final binary smaller. So if you want to go that route and add another enum variant I think that's cool, and I can do a PR that switches method calls to have a return value later on :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I already added the enum variant in this PR but just a hacky draft to check that it works and to show you and David what I had in mind :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds great to get rid of the sink for the synchronous calls.
S: Stream<Item = T> + Unpin, | ||
T: Serialize, | ||
{ | ||
let mut close_stream = self.close.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is hack; to workaround borrowing issues when we call self.send
below.
we may refactor this to split the SubscripionSink
to several types or something.... because the close_stream
is mutable borrowed and then we can't borrow any of the other fields here or move out of self
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need plenty of comments in this method, it requires some pretty deep understanding of the architecture and I think we should capture it here.
core/src/server/rpc_module.rs
Outdated
@@ -786,9 +746,40 @@ impl SubscriptionSink { | |||
self.inner_send(msg).map_err(Into::into) | |||
} | |||
|
|||
/// Consume the sink by passing a stream to be sent via the sink. | |||
pub async fn add_stream<S, T>(mut self, mut stream: S) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about calling this into_stream
? I think "add" implies there could be more than one and that it doesn't quite relay the information about the important changes that this call makes to the sink.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like add_stream
either, but into_stream
is not really great either it doesn't return the stream....
maybe run_stream
, from_stream
, spawn_stream
or something else?!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair; I'd have liked as_stream
but as_
is "taken" with different semantics so can't do that.
Of your suggestions I like from_stream
the best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this method consumes a stream, feeding the items into the subscription?
I guess I'd go with something like consume_stream
or read_from_stream
. into_
, as_
, and from_
all sortof feel like I should expect some result back from this call to me!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this method consumes a stream, feeding the items into the subscription?
yes
I guess we could return a type that impls Sink/SinkExt
instead here to make it more readable and flexible i.e, to deal with errors and so on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamify()
?
I think consume_stream
is so-so. Yes, we do consume it, but that's not really the point. Rather we're "hooking up" the stream to the sink and leave it there for the duration of the subscription.
with_stream
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pipe
, maybe? we're piping a stream into the subscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sink.pipe_from_stream
? I quite like pipe
!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like pipe_from_stream
, let's settle for that?
S: Stream<Item = T> + Unpin, | ||
T: Serialize, | ||
{ | ||
let mut close_stream = self.close.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need plenty of comments in this method, it requires some pretty deep understanding of the architecture and I think we should capture it here.
} | ||
Err(err) => { | ||
tracing::warn!("Subscription got error: {:?} terminating task", err); | ||
break Err(err); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is most likely a bug when serializing T fails, should we have a flag for this when to exit, just log an continue, or something else?
The error is not really possible to handle but if you require each element to be processed in order, ignore might break expectations so I just terminated the "task" when an error is detected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you made the right choice. We could also panic I guess but let's terminate for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me overall, and I like the simplification in substrate!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, good job separating subscriptions!
Fixes #627
This PR introduces a new API for the
SubscriptionSink
to add a stream to send items over the subscription when "new items" get produced, if the connection gets closed it will terminate the task because it has additional channel linked to actual connection.