-
Notifications
You must be signed in to change notification settings - Fork 246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ClientRefs, Poller, and Streams #179
Conversation
a2a7f5d
to
0188a78
Compare
crates/providers/src/chain.rs
Outdated
// Then try to fill as many blocks as possible | ||
while !known_blocks.contains(&block_number) { | ||
let block = provider.get_block_by_number(block_number, false).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would consider doing a tokio::future::join_all if the gap between block_number
and next_yield
is >N blocks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a TODO comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crates/providers/src/heart.rs
Outdated
impl WatchConfig { | ||
/// Create a new watch for a transaction. | ||
pub fn new(tx_hash: B256, tx: oneshot::Sender<()>) -> Self { | ||
Self { tx_hash, confirmations: 0, timeout: Default::default(), tx } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think?
Self { tx_hash, confirmations: 0, timeout: Default::default(), tx } | |
Self { tx_hash, confirmations: 1, timeout: Default::default(), tx } |
crates/rpc-client/src/poller.rs
Outdated
let fut = async move { | ||
let limit = self.limit.unwrap_or(usize::MAX); | ||
for _ in 0..limit { | ||
let client = match self.client.upgrade() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to upgrade each time in the loop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we want to break if this loop is the only provider left
/// An [`RpcClient`] in a [`Weak`] reference. | ||
pub type WeakClient<T> = Weak<RpcClientInner<T>>; | ||
|
||
/// A borrowed [`RpcClient`]. | ||
pub type ClientRef<'a, T> = &'a RpcClientInner<T>; | ||
|
||
/// A JSON-RPC client. | ||
#[derive(Debug)] | ||
pub struct RpcClient<T>(Arc<RpcClientInner<T>>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any amazing readout for how to think about Weak vs full Arc? https://doc.rust-lang.org/std/sync/struct.Weak.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allows breaking out of task loops when the provider is dropped in addition to the channel dropping, makes sense with broadcast channels ig
Some(Ok(value)) => match serde_json::from_str(value.get()) { | ||
Ok(item) => return task::Poll::Ready(Some(Ok(item))), | ||
Err(e) => { | ||
trace!(value = value.get(), error = ?e, "Received unexpected value in subscription."); | ||
continue; | ||
} | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
awesome
/// stream will attempt to deserialize the notifications and yield the | ||
/// `serde_json::Result` of the deserialization. | ||
#[derive(Debug)] | ||
pub struct SubResultStream<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be called SubscriptionStream
& SubscriptionStreamResult
?
} | ||
|
||
/// A stream of notifications from the server, identified by a local ID. This | ||
/// stream may yield unexpected types. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's an example of an unexpected type?
crates/providers/src/heart.rs
Outdated
|
||
/// Reap any timeout | ||
fn reap_timeouts(&mut self) { | ||
let now = std::time::Instant::now(); | ||
let to_keep = self.reap_at.split_off(&now); | ||
let to_reap = std::mem::replace(&mut self.reap_at, to_keep); | ||
|
||
for tx_hash in to_reap.values() { | ||
self.unconfirmed.remove(tx_hash); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reap is basically "unsubscribe from transactions that have been pending for longer than T"?
crates/providers/src/heart.rs
Outdated
/// Handle a new block by checking if any of the transactions we're | ||
/// watching are in it, and if so, notifying the watcher. Also updates | ||
/// the latest block. | ||
fn handle_new_block(&mut self, block: Block, latest: &watch::Sender<Block>) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
let http = Http::<Client>::new(url); | ||
let provider = RootProvider::<TmpNetwork, _>::new(RpcClient::new(http, true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should sugar this to Provider::new(url)
and have it work for all HTTP/WS/IPC etc.
crates/providers/src/new.rs
Outdated
let pending_tx = provider.send_transaction(&TxLegacy(tx)).await.expect("failed to send tx"); | ||
eprintln!("{pending_tx:?}"); | ||
let () = pending_tx.await.expect("failed to await pending tx"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yay
@@ -552,7 +552,7 @@ impl<'a> TryFrom<&'a String> for Provider<Http<Client>> { | |||
#[cfg(test)] | |||
mod tests { | |||
use crate::{ | |||
provider::{Provider, TempProvider}, | |||
tmp::{Provider, TempProvider}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note we're gonna get out of the tmp stuff once @onbjerg work is done
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] | ||
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit should we start replacing these with native async
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should work,
some nits
@DaniPopes let's review & get this in if feature complete and proceed with events? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I can see how it all fits together.
I have one concern re usage of sleep in select!
otherwise lgtm
Separate
RpcClient
fromRpcClientInner
to allow weak and lifetime-borrowed use of RpcClient without holding a reference beyondMotivation
Solution
PR Checklist