Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove async from PubSubFrontend::unsubscribe #163

Closed
ZanCorDX opened this issue Jan 30, 2024 · 2 comments · Fixed by #168
Closed

Remove async from PubSubFrontend::unsubscribe #163

ZanCorDX opened this issue Jan 30, 2024 · 2 comments · Fixed by #168
Labels
enhancement New feature or request

Comments

@ZanCorDX
Copy link

Component

providers, pubsub

Describe the feature you would like

Remove async from PubSubFrontend::unsubscribe since it's not really async.
It's stopping me from being able to implement a RAII object that unsubscribes on drop.

Additional context

No response

@ZanCorDX ZanCorDX added the enhancement New feature or request label Jan 30, 2024
@prestwich
Copy link
Member

would you consider submitting your RAII types for inclusion in the main repo?

@ZanCorDX
Copy link
Author

ZanCorDX commented Jan 31, 2024

It's a very preliminary stuff while you finish streams stuff:

use alloy_primitives::U256;
use alloy_providers::provider::{Provider, TempProvider};
use alloy_pubsub::PubSubFrontend;
use alloy_rpc_client::RpcClient;
use alloy_rpc_types::Header;
use alloy_transport_http::Http;
use serde_json::value::RawValue;
use tokio::sync::broadcast::Receiver;
use tokio_stream::wrappers::BroadcastStream;

/// Wrapper over PubSubFrontend while alloy is finished (now = 2024/1/29)
/// It solves:
/// - Problem of keeping alive the PubSubFrontend while subscriptions are in use (if not subscription die :().
/// - is_local propagation from connector
/// Sample use:
/// let connector: IpcConnect<_> = config.el_node_ipc_path.clone().into();
/// let connection = RethConnection::new(connector).await?;
/// let mut block_sub = connection.subscribe_blocks().await?;
/// let provider = connection.create_provider();
/// let block_number = provider.get_block_number()await;
/// block_sub.as_mut().recv()
pub struct RethConnection {
    front_end: PubSubFrontend,
    connection_is_local: bool,
}

impl RethConnection {
    pub async fn new<T: alloy_pubsub::PubSubConnect>(connector: T) -> Result<Self, eyre::Error> {
        let connection_is_local = connector.is_local();
        let front_end = connector.into_service().await?;
        Ok(Self {
            front_end,
            connection_is_local,
        })
    }

    pub fn create_provider(&self) -> Provider<PubSubFrontend> {
        // Can't use alloy_providers::provider::Provider::new since it has a hardcoded false for is_local
        let rpc_client = RpcClient::new(self.front_end.clone(), self.connection_is_local);
        alloy_providers::provider::Provider::new_with_client(rpc_client)
    }

    /// Creates a subscription to event that will unsubscribe automatically on Drop
    async fn subscribe(&self, event: &str) -> Result<Subscription, eyre::Error> {
        let (receiver, subscription_id) = self.send_subscribe_request(event).await?;
        let receiver_stream = BroadcastStream::new(receiver);
        Ok(Subscription {
            connection: self,
            receiver_stream,
            subscription_id,
        })
    }

    /// subscribes via "eth_subscribe" returning the (Receiver channel , subscription_id)
    async fn send_subscribe_request(
        &self,
        event: &str,
    ) -> Result<(Receiver<Box<RawValue>>, U256), eyre::Error> {
        let provider = self.create_provider();
        let subscription_id = provider.raw_request(ETH_SUBSCRIBE, vec![event]).await?;
        let receiver = self.front_end.get_subscription(subscription_id).await?;
        Ok((receiver, subscription_id))
    }

    pub async fn subscribe_blocks(&self) -> Result<Subscription, eyre::Error> {
        self.subscribe(NEW_HEADS_SUBSCRIPTION).await
    }

    async fn unsubscribe(&self, id: U256) -> Result<(), eyre::Error> {
        self.front_end.unsubscribe(id).await?;
        Ok(())
    }

    pub async fn test_subscribe_blocks(&self) -> Result<(), eyre::Error> {
        let (_, subscription_id) = self.send_subscribe_request(NEW_HEADS_SUBSCRIPTION).await?;
        let _ = self.unsubscribe(subscription_id).await;
        Ok(())
    }
}

/// Since we still don't have auto unsubscribe on Drop we MUST call unsubscribe when done using the Subscription.
/// Look a [RethConnection] for use.
pub struct Subscription<'a> {
    /// service must be kept alive for receiver to keep working.
    connection: &'a RethConnection,
    receiver_stream: BroadcastStream<Box<serde_json::value::RawValue>>,
    /// Should unsubscribe on Drop but wasn't able to do it (due to async call on Drop problem)
    subscription_id: U256,
}

impl<'a> Subscription<'a> {
    pub async fn unsubscribe(&self) -> Result<(), eyre::Error> {
        self.connection.unsubscribe(self.subscription_id).await
    }
}

impl<'a> AsMut<BroadcastStream<Box<serde_json::value::RawValue>>> for Subscription<'a> {
    fn as_mut(&mut self) -> &mut BroadcastStream<Box<serde_json::value::RawValue>> {
        &mut self.receiver_stream
    }
}

// impl<'a> Drop for Subscription<'a> {
//    fn drop(&mut self) {
//        // PENDING
//        // Call somehow RethConnection::front_end::unsubscribe(subscription_id)
//    }
//}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants