Skip to content

Commit

Permalink
add force shutdown with signals
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 1, 2023
1 parent ab12e21 commit ec74b61
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ anchor-client = "0.28.0"
anchor-lang = "0.28.0"
anyhow = "1.0.75"
async-std = "1.0.0"
async-stream = "0.3.5"
async-trait = "0.1.60"
atty = "0.2.14"
blockbuster = "0.9.0-beta.1"
Expand Down
1 change: 1 addition & 0 deletions nft_ingester2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = { workspace = true }

[dependencies]
anyhow = { workspace = true }
async-stream = { workspace = true }
atty = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
futures = { workspace = true }
Expand Down
27 changes: 14 additions & 13 deletions nft_ingester2/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use {
redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue},
std::{sync::Arc, time::Duration},
tokio::{
signal::unix::SignalKind,
task::JoinSet,
time::{sleep, Instant},
},
tracing::warn,
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{prelude::subscribe_update::UpdateOneof, prost::Message},
};
Expand Down Expand Up @@ -64,15 +64,8 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
Ok(Err(error)) => break Err(error),
Err(error) => break Err(error.into()),
},
signal = &mut shutdown => {
let signal = if signal == SignalKind::interrupt() {
"SIGINT"
} else if signal == SignalKind::terminate() {
"SIGTERM"
} else {
"UNKNOWN"
};
tracing::warn!("{signal} received, waiting spawned tasks...");
Some(signal) = shutdown.next() => {
warn!("{signal} received, waiting spawned tasks...");
break Ok(());
},
msg = geyser.next() => {
Expand Down Expand Up @@ -142,9 +135,17 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
}
};

while let Some(result) = tasks.join_next().await {
result??;
}
tokio::select! {
Some(signal) = shutdown.next() => {
anyhow::bail!("{signal} received, force shutdown...");
}
result = async move {
while let Some(result) = tasks.join_next().await {
result??;
}
Ok::<(), anyhow::Error>(())
} => result?,
};

result
}
43 changes: 25 additions & 18 deletions nft_ingester2/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use {
redis::{metrics_xlen, ProgramTransformerInfo, RedisStream},
util::create_shutdown,
},
futures::future::{pending, BoxFuture, FusedFuture, FutureExt},
futures::{
future::{pending, BoxFuture, FusedFuture, FutureExt},
stream::StreamExt,
},
program_transformers::{
error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier,
ProgramTransformer,
Expand All @@ -19,7 +22,6 @@ use {
Arc,
},
tokio::{
signal::unix::SignalKind,
task::JoinSet,
time::{sleep, Duration},
},
Expand Down Expand Up @@ -103,14 +105,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
Ok(Err(error)) => break Err(error),
Err(error) => break Err(error.into()),
},
signal = &mut shutdown => {
let signal = if signal == SignalKind::interrupt() {
"SIGINT"
} else if signal == SignalKind::terminate() {
"SIGTERM"
} else {
"UNKNOWN"
};
Some(signal) = shutdown.next() => {
warn!("{signal} received, waiting spawned tasks...");
break Ok(());
},
Expand Down Expand Up @@ -195,14 +190,26 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
});
};

redis_messages.shutdown();
while let Some(result) = pt_tasks.join_next().await {
result??;
}
if !redis_tasks_fut.is_terminated() {
redis_tasks_fut.await?;
}
pgpool.close().await;
tokio::select! {
Some(signal) = shutdown.next() => {
anyhow::bail!("{signal} received, force shutdown...");
}
result = async move {
// shutdown `prefetch` channel (but not Receiver)
redis_messages.shutdown();
// wait all `program_transformer` spawned tasks
while let Some(result) = pt_tasks.join_next().await {
result??;
}
// wait all `ack` spawned tasks
if !redis_tasks_fut.is_terminated() {
redis_tasks_fut.await?;
}
// shutdown database connection
pgpool.close().await;
Ok::<(), anyhow::Error>(())
} => result?,
};

result
}
Expand Down
14 changes: 8 additions & 6 deletions nft_ingester2/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ impl RedisStream {
.collect::<HashMap<_, _>>();

// spawn xack tasks
let mut tasks = ack_tasks
let ack_jh_vec = ack_tasks
.into_iter()
.map(|(stream, ack_rx)| {
let connection = connection.clone();
Expand All @@ -261,15 +261,16 @@ impl RedisStream {

// spawn prefetch task
let (messages_tx, messages_rx) = mpsc::channel(config.prefetch_queue_size);
tasks.push(tokio::spawn({
let jh_prefetch = tokio::spawn({
let shutdown = Arc::clone(&shutdown);
async move { Self::run_prefetch(config, streams, connection, messages_tx, shutdown).await }
}));
});

// merge spawned xack / prefetch tasks
let spawned_tasks = async move {
for task in tasks.into_iter() {
task.await??;
jh_prefetch.await??;
for jh in ack_jh_vec.into_iter() {
jh.await??;
}
Ok::<(), anyhow::Error>(())
};
Expand All @@ -287,8 +288,9 @@ impl RedisStream {
self.messages_rx.recv().await
}

pub fn shutdown(self) {
pub fn shutdown(mut self) {
self.shutdown.store(true, Ordering::Relaxed);
tokio::spawn(async move { while self.messages_rx.recv().await.is_some() {} });
}

async fn run_prefetch(
Expand Down
15 changes: 9 additions & 6 deletions nft_ingester2/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use {
futures::future::{BoxFuture, FutureExt},
async_stream::stream,
futures::stream::{BoxStream, StreamExt},
tokio::signal::unix::{signal, SignalKind},
};

pub fn create_shutdown() -> anyhow::Result<BoxFuture<'static, SignalKind>> {
pub fn create_shutdown() -> anyhow::Result<BoxStream<'static, &'static str>> {
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
Ok(async move {
tokio::select! {
_ = sigint.recv() => SignalKind::interrupt(),
_ = sigterm.recv() => SignalKind::terminate(),
Ok(stream! {
loop {
yield tokio::select! {
_ = sigint.recv() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
};
}
}
.boxed())
Expand Down

0 comments on commit ec74b61

Please sign in to comment.