Skip to content

Commit

Permalink
Expose subxt onclineclient / rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
pgherveou committed Oct 20, 2023
1 parent d1354c7 commit 97580de
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
13 changes: 8 additions & 5 deletions crates/examples/examples/simple_network_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pm = OsProcessManager;
let provider = NativeProvider::new(fs.clone(), pm);
let orchestrator = Orchestrator::new(fs, provider);
orchestrator.spawn(config).await?;
let network = orchestrator.spawn(config).await?;
println!("🚀🚀🚀🚀 network deployed");
// For now let just loop....
#[allow(clippy::empty_loop)]
loop {}

// Ok(())
let client = network.get_node("alice").unwrap().client();
let mut blocks = client.blocks().subscribe_finalized().await?;
while let Some(block) = blocks.next().await {
println!("Block #{}", block?.header().number);
}

Ok(())
}
15 changes: 15 additions & 0 deletions crates/orchestrator/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
use anyhow::anyhow;
use prom_metrics_parser::MetricMap;
use provider::DynNode;
use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
use tokio::sync::RwLock;

use crate::network_spec::node::NodeSpec;
Expand All @@ -15,6 +16,8 @@ pub struct NetworkNode {
pub(crate) spec: NodeSpec,
pub(crate) name: String,
pub(crate) ws_uri: String,
rpc: RpcClient,
client: OnlineClient<PolkadotConfig>,
pub(crate) prometheus_uri: String,
metrics_cache: Arc<RwLock<MetricMap>>,
}
Expand All @@ -24,13 +27,17 @@ impl NetworkNode {
pub(crate) fn new<T: Into<String>>(
name: T,
ws_uri: T,
rpc: RpcClient,
client: OnlineClient<PolkadotConfig>,
prometheus_uri: T,
spec: NodeSpec,
inner: DynNode,
) -> Self {
Self {
name: name.into(),
ws_uri: ws_uri.into(),
rpc,
client,
prometheus_uri: prometheus_uri.into(),
inner,
spec,
Expand All @@ -45,6 +52,14 @@ impl NetworkNode {
Ok(())
}

pub fn rpc(&self) -> RpcClient {
self.rpc.clone()
}

pub fn client(&self) -> OnlineClient<PolkadotConfig> {
self.client.clone()
}

/// Resume the node, this is implemented by resuming the
/// actual process (e.g polkadot) with sendig `SIGCONT` signal
pub async fn resume(&self) -> Result<(), anyhow::Error> {
Expand Down
36 changes: 36 additions & 0 deletions crates/orchestrator/src/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::path::PathBuf;

use anyhow::Context;
use futures::Future;
use provider::{
constants::LOCALHOST,
types::{SpawnNodeOptions, TransferedFile},
DynNamespace,
};
use subxt::{backend::rpc::RpcClient, OnlineClient, PolkadotConfig};
use support::fs::FileSystem;

use crate::{
Expand Down Expand Up @@ -149,6 +152,7 @@ where
let running_node = ctx.ns.spawn_node(spawn_ops).await?;

let ws_uri = format!("ws://{}:{}", LOCALHOST, node.rpc_port.0);

let prometheus_uri = format!("http://{}:{}/metrics", LOCALHOST, node.prometheus_port.0);
println!("🚀 {}, should be running now", node.name);
println!(
Expand All @@ -158,11 +162,43 @@ where
println!("🚀 {} : metrics link {prometheus_uri}", node.name);
println!("📓 logs cmd: tail -f {}/{}.log", base_dir, node.name);
println!("\n");

let client = retry(|| async {
OnlineClient::from_url(&ws_uri)
.await
.context(format!("Failed to connect to node rpc at {ws_uri}"))
})
.await?;

let rpc = RpcClient::from_url(&ws_uri)
.await
.context(format!("Failed to connect to rpc client at {ws_uri}"))?;

Ok(NetworkNode::new(
node.name.clone(),
ws_uri,
rpc,
client,
prometheus_uri,
node.clone(),
running_node,
))
}

async fn retry<T, F>(connect: F) -> anyhow::Result<OnlineClient<PolkadotConfig>>
where
T: Future<Output = anyhow::Result<OnlineClient<PolkadotConfig>>>,
F: Fn() -> T,
{
let mut retries = 5;
loop {
match connect().await {
Err(_) if retries >= 0 => {
println!("Error connecting, retrying ...");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
retries -= 1;
},
res => break res,
}
}
}

0 comments on commit 97580de

Please sign in to comment.