Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

style: use poll loop for CallState #779

Merged
merged 1 commit into from
May 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 38 additions & 64 deletions crates/rpc-client/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,84 +62,58 @@ where
}
}

impl<Params, Conn> CallState<Params, Conn>
impl<Params, Conn> Future for CallState<Params, Conn>
where
Conn: Transport + Clone,
Params: RpcParam,
{
fn poll_prepared(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
let fut = {
let CallStateProj::Prepared { connection, request } = self.as_mut().project() else {
unreachable!("Called poll_prepared in incorrect state")
};
type Output = TransportResult<Box<RawValue>>;

if let Err(e) = task::ready!(Service::<RequestPacket>::poll_ready(connection, cx)) {
self.set(CallState::Complete);
return Ready(RpcResult::Err(e));
}
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
loop {
match self.as_mut().project() {
CallStateProj::Prepared { connection, request } => {
if let Err(e) =
task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
{
self.set(CallState::Complete);
return Ready(RpcResult::Err(e));
}

let request = request.take().expect("no request");
debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
let request = request.serialize();
match request {
Ok(request) => {
trace!(request=%request.serialized(), "serialized request");
connection.call(request.into())
let request = request.take().expect("no request");
debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
let request = request.serialize();
let fut = match request {
Ok(request) => {
trace!(request=%request.serialized(), "serialized request");
connection.call(request.into())
}
Err(err) => {
trace!(?err, "failed to serialize request");
self.set(CallState::Complete);
return Ready(RpcResult::Err(TransportError::ser_err(err)));
}
};
self.set(CallState::AwaitingResponse { fut });
}
Err(err) => {
trace!(?err, "failed to serialize request");
CallStateProj::AwaitingResponse { fut } => {
let res = match task::ready!(fut.poll(cx)) {
Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
Err(e) => Ready(RpcResult::Err(e)),
_ => panic!("received batch response from single request"),
};
self.set(CallState::Complete);
return Ready(RpcResult::Err(TransportError::ser_err(err)));
return res;
}
CallStateProj::Complete => {
panic!("Polled after completion");
}
}
};

self.set(CallState::AwaitingResponse { fut });
cx.waker().wake_by_ref();

task::Poll::Pending
}

fn poll_awaiting(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<<Self as Future>::Output> {
let CallStateProj::AwaitingResponse { fut } = self.as_mut().project() else {
unreachable!("Called poll_awaiting in incorrect state")
};

match task::ready!(fut.poll(cx)) {
Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
Err(e) => Ready(RpcResult::Err(e)),
_ => panic!("received batch response from single request"),
}
}
}

impl<Params, Conn> Future for CallState<Params, Conn>
where
Conn: Transport + Clone,
Params: RpcParam,
{
type Output = TransportResult<Box<RawValue>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
if matches!(*self.as_mut(), CallState::Prepared { .. }) {
return self.poll_prepared(cx);
}

if matches!(*self.as_mut(), CallState::AwaitingResponse { .. }) {
return self.poll_awaiting(cx);
}

panic!("Polled in bad state");
}
}

/// A prepared, but unsent, RPC call.
///
/// This is a future that will send the request when polled. It contains a
Expand Down
Loading