Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: Add async to anchor-client #2488

Merged
merged 10 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions client/example/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use anyhow::Result;
use clap::Parser;

#[cfg(not(feature = "async"))]
#[path = "blocking.rs"]
mod lib;
mod blocking;

#[cfg(feature = "async")]
#[path = "nonblocking.rs"]
mod lib;
mod nonblocking;

#[derive(Parser, Debug)]
pub struct Opts {
Expand All @@ -30,11 +28,11 @@ pub struct Opts {
// deployed at the addresses given by the CLI args.
#[cfg(not(feature = "async"))]
fn main() -> Result<()> {
lib::main()
blocking::main()
}

#[cfg(feature = "async")]
#[tokio::main]
async fn main() -> Result<()> {
lib::main().await
nonblocking::main().await
}
18 changes: 9 additions & 9 deletions client/example/src/nonblocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anchor_client::solana_sdk::pubkey::Pubkey;
use anchor_client::solana_sdk::signature::{Keypair, Signer};
use anchor_client::solana_sdk::system_instruction;
use anchor_client::{Client, Cluster, EventContext};
use anchor_client::{Client, Cluster};
use anyhow::Result;
use clap::Parser;
use solana_sdk::commitment_config::CommitmentConfig;
Expand Down Expand Up @@ -180,13 +180,13 @@ pub async fn events<C: Deref<Target = impl Signer> + Clone>(
let program = client.program(pid);

let (sender, mut receiver) = mpsc::unbounded_channel();
let mut event_handler = program.create_event_handler().await?;

let event_id = event_handler.subscribe(move |_ctx: &EventContext, event: MyEvent| {
if sender.send(event).is_err() {
println!("Error while transferring the event.")
}
})?;
let event_unsubscriber = program
.on(move |_, event: MyEvent| {
if sender.send(event).is_err() {
println!("Error while transferring the event.")
}
})
.await?;

sleep(Duration::from_millis(1000)).await;

Expand All @@ -200,7 +200,7 @@ pub async fn events<C: Deref<Target = impl Signer> + Clone>(
assert_eq!(event.data, 5);
assert_eq!(event.label, "hello".to_string());

event_handler.unsubscribe(event_id).await;
event_unsubscriber.unsubscribe().await;

println!("Events success!");

Expand Down
3 changes: 0 additions & 3 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,6 @@ pub enum ClientError {
SolanaClientPubsubError(#[from] solana_client::nonblocking::pubsub_client::PubsubClientError),
#[error("Unable to parse log: {0}")]
LogParseError(String),
#[cfg(feature = "async")]
#[error("No id was found for the event")]
NoIdFound,
}

/// `RequestBuilder` provides a builder interface to create and send
Expand Down
133 changes: 50 additions & 83 deletions client/src/nonblocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,33 @@ use solana_sdk::{
commitment_config::CommitmentConfig, signature::Signature, signer::Signer,
transaction::Transaction,
};
use std::{collections::BTreeMap, ops::Deref, pin::Pin, sync::Arc};
use std::{ops::Deref, pin::Pin, sync::Arc};
use tokio::{
spawn,
sync::mpsc::{unbounded_channel, UnboundedReceiver},
task::JoinHandle,
};

type UnsubscribeFunc = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
type Event = (
JoinHandle<Result<(), ClientError>>,
UnboundedReceiver<UnsubscribeFunc>,
);

pub struct EventHandler {
client: Arc<PubsubClient>,
program_id: Pubkey,
config: RpcTransactionLogsConfig,
events: BTreeMap<u8, Event>,
type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;

pub struct EventUnsubscriber {
handle: JoinHandle<Result<(), ClientError>>,
rx: UnboundedReceiver<UnsubscribeFn>,
}

impl EventHandler {
pub fn new(client: PubsubClient, program_id: Pubkey, config: RpcTransactionLogsConfig) -> Self {
Self {
client: Arc::new(client),
program_id,
config,
events: BTreeMap::new(),
impl EventUnsubscriber {
/// Unsubscribe gracefully.
pub async fn unsubscribe(mut self) {
if let Some(unsubscribe) = self.rx.recv().await {
unsubscribe().await;
}
}
pub fn subscribe<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
&mut self,
f: impl Fn(&EventContext, T) + Send + 'static,
) -> Result<u8, ClientError> {
let addresses = vec![self.program_id.to_string()];
let filter = RpcTransactionLogsFilter::Mentions(addresses);
let self_program_str = self.program_id.to_string();
let config = self.config.clone();
let (unsubscribe_sender, unsubscribe_receiver) = unbounded_channel::<_>();

let handle = spawn({
let client = Arc::clone(&self.client);

async move {
let (mut notifications, unsubscribe) =
client.logs_subscribe(filter, config).await?;

unsubscribe_sender.send(unsubscribe).map_err(|e| {
ClientError::SolanaClientPubsubError(PubsubClientError::UnexpectedMessageError(
e.to_string(),
))
})?;

while let Some(logs) = notifications.next().await {
let ctx = EventContext {
signature: logs.value.signature.parse().unwrap(),
slot: logs.context.slot,
};
let events = parse_logs_response(logs, &self_program_str);
for e in events {
f(&ctx, e);
}
}
Ok::<(), ClientError>(())
}
});
let id = find_id(&self.events)?;
self.events.insert(id, (handle, unsubscribe_receiver));

Ok(id)
}

pub async fn unsubscribe(&mut self, id: u8) {
if let Some(mut event) = self.events.remove(&id) {
if let Some(unsubscribe) = event.1.recv().await {
unsubscribe().await;
}

let _ = event.0.await;
}
let _ = self.handle.await;
}
}

impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
/// Returns the account at the given address.
/// Returns the account at the given address
pub async fn account<T: AccountDeserialize>(&self, address: Pubkey) -> Result<T, ClientError> {
let rpc_client = RpcClient::new_with_commitment(
self.cfg.cluster.url().to_string(),
Expand Down Expand Up @@ -160,14 +103,47 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
)
}

pub async fn create_event_handler(&self) -> Result<EventHandler, ClientError> {
let ws_url = self.cfg.cluster.ws_url();
let client = PubsubClient::new(ws_url).await?;
/// Subscribe to program logs.
///
/// Returns an [`EventUnsubscriber`] to unsubscribe and close connection gracefully.
pub async fn on<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
&self,
f: impl Fn(&EventContext, T) + Send + 'static,
) -> Result<EventUnsubscriber, ClientError> {
let client = Arc::new(PubsubClient::new(self.cfg.cluster.ws_url()).await?);
Aursen marked this conversation as resolved.
Show resolved Hide resolved
let config = RpcTransactionLogsConfig {
commitment: self.cfg.options,
};

Ok(EventHandler::new(client, self.program_id, config))
let program_id_str = self.program_id.to_string();
let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);

let (tx, rx) = unbounded_channel::<_>();

let handle = spawn(async move {
let client = Arc::clone(&client);
let (mut notifications, unsubscribe) = client.logs_subscribe(filter, config).await?;

tx.send(unsubscribe).map_err(|e| {
ClientError::SolanaClientPubsubError(PubsubClientError::UnexpectedMessageError(
e.to_string(),
))
})?;

while let Some(logs) = notifications.next().await {
let ctx = EventContext {
signature: logs.value.signature.parse().unwrap(),
slot: logs.context.slot,
};
let events = parse_logs_response(logs, &program_id_str);
for e in events {
f(&ctx, e);
}
}
Ok::<(), ClientError>(())
});

Ok(EventUnsubscriber { handle, rx })
}
}

Expand Down Expand Up @@ -210,12 +186,3 @@ impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
.map_err(Into::into)
}
}

fn find_id<T>(map: &BTreeMap<u8, T>) -> Result<u8, ClientError> {
for i in u8::MIN..u8::MAX {
if !map.contains_key(&i) {
return Ok(i);
}
}
Err(ClientError::NoIdFound)
}