Skip to content

Commit

Permalink
remove JoinSet
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Jan 5, 2024
1 parent 60bca9b commit 43d7c52
Showing 1 changed file with 35 additions and 39 deletions.
74 changes: 35 additions & 39 deletions examples/rust/src/bin/subscribe-ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use {
futures::{sink::SinkExt, stream::StreamExt},
log::info,
std::env,
tokio::{
task::JoinSet,
time::{interval, Duration},
},
tokio::time::{interval, Duration},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest,
Expand Down Expand Up @@ -38,9 +35,9 @@ async fn main() -> anyhow::Result<()> {
let mut client = GeyserGrpcClient::connect(args.endpoint, args.x_token, None)?;
let (mut subscribe_tx, mut stream) = client.subscribe().await?;

let mut tasks = JoinSet::new();
tasks.spawn(async move {
subscribe_tx
futures::try_join!(
async move {
subscribe_tx
.send(SubscribeRequest {
slots: maplit::hashmap! { "".to_owned() => SubscribeRequestFilterSlots { filter_by_commitment: Some(true) } },
commitment: Some(CommitmentLevel::Processed as i32),
Expand All @@ -49,41 +46,40 @@ async fn main() -> anyhow::Result<()> {
})
.await?;

let mut timer = interval(Duration::from_secs(3));
let mut id = 0;
loop {
timer.tick().await;
id += 1;
subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id }),
..Default::default()
})
.await?;
}
});
tasks.spawn(async move {
while let Some(message) = stream.next().await {
match message?.update_oneof.expect("valid message") {
UpdateOneof::Slot(slot) => {
//
info!("slot received: {slot:?}");
}
UpdateOneof::Ping(_msg) => {
info!("ping received");
}
UpdateOneof::Pong(SubscribeUpdatePong { id }) => {
info!("pong received: id#{id}");
let mut timer = interval(Duration::from_secs(3));
let mut id = 0;
loop {
timer.tick().await;
id += 1;
subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id }),
..Default::default()
})
.await?;
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
},
async move {
while let Some(message) = stream.next().await {
match message?.update_oneof.expect("valid message") {
UpdateOneof::Slot(slot) => {
//
info!("slot received: {slot:?}");
}
UpdateOneof::Ping(_msg) => {
info!("ping received");
}
UpdateOneof::Pong(SubscribeUpdatePong { id }) => {
info!("pong received: id#{id}");
}
msg => anyhow::bail!("received unexpected message: {msg:?}"),
}
msg => anyhow::bail!("received unexpected message: {msg:?}"),
}
Ok::<(), anyhow::Error>(())
}
Ok::<(), anyhow::Error>(())
});

while let Some(result) = tasks.join_next().await {
result??;
}
)?;

Ok(())
}

0 comments on commit 43d7c52

Please sign in to comment.