From f2662dc21e4759c200c84433925184784c1d8bec Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 26 Oct 2023 21:09:16 +0600 Subject: [PATCH] tools: correctly handle SIGINT in kafka (#219) (cherry picked from commit c6041386e887228878cec987a61a9e47b1bc981c) --- CHANGELOG.md | 3 +- Cargo.lock | 4 +- yellowstone-grpc-tools/Cargo.toml | 2 +- yellowstone-grpc-tools/src/bin/grpc-kafka.rs | 82 ++++++++++++++++---- yellowstone-grpc-tools/src/kafka/prom.rs | 68 +++++++++++++--- 5 files changed, 132 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 726e24a3..1919ea9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi ### Fixes -- Trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207)) +- geyser: trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207)) +- tools: correctly handle SIGINT in kafka ([#219](https://github.com/rpcpool/yellowstone-grpc/pull/219)) ### Features diff --git a/Cargo.lock b/Cargo.lock index 25702968..5bc5fd34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2925,9 +2925,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.33.2" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da18026aad1c24033da3da726200de7e911e75c2e2cc2f77ffb9b4502720faae" +checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf" dependencies = [ "futures-channel", "futures-util", diff --git a/yellowstone-grpc-tools/Cargo.toml b/yellowstone-grpc-tools/Cargo.toml index 8c6bcb98..82f43ad5 100644 --- a/yellowstone-grpc-tools/Cargo.toml +++ 
b/yellowstone-grpc-tools/Cargo.toml @@ -19,7 +19,7 @@ hyper = { version = "0.14.27", features = ["server"] } json5 = "0.4.1" lazy_static = "1.4.0" prometheus = "0.13.2" -rdkafka = { version = "0.33.2", features = ["ssl", "sasl"] } +rdkafka = { version = "0.34.0", features = ["ssl", "sasl"] } serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.86" serde_yaml = "0.9.25" diff --git a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs index 34f7ef9a..23095f8d 100644 --- a/yellowstone-grpc-tools/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-tools/src/bin/grpc-kafka.rs @@ -92,14 +92,18 @@ impl ArgsAction { } // input - let consumer = prom::StatsContext::create_stream_consumer(&kafka_config) + let (consumer, kafka_error_rx1) = prom::StatsContext::create_stream_consumer(&kafka_config) .context("failed to create kafka consumer")?; consumer.subscribe(&[&config.kafka_input])?; // output - let kafka = prom::StatsContext::create_future_producer(&kafka_config) + let (kafka, kafka_error_rx2) = prom::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; + let mut kafka_error = false; + let kafka_error_rx = futures::future::join(kafka_error_rx1, kafka_error_rx2); + tokio::pin!(kafka_error_rx); + // dedup let dedup = config.backend.create().await?; @@ -109,6 +113,10 @@ impl ArgsAction { loop { let message = tokio::select! { _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } maybe_result = send_tasks.join_next() => match maybe_result { Some(result) => { result??; @@ -116,6 +124,10 @@ impl ArgsAction { } None => tokio::select! { _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } message = consumer.recv() => message, } }, @@ -175,6 +187,10 @@ impl ArgsAction { if send_tasks.len() >= config.kafka_queue_size { tokio::select! 
{ _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } result = send_tasks.join_next() => { if let Some(result) = result { result??; @@ -183,9 +199,17 @@ impl ArgsAction { } } } - warn!("shutdown received..."); - while let Some(result) = send_tasks.join_next().await { - result??; + if !kafka_error { + warn!("shutdown received..."); + loop { + tokio::select! { + _ = &mut kafka_error_rx => break, + result = send_tasks.join_next() => match result { + Some(result) => result??, + None => break + } + } + } } Ok(()) } @@ -200,8 +224,10 @@ impl ArgsAction { } // Connect to kafka - let kafka = prom::StatsContext::create_future_producer(&kafka_config) + let (kafka, kafka_error_rx) = prom::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; + let mut kafka_error = false; + tokio::pin!(kafka_error_rx); // Create gRPC client & subscribe let mut client = GeyserGrpcClient::connect_with_timeout( @@ -220,13 +246,21 @@ impl ArgsAction { loop { let message = tokio::select! { _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } maybe_result = send_tasks.join_next() => match maybe_result { Some(result) => { - let _ = result??; + result??; continue; } None => tokio::select! { _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } message = geyser.next() => message, } }, @@ -264,13 +298,17 @@ impl ArgsAction { let result = future.await; debug!("kafka send message with key: {key}, result: {result:?}"); - let result = result?.map_err(|(error, _message)| error)?; + let _ = result?.map_err(|(error, _message)| error)?; prom::sent_inc(prom_kind); - Ok::<(i32, i64), anyhow::Error>(result) + Ok::<(), anyhow::Error>(()) }); if send_tasks.len() >= config.kafka_queue_size { tokio::select! 
{ _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break; + } result = send_tasks.join_next() => { if let Some(result) = result { result??; @@ -285,9 +323,17 @@ impl ArgsAction { None => break, } } - warn!("shutdown received..."); - while let Some(result) = send_tasks.join_next().await { - let _ = result??; + if !kafka_error { + warn!("shutdown received..."); + loop { + tokio::select! { + _ = &mut kafka_error_rx => break, + result = send_tasks.join_next() => match result { + Some(result) => result??, + None => break + } + } + } } Ok(()) } @@ -303,13 +349,19 @@ impl ArgsAction { let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?; - let consumer = prom::StatsContext::create_stream_consumer(&kafka_config) + let (consumer, kafka_error_rx) = prom::StatsContext::create_stream_consumer(&kafka_config) .context("failed to create kafka consumer")?; + let mut kafka_error = false; + tokio::pin!(kafka_error_rx); consumer.subscribe(&[&config.kafka_topic])?; loop { let message = tokio::select! { _ = &mut shutdown => break, + _ = &mut kafka_error_rx => { + kafka_error = true; + break + }, message = consumer.recv() => message?, }; prom::recv_inc(); @@ -330,7 +382,9 @@ impl ArgsAction { } } - warn!("shutdown received..."); + if !kafka_error { + warn!("shutdown received..."); + } Ok(grpc_shutdown.await??) 
} } diff --git a/yellowstone-grpc-tools/src/kafka/prom.rs b/yellowstone-grpc-tools/src/kafka/prom.rs index 9fa48cd7..7fa87624 100644 --- a/yellowstone-grpc-tools/src/kafka/prom.rs +++ b/yellowstone-grpc-tools/src/kafka/prom.rs @@ -2,13 +2,15 @@ use { crate::prom::GprcMessageKind, prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts}, rdkafka::{ - client::ClientContext, - config::{ClientConfig, FromClientConfigAndContext}, + client::{ClientContext, DefaultClientContext}, + config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel}, consumer::{ConsumerContext, StreamConsumer}, - error::KafkaResult, + error::{KafkaError, KafkaResult}, producer::FutureProducer, statistics::Statistics, }, + std::sync::Mutex, + tokio::sync::oneshot, }; lazy_static::lazy_static! { @@ -31,8 +33,30 @@ lazy_static::lazy_static! { ).unwrap(); } -#[derive(Debug, Default, Clone, Copy)] -pub struct StatsContext; +#[derive(Debug)] +pub struct StatsContext { + default: DefaultClientContext, + error_tx: Mutex<Option<oneshot::Sender<()>>>, +} + +impl StatsContext { + fn new() -> (Self, oneshot::Receiver<()>) { + let (error_tx, error_rx) = oneshot::channel(); + ( + Self { + default: DefaultClientContext, + error_tx: Mutex::new(Some(error_tx)), + }, + error_rx, + ) + } + + fn send_error(&self) { + if let Some(error_tx) = self.error_tx.lock().expect("alive mutex").take() { + let _ = error_tx.send(()); + } + } +} impl ClientContext for StatsContext { fn stats(&self, statistics: Statistics) { @@ -89,17 +113,43 @@ impl ClientContext for StatsContext { } } } + + fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) { + self.default.log(level, fac, log_message); + if matches!( + level, + RDKafkaLogLevel::Emerg + | RDKafkaLogLevel::Alert + | RDKafkaLogLevel::Critical + | RDKafkaLogLevel::Error + ) { + self.send_error() + } + } + + fn error(&self, error: KafkaError, reason: &str) { + self.default.error(error, reason); + self.send_error() + } } impl ConsumerContext for StatsContext {} impl StatsContext { - pub 
fn create_future_producer(config: &ClientConfig) -> KafkaResult<FutureProducer<Self>> { - FutureProducer::from_config_and_context(config, Self) + pub fn create_future_producer( + config: &ClientConfig, + ) -> KafkaResult<(FutureProducer<Self>, oneshot::Receiver<()>)> { + let (context, error_rx) = Self::new(); + FutureProducer::from_config_and_context(config, context) + .map(|producer| (producer, error_rx)) } - pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult<StreamConsumer<Self>> { - StreamConsumer::from_config_and_context(config, Self) + pub fn create_stream_consumer( + config: &ClientConfig, + ) -> KafkaResult<(StreamConsumer<Self>, oneshot::Receiver<()>)> { + let (context, error_rx) = Self::new(); + StreamConsumer::from_config_and_context(config, context) + .map(|consumer| (consumer, error_rx)) } }