Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feature: dispatch request to end subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
prestwich committed Feb 25, 2023
1 parent 94fa85b commit 22a0e33
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
45 changes: 29 additions & 16 deletions ethers-providers/src/rpc/transports/ws2/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,25 @@ impl SubscriptionManager {
self.aliases.remove(&server_id);
}

fn end_subscription(&mut self, id: u64) -> Box<RawValue> {
fn end_subscription(&mut self, id: u64) -> Option<Box<RawValue>> {
if let Some(sub) = self.subs.remove(&id) {
if let Some(alias) = sub.current_server_id {
self.remove_alias(alias);
if let Some(server_id) = sub.current_server_id {
self.remove_alias(server_id);
// drop the receiver as we don't need the result
let (channel, _) = oneshot::channel();
// Serialization errors are ignored, and result in the request
// not being dispatched. This is fine, as worst case it will
// result in the server sending us notifications we ignore
let unsub_request = InFlight {
method: "eth_unsubscribe".to_string(),
params: SubId(server_id).serialize_raw().ok()?,
channel,
};
// reuse the RPC ID. this is somewhat dirty.
return unsub_request.serialize_raw(id).ok()
}
}
todo!("return unsubscribe request")
None
}

fn handle_notification(&mut self, notification: Notification) {
Expand All @@ -78,7 +90,7 @@ impl SubscriptionManager {

fn req_success(&mut self, id: u64, result: Box<RawValue>) -> Box<RawValue> {
if let Ok(sub_id) = serde_json::from_str::<SubId>(result.get()) {
self.add_alias(sub_id.subscription, id);
self.add_alias(sub_id.0, id);
let result = U256::from(id);
RawValue::from_string(format!("{result:?}")).unwrap()
} else {
Expand All @@ -100,18 +112,18 @@ impl SubscriptionManager {
params: Box<RawValue>,
) -> Result<Box<RawValue>, WsClientError> {
let (tx, rx) = mpsc::unbounded();
// we make both a pending req and an active sub here

let active_sub = ActiveSub { params, channel: tx, current_server_id: None };
let req = active_sub.to_request(id);
let req = active_sub.serialize_raw(id)?;

// Explicit drop for the lock
// Explicit scope for the lock
// This insertion should be made BEFORE the request returns.
{
self.channel_map.lock().unwrap().insert(id.into(), rx);
}
self.subs.insert(id, active_sub);

Ok(RawValue::from_string(serde_json::to_string(&req)?)?)
Ok(req)
}
}

Expand Down Expand Up @@ -175,15 +187,15 @@ impl RequestManager {
for (id, sub) in self.subs.to_reissue() {
self.backend
.dispatcher
.unbounded_send(sub.serialize_raw(*id))
.unbounded_send(sub.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
}

// reissue requests
for (id, req) in self.reqs.iter() {
self.backend
.dispatcher
.unbounded_send(req.serialize_raw(*id))
.unbounded_send(req.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
}

Expand Down Expand Up @@ -247,11 +259,12 @@ impl RequestManager {
self.service_request(id, method, params, sender)?;
}
Instruction::Unsubscribe { id } => {
let req = self.subs.end_subscription(id.low_u64());
self.backend
.dispatcher
.unbounded_send(req)
.map_err(|_| WsClientError::DeadChannel)?;
if let Some(req) = self.subs.end_subscription(id.low_u64()) {
self.backend
.dispatcher
.unbounded_send(req)
.map_err(|_| WsClientError::DeadChannel)?;
}
}
}
Ok(())
Expand Down
23 changes: 14 additions & 9 deletions ethers-providers/src/rpc/transports/ws2/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ use crate::{common::Request, JsonRpcError};
// Normal JSON-RPC response
pub type Response = Result<Box<RawValue>, JsonRpcError>;

#[derive(serde::Deserialize)]
pub struct SubId {
pub subscription: U256,
#[derive(serde::Deserialize, serde::Serialize)]
pub struct SubId(pub U256);

impl SubId {
pub(super) fn serialize_raw(&self) -> Result<Box<RawValue>, serde_json::Error> {
let s = serde_json::to_string(&self)?;
RawValue::from_string(s)
}
}

#[derive(Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -45,9 +50,9 @@ impl InFlight {
Request::new(id, &self.method, self.params.clone())
}

pub(super) fn serialize_raw(&self, id: u64) -> Box<RawValue> {
let s = serde_json::to_string(&self.to_request(id)).unwrap();
RawValue::from_string(s).unwrap()
pub(super) fn serialize_raw(&self, id: u64) -> Result<Box<RawValue>, serde_json::Error> {
let s = serde_json::to_string(&self.to_request(id))?;
RawValue::from_string(s)
}
}

Expand All @@ -62,9 +67,9 @@ impl ActiveSub {
Request::new(id, "eth_subscribe", self.params.clone())
}

pub(super) fn serialize_raw(&self, id: u64) -> Box<RawValue> {
let s = serde_json::to_string(&self.to_request(id)).unwrap();
RawValue::from_string(s).unwrap()
pub(super) fn serialize_raw(&self, id: u64) -> Result<Box<RawValue>, serde_json::Error> {
let s = serde_json::to_string(&self.to_request(id))?;
RawValue::from_string(s)
}
}

Expand Down

0 comments on commit 22a0e33

Please sign in to comment.