From 10ba7223ea24454105b294a4ff3cd9c39ac13efc Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Thu, 17 Oct 2024 20:37:05 +0530 Subject: [PATCH] refactor: add structs with builder for SubscriptionTask and add better shutdown handling --- grpc-ingest/config-grpc2redis.yml | 64 ++++---- grpc-ingest/src/config.rs | 93 +++++++---- grpc-ingest/src/grpc.rs | 248 +++++++++++++++++++----------- grpc-ingest/src/prom.rs | 22 +-- 4 files changed, 259 insertions(+), 168 deletions(-) diff --git a/grpc-ingest/config-grpc2redis.yml b/grpc-ingest/config-grpc2redis.yml index 8f61c9e7a..67d5fa22b 100644 --- a/grpc-ingest/config-grpc2redis.yml +++ b/grpc-ingest/config-grpc2redis.yml @@ -1,51 +1,59 @@ prometheus: 0.0.0.0:8873 -geyser_endpoint: http://127.0.0.1:10000 -x_token: null -commitment: finalized -max_concurrency: 5 +geyser: + endpoint: http://127.0.0.1:10000 + x_token: null + commitment: finalized + connection_timeout: 10 + timeout: 10 subscriptions: token-metadata: stream: name: !ACCOUNTS max_len: 100_000_000 - filter: !ACCOUNTS - owner: - - metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s + max_concurrency: 2 + filter: + accounts: + owner: + - metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s token-extension: stream: name: !ACCOUNTS max_len: 100_000_000 - filter: !ACCOUNTS - owner: - - TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb + max_concurrency: 2 + filter: + accounts: + owner: + - TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb token: stream: name: !ACCOUNTS max_len: 100_000_000 - filter: !ACCOUNTS - owner: - - TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA + max_concurrency: 5 + filter: + accounts: + owner: + - TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA tcomp: stream: name: !ACCOUNTS max_len: 100_000_000 - filter: !ACCOUNTS - owner: - - CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d - bubblegum-accounts: + max_concurrency: 2 + filter: + accounts: + owner: + - CoREENxT6tW1HoK8ypY1SxRMZTcVPm7R94rH4PZNhX7d + bubblegum: stream: - name: !ACCOUNTS + name: !ACCOUNTS_WITH_TRANSACTIONS max_len: 100_000_000 - filter: !ACCOUNTS - owner: - - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY - bubblegum-transactions: - stream: - name: !TRANSACTIONS - max_len: 10_000_000 - filter: !TRANSACTIONS - account_include: - - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY + max_concurrency: 2 + filter: + accounts: + owner: + - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY + transactions: + account_include: + - BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY redis: url: redis://localhost:6379 diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index 14c23fe62..b6a1e610c 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -120,27 +120,71 @@ pub struct ConfigPrometheus { pub prometheus: Option, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConfigGeyser { + pub endpoint: String, + pub x_token: Option, + #[serde(default = "ConfigGeyser::default_commitment")] + pub commitment: ConfigGrpcRequestCommitment, + #[serde( + default = "ConfigGeyser::default_connection_timeout", + deserialize_with = "deserialize_usize_str" + )] + pub connect_timeout: usize, + #[serde( + default = "ConfigGeyser::default_timeout", + deserialize_with = "deserialize_usize_str" + )] + pub timeout: usize, +} + +impl ConfigGeyser { + pub const fn default_commitment() -> ConfigGrpcRequestCommitment { + ConfigGrpcRequestCommitment::Finalized + } + + pub const fn default_connection_timeout() -> usize { + 10 + } + + pub const fn default_timeout() -> usize { + 10 + } +} + +#[derive(Debug, Clone, Deserialize, Default)] #[serde(rename_all = "UPPERCASE")] pub enum StreamName { + #[default] Accounts, Transactions, + #[serde(rename = "ACCOUNTS_WITH_TRANSACTIONS")] + AccountsWithTransactions, } -#[derive(Debug, Clone, Deserialize)] -pub struct StreamConfig { +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConfigStream { pub name: StreamName, #[serde( - default = "StreamConfig::default_stream_maxlen", + default = "ConfigStream::default_stream_maxlen", deserialize_with = "deserialize_usize_str" )] pub max_len: usize, + #[serde( + default = "ConfigStream::default_max_concurrency", + deserialize_with = "deserialize_usize_str" + )] + pub max_concurrency: usize, } -impl StreamConfig { +impl ConfigStream { pub const fn default_stream_maxlen() -> usize { 10_000_000 } + + pub const fn default_max_concurrency() -> usize { + 10 + } } impl ToString for StreamName { @@ -148,50 +192,35 @@ impl ToString for StreamName { match self { StreamName::Accounts => "ACCOUNTS".to_string(), StreamName::Transactions => "TRANSACTIONS".to_string(), + StreamName::AccountsWithTransactions => "ACCOUNTS_WITH_TRANSACTIONS".to_string(), } } } -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "UPPERCASE")] -pub enum ConfigGrpcRequestFilter { - Transactions(ConfigGrpcRequestTransactions), - Accounts(ConfigGrpcRequestAccounts), +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConfigGrpcRequestFilter { + pub accounts: Option, + pub transactions: Option, } -#[derive(Debug, Clone, Deserialize)] -pub struct SubscriptionConfig { - pub stream: StreamConfig, +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ConfigSubscription { + pub stream: ConfigStream, pub filter: ConfigGrpcRequestFilter, } -pub type ConfigGrpcSubscriptions = HashMap; +pub type ConfigGrpcSubscriptions = HashMap; -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Default)] pub struct ConfigGrpc { - pub x_token: Option, + pub geyser: ConfigGeyser, - pub commitment: ConfigGrpcRequestCommitment, pub subscriptions: ConfigGrpcSubscriptions, - pub geyser_endpoint: String, - pub redis: ConfigGrpcRedis, - - #[serde( - default = "ConfigGrpc::default_max_concurrency", - deserialize_with = "deserialize_usize_str" - )] - pub max_concurrency: usize, } -impl ConfigGrpc { - pub const fn default_max_concurrency() -> usize { - 10 - } -} - -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Default)] pub struct ConfigGrpcRedis { pub url: String, #[serde( diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs index b7ed5b437..a5e2c88f7 100644 --- a/grpc-ingest/src/grpc.rs +++ b/grpc-ingest/src/grpc.rs @@ -1,10 +1,7 @@ use { crate::{ - config::{ConfigGrpc, ConfigGrpcRequestFilter, StreamConfig, SubscriptionConfig}, - prom::{ - grpc_subscription_task_inc, grpc_tasks_total_dec, grpc_tasks_total_inc, - redis_xadd_status_inc, - }, + config::{ConfigGrpc, ConfigGrpcRequestFilter, ConfigStream, ConfigSubscription}, + prom::{grpc_tasks_total_dec, grpc_tasks_total_inc, redis_xadd_status_inc}, redis::TrackedPipeline, util::create_shutdown, }, @@ -12,7 +9,7 @@ use { futures::{stream::StreamExt, SinkExt}, redis::streams::StreamMaxlen, std::{collections::HashMap, sync::Arc, time::Duration}, - tokio::{sync::Mutex, task::JoinHandle, time::sleep}, + tokio::{sync::Mutex, time::sleep}, topograph::{ executor::{Executor, Nonblock, Tokio}, prelude::*, @@ -38,9 +35,9 @@ enum GrpcJob { #[derive(Clone)] pub struct GrpcJobHandler { connection: redis::aio::MultiplexedConnection, - stream_config: Arc, + stream_config: Arc, pipe: Arc>, - subscription_label: String, + label: String, } impl<'a> AsyncHandler>> @@ -56,9 +53,9 @@ impl<'a> AsyncHandler AsyncHandler { @@ -108,10 +101,6 @@ impl<'a> AsyncHandler { warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant") @@ -121,85 +110,153 @@ impl<'a> AsyncHandler anyhow::Result<()> { let redis_client = redis::Client::open(config.redis.url.clone())?; - let config = Arc::new(config); let connection = redis_client.get_multiplexed_tokio_connection().await?; - let pipe = Arc::new(Mutex::new(TrackedPipeline::default())); - - let subscriptions = config.subscriptions.clone(); - - let mut subscription_tasks: Vec>> = - Vec::with_capacity(subscriptions.len()); - - for subscription in subscriptions { - let subscription_label = subscription.0.clone(); - let subscription_config = Arc::new(subscription.1); - let config = Arc::clone(&config); - let connection = connection.clone(); - let pipe = Arc::clone(&pipe); - let subscription_task_handle: JoinHandle> = tokio::spawn( - async move { - let mut shutdown = create_shutdown()?; - let SubscriptionConfig { stream, filter } = subscription_config.as_ref(); - - let stream_config = Arc::new(stream.clone()); - let mut accounts = HashMap::with_capacity(1); - let mut transactions = HashMap::with_capacity(1); - - match filter { - ConfigGrpcRequestFilter::Accounts(accounts_filter) => { - accounts.insert( - subscription_label.clone(), - accounts_filter.clone().to_proto(), - ); - } - ConfigGrpcRequestFilter::Transactions(transactions_filter) => { - transactions.insert( - subscription_label.clone(), - transactions_filter.clone().to_proto(), - ); - } - } + let mut shutdown = create_shutdown()?; + + let subscription_tasks = SubscriptionTask::build() + .config(config) + .pipeline(TrackedPipeline::default()) + .connection(connection) + .start() + .await?; + + if let Some(signal) = shutdown.next().await { + warn!( + target: "grpc2redis", + action = "shutdown_signal_received", + message = "Shutdown signal received, waiting for spawned tasks to complete", + signal = ?signal + ); + } + + let res = futures::future::join_all( + subscription_tasks + .into_iter() + .map(|task| task.stop()) + .collect::>(), + ) + .await; + + res.into_iter().collect::>()?; + + Ok(()) +} + +#[derive(Default)] +pub struct SubscriptionTask { + pub config: Arc, + pub pipe: Option>>, + pub connection: Option, +} + +impl SubscriptionTask { + pub fn build() -> Self { + Self::default() + } + + pub fn config(mut self, config: ConfigGrpc) -> Self { + self.config = Arc::new(config); + self + } + + pub fn pipeline(mut self, pipe: TrackedPipeline) -> Self { + self.pipe = Some(Arc::new(Mutex::new(pipe))); + self + } + + pub fn connection(mut self, connection: redis::aio::MultiplexedConnection) -> Self { + self.connection = Some(connection); + self + } - let request = SubscribeRequest { - accounts, - transactions, - ..Default::default() - }; - - let mut dragon_mouth_client = - GeyserGrpcClient::build_from_shared(config.geyser_endpoint.clone())? - .x_token(config.x_token.clone())? - .connect_timeout(Duration::from_secs(10)) - .timeout(Duration::from_secs(10)) + pub async fn start(mut self) -> anyhow::Result> { + let mut subscription_tasks: Vec = Vec::new(); + + let config = Arc::clone(&self.config); + let connection = self + .connection + .take() + .expect("Redis Connection is required"); + let pipe = self.pipe.take().expect("Pipeline is required"); + + let subscriptions = config.subscriptions.clone(); + + for subscription in subscriptions { + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + let label = subscription.0.clone(); + let subscription_config = Arc::new(subscription.1); + let config = Arc::clone(&config); + let connection = connection.clone(); + let pipe = Arc::clone(&pipe); + + let ConfigSubscription { stream, filter } = subscription_config.as_ref().clone(); + + let stream_config = Arc::new(stream.clone()); + let mut req_accounts = HashMap::with_capacity(1); + let mut req_transactions = HashMap::with_capacity(1); + + let ConfigGrpcRequestFilter { + accounts, + transactions, + } = filter; + + if let Some(accounts) = accounts { + req_accounts.insert(label.clone(), accounts.to_proto()); + } + + if let Some(transactions) = transactions { + req_transactions.insert(label.clone(), transactions.to_proto()); + } + + let request = SubscribeRequest { + accounts: req_accounts, + transactions: req_transactions, + ..Default::default() + }; + + let mut dragon_mouth_client = + GeyserGrpcClient::build_from_shared(config.geyser.endpoint.clone())? + .x_token(config.geyser.x_token.clone())? + .connect_timeout(Duration::from_secs(config.geyser.connect_timeout as u64)) + .timeout(Duration::from_secs(config.geyser.timeout as u64)) .connect() .await - .context("failed to connect to gRPC")?; + .context("failed to connect to gRPC").map_err(|err| { + error!(target: "grpc2redis", action = "grpc_connection_failed", message = "Failed to connect to gRPC", ?err); + err + })?; - let (mut subscribe_tx, stream) = dragon_mouth_client + let (mut subscribe_tx, stream) = dragon_mouth_client .subscribe_with_request(Some(request)) - .await?; - - tokio::pin!(stream); + .await.map_err(|err| { + error!(target: "grpc2redis", action = "subscribe_failed", message = "Failed to subscribe", ?err); + err + })?; - let exec = Executor::builder(Nonblock(Tokio)) - .max_concurrency(Some(config.max_concurrency)) + let exec = Executor::builder(Nonblock(Tokio)) + .max_concurrency(Some(stream_config.max_concurrency)) .build_async(GrpcJobHandler { stream_config: Arc::clone(&stream_config), connection: connection.clone(), pipe: Arc::clone(&pipe), - subscription_label: subscription_label.clone(), + label: label.clone(), + }).map_err(|err| { + warn!(target: "grpc2redis", action = "executor_failed", message = "Failed to create executor", ?err); + err })?; - let deadline_config = Arc::clone(&config); + let deadline_config = Arc::clone(&config); + let control = tokio::spawn(async move { + tokio::pin!(stream); loop { tokio::select! { _ = sleep(deadline_config.redis.pipeline_max_idle) => { @@ -239,7 +296,8 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { } } } - _ = shutdown.next() => { + _ = &mut shutdown_rx => { + debug!(target: "grpc2redis", action = "shutdown_signal_received", message = "Shutdown signal received, stopping subscription task", ?label); exec.push(GrpcJob::FlushRedisPipe); break; } @@ -247,24 +305,32 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { } exec.join_async().await; + }); - Ok(()) - }, - ); + subscription_tasks.push(SubscriptionTaskStop { + shutdown_tx, + control, + }); + } - subscription_tasks.push(subscription_task_handle); + Ok(subscription_tasks) } +} - // Wait for all subscription tasks to finish - let task_results = futures::future::join_all(subscription_tasks).await; +#[derive(Debug)] +pub struct SubscriptionTaskStop { + pub shutdown_tx: tokio::sync::oneshot::Sender<()>, + pub control: tokio::task::JoinHandle<()>, +} - for task_result in task_results { - if let Err(e) = task_result { - error!(target: "grpc2redis", action = "subscription_task_error", message = "Subscription task failed", ?e); - } else if let Ok(Err(e)) = task_result { - error!(target: "grpc2redis", action = "subscription_task_error", message = "Subscription task failed", ?e); - } - } +impl SubscriptionTaskStop { + pub async fn stop(self) -> anyhow::Result<()> { + self.shutdown_tx + .send(()) + .map_err(|_| anyhow::anyhow!("Failed to send shutdown signal"))?; - Ok(()) + self.control.await?; + + Ok(()) + } } diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index ea93dc6b3..ba5208586 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -70,12 +70,7 @@ lazy_static::lazy_static! { static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new( Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "), - &[] - ).unwrap(); - - static ref GRPC_SUBSCRIPTION_TASK: IntGaugeVec = IntGaugeVec::new( - Opts::new("grpc_subscription_task", "Number of tasks spawned for writing grpc messages to redis "), - &["subscription_label", "stream"] + &["label","stream"] ).unwrap(); } @@ -102,7 +97,6 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(INGEST_TASKS); register!(ACK_TASKS); register!(GRPC_TASKS); - register!(GRPC_SUBSCRIPTION_TASK); VERSION_INFO_METRIC .with_label_values(&[ @@ -209,18 +203,12 @@ pub fn ack_tasks_total_dec(stream: &str) { ACK_TASKS.with_label_values(&[stream]).dec() } -pub fn grpc_tasks_total_inc() { - GRPC_TASKS.with_label_values(&[]).inc() +pub fn grpc_tasks_total_inc(label: &str, stream: &str) { + GRPC_TASKS.with_label_values(&[label, stream]).inc() } -pub fn grpc_tasks_total_dec() { - GRPC_TASKS.with_label_values(&[]).dec() -} - -pub fn grpc_subscription_task_inc(subscription_label: &str, stream: &str) { - GRPC_SUBSCRIPTION_TASK - .with_label_values(&[subscription_label, stream]) - .inc() +pub fn grpc_tasks_total_dec(label: &str, stream: &str) { + GRPC_TASKS.with_label_values(&[label, stream]).dec() } #[derive(Debug, Clone, Copy)]