Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: correctly handle SIGINT in kafka #219

Merged
merged 2 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Fixes

- Trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207))
- geyser: trigger end of startup when parent slot 0 seen in `update_slot_status` notification because `notify_end_of_startup` is not triggered when cluster started from genesis ([#207](https://github.com/rpcpool/yellowstone-grpc/pull/207))
- tools: correctly handle SIGINT in kafka ([#219](https://github.com/rpcpool/yellowstone-grpc/pull/219))

### Features

Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion yellowstone-grpc-tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ hyper = { version = "0.14.27", features = ["server"] }
json5 = "0.4.1"
lazy_static = "1.4.0"
prometheus = "0.13.2"
rdkafka = { version = "0.33.2", features = ["ssl", "sasl"] }
rdkafka = { version = "0.34.0", features = ["ssl", "sasl"] }
serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.86"
serde_yaml = "0.9.25"
Expand Down
82 changes: 68 additions & 14 deletions yellowstone-grpc-tools/src/bin/grpc-kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,18 @@ impl ArgsAction {
}

// input
let consumer = prom::StatsContext::create_stream_consumer(&kafka_config)
let (consumer, kafka_error_rx1) = prom::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
consumer.subscribe(&[&config.kafka_input])?;

// output
let kafka = prom::StatsContext::create_future_producer(&kafka_config)
let (kafka, kafka_error_rx2) = prom::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;

let mut kafka_error = false;
let kafka_error_rx = futures::future::join(kafka_error_rx1, kafka_error_rx2);
tokio::pin!(kafka_error_rx);

// dedup
let dedup = config.backend.create().await?;

Expand All @@ -109,13 +113,21 @@ impl ArgsAction {
loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
maybe_result = send_tasks.join_next() => match maybe_result {
Some(result) => {
result??;
continue;
}
None => tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
message = consumer.recv() => message,
}
},
Expand Down Expand Up @@ -175,6 +187,10 @@ impl ArgsAction {
if send_tasks.len() >= config.kafka_queue_size {
tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
result = send_tasks.join_next() => {
if let Some(result) = result {
result??;
Expand All @@ -183,9 +199,17 @@ impl ArgsAction {
}
}
}
warn!("shutdown received...");
while let Some(result) = send_tasks.join_next().await {
result??;
if !kafka_error {
warn!("shutdown received...");
loop {
tokio::select! {
_ = &mut kafka_error_rx => break,
result = send_tasks.join_next() => match result {
Some(result) => result??,
None => break
}
}
}
}
Ok(())
}
Expand All @@ -200,8 +224,10 @@ impl ArgsAction {
}

// Connect to kafka
let kafka = prom::StatsContext::create_future_producer(&kafka_config)
let (kafka, kafka_error_rx) = prom::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;
let mut kafka_error = false;
tokio::pin!(kafka_error_rx);

// Create gRPC client & subscribe
let mut client = GeyserGrpcClient::connect_with_timeout(
Expand All @@ -220,13 +246,21 @@ impl ArgsAction {
loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
maybe_result = send_tasks.join_next() => match maybe_result {
Some(result) => {
let _ = result??;
result??;
continue;
}
None => tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
message = geyser.next() => message,
}
},
Expand Down Expand Up @@ -264,13 +298,17 @@ impl ArgsAction {
let result = future.await;
debug!("kafka send message with key: {key}, result: {result:?}");

let result = result?.map_err(|(error, _message)| error)?;
let _ = result?.map_err(|(error, _message)| error)?;
prom::sent_inc(prom_kind);
Ok::<(i32, i64), anyhow::Error>(result)
Ok::<(), anyhow::Error>(())
});
if send_tasks.len() >= config.kafka_queue_size {
tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break;
}
result = send_tasks.join_next() => {
if let Some(result) = result {
result??;
Expand All @@ -285,9 +323,17 @@ impl ArgsAction {
None => break,
}
}
warn!("shutdown received...");
while let Some(result) = send_tasks.join_next().await {
let _ = result??;
if !kafka_error {
warn!("shutdown received...");
loop {
tokio::select! {
_ = &mut kafka_error_rx => break,
result = send_tasks.join_next() => match result {
Some(result) => result??,
None => break
}
}
}
}
Ok(())
}
Expand All @@ -303,13 +349,19 @@ impl ArgsAction {

let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?;

let consumer = prom::StatsContext::create_stream_consumer(&kafka_config)
let (consumer, kafka_error_rx) = prom::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
let mut kafka_error = false;
tokio::pin!(kafka_error_rx);
consumer.subscribe(&[&config.kafka_topic])?;

loop {
let message = tokio::select! {
_ = &mut shutdown => break,
_ = &mut kafka_error_rx => {
kafka_error = true;
break
},
message = consumer.recv() => message?,
};
prom::recv_inc();
Expand All @@ -330,7 +382,9 @@ impl ArgsAction {
}
}

warn!("shutdown received...");
if !kafka_error {
warn!("shutdown received...");
}
Ok(grpc_shutdown.await??)
}
}
Expand Down
68 changes: 59 additions & 9 deletions yellowstone-grpc-tools/src/kafka/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use {
crate::prom::GprcMessageKind,
prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts},
rdkafka::{
client::ClientContext,
config::{ClientConfig, FromClientConfigAndContext},
client::{ClientContext, DefaultClientContext},
config::{ClientConfig, FromClientConfigAndContext, RDKafkaLogLevel},
consumer::{ConsumerContext, StreamConsumer},
error::KafkaResult,
error::{KafkaError, KafkaResult},
producer::FutureProducer,
statistics::Statistics,
},
std::sync::Mutex,
tokio::sync::oneshot,
};

lazy_static::lazy_static! {
Expand All @@ -31,8 +33,30 @@ lazy_static::lazy_static! {
).unwrap();
}

#[derive(Debug, Default, Clone, Copy)]
pub struct StatsContext;
#[derive(Debug)]
pub struct StatsContext {
default: DefaultClientContext,
error_tx: Mutex<Option<oneshot::Sender<()>>>,
}

impl StatsContext {
fn new() -> (Self, oneshot::Receiver<()>) {
let (error_tx, error_rx) = oneshot::channel();
(
Self {
default: DefaultClientContext,
error_tx: Mutex::new(Some(error_tx)),
},
error_rx,
)
}

fn send_error(&self) {
if let Some(error_tx) = self.error_tx.lock().expect("alive mutex").take() {
let _ = error_tx.send(());
}
}
}

impl ClientContext for StatsContext {
fn stats(&self, statistics: Statistics) {
Expand Down Expand Up @@ -89,17 +113,43 @@ impl ClientContext for StatsContext {
}
}
}

fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
self.default.log(level, fac, log_message);
if matches!(
level,
RDKafkaLogLevel::Emerg
| RDKafkaLogLevel::Alert
| RDKafkaLogLevel::Critical
| RDKafkaLogLevel::Error
) {
self.send_error()
}
}

fn error(&self, error: KafkaError, reason: &str) {
self.default.error(error, reason);
self.send_error()
}
}

impl ConsumerContext for StatsContext {}

impl StatsContext {
pub fn create_future_producer(config: &ClientConfig) -> KafkaResult<FutureProducer<Self>> {
FutureProducer::from_config_and_context(config, Self)
pub fn create_future_producer(
config: &ClientConfig,
) -> KafkaResult<(FutureProducer<Self>, oneshot::Receiver<()>)> {
let (context, error_rx) = Self::new();
FutureProducer::from_config_and_context(config, context)
.map(|producer| (producer, error_rx))
}

pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult<StreamConsumer<Self>> {
StreamConsumer::from_config_and_context(config, Self)
pub fn create_stream_consumer(
config: &ClientConfig,
) -> KafkaResult<(StreamConsumer<Self>, oneshot::Receiver<()>)> {
let (context, error_rx) = Self::new();
StreamConsumer::from_config_and_context(config, context)
.map(|consumer| (consumer, error_rx))
}
}

Expand Down
Loading