Skip to content

Commit

Permalink
fix: change ws SubscriptionStream::unsubscribe to send the correct se…
Browse files Browse the repository at this point in the history
…rver-side ID (gakonst#2669)

Also add a test which calls unsubscribe.
  • Loading branch information
gbrew authored and jackiec22 committed Mar 6, 2024
1 parent 2d619b6 commit 2a85d90
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
10 changes: 7 additions & 3 deletions ethers-providers/src/rpc/pubsub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{JsonRpcClient, Middleware, Provider};
use crate::{JsonRpcClient, Provider};
use ethers_core::types::U256;
use futures_util::stream::Stream;
use pin_project::{pin_project, pinned_drop};
Expand Down Expand Up @@ -28,7 +28,8 @@ pub trait PubsubClient: JsonRpcClient {
#[pin_project(PinnedDrop)]
/// Streams data from an installed filter via `eth_subscribe`
pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> {
/// The subscription's installed id on the ethereum node
/// A client-side ID for the subscription. This may not be the same
/// as the server-side ID for this subscription on the ethereum node.
pub id: U256,

loaded_elements: VecDeque<R>,
Expand Down Expand Up @@ -61,7 +62,10 @@ where

/// Unsubscribes from the subscription.
pub async fn unsubscribe(&self) -> Result<bool, crate::ProviderError> {
self.provider.unsubscribe(self.id).await
// Make sure to use PubSubClient unsubscribe() rather than Provider unsubscribe()
// Only the former handles mappings between client- and server-side subscription IDs
P::unsubscribe((*self.provider).as_ref(), self.id).map_err(Into::into)?;
Ok(true)
}

/// Set the loaded elements buffer. This buffer contains logs waiting for
Expand Down
6 changes: 0 additions & 6 deletions ethers-providers/src/rpc/transports/ws/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ pub type Response = Result<Box<RawValue>, JsonRpcError>;
#[derive(serde::Deserialize, serde::Serialize)]
pub struct SubId(pub U256);

impl SubId {
pub(super) fn serialize_raw(&self) -> Result<Box<RawValue>, serde_json::Error> {
to_raw_value(&self)
}
}

#[derive(Deserialize, Debug, Clone)]
pub struct Notification {
pub subscription: U256,
Expand Down
32 changes: 32 additions & 0 deletions ethers-providers/tests/it/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,38 @@ mod eth_tests {
assert_eq!(&block, blocks.last().unwrap());
}

#[tokio::test]
#[cfg(feature = "ws")]
async fn unsubscribe_blocks_ws() {
let (provider, _anvil) = crate::spawn_anvil_ws().await;
generic_unsubscribe_blocks_test(provider).await;
}

#[tokio::test]
#[cfg(feature = "ipc")]
async fn unsubscribe_blocks_ipc() {
let (provider, _anvil, _ipc) = crate::spawn_anvil_ipc().await;
generic_unsubscribe_blocks_test(provider).await;
}

#[cfg(any(feature = "ws", feature = "ipc"))]
async fn generic_unsubscribe_blocks_test<M>(provider: M)
where
M: Middleware,
M::Provider: ethers_providers::PubsubClient,
{
{
let stream = provider.subscribe_blocks().await.unwrap();
stream.unsubscribe().await.unwrap();
}
{
let _stream = provider.subscribe_blocks().await.unwrap();
// stream will be unsubscribed automatically here on drop
}
// Sleep to give the unsubscription messages time to propagate
tokio::time::sleep(crate::Duration::from_millis(200)).await;
}

#[tokio::test]
async fn send_tx_http() {
let (provider, anvil) = spawn_anvil();
Expand Down

0 comments on commit 2a85d90

Please sign in to comment.