diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs index 82f9237d5..59eda0492 100644 --- a/grpc-ingest/src/grpc.rs +++ b/grpc-ingest/src/grpc.rs @@ -4,9 +4,9 @@ use { util::create_shutdown, }, anyhow::Context, - futures::stream::StreamExt, + futures::{channel::mpsc::SendError, stream::StreamExt, Sink, SinkExt}, redis::streams::StreamMaxlen, - std::{collections::HashMap, sync::Arc, time::Duration}, + std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}, tokio::{sync::Mutex, time::sleep}, topograph::{ executor::{Executor, Nonblock, Tokio}, @@ -16,23 +16,28 @@ use { tracing::{debug, warn}, yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{ - geyser::{SubscribeRequest, SubscribeUpdate}, + geyser::{SubscribeRequest, SubscribeRequestPing, SubscribeUpdate}, prelude::subscribe_update::UpdateOneof, prost::Message, }, yellowstone_grpc_tools::config::GrpcRequestToProto, }; +const PING_ID: i32 = 0; + enum GrpcJob { FlushRedisPipe, ProcessSubscribeUpdate(Box), } +type SubscribeTx = Pin + Send + Sync>>; + #[derive(Clone)] pub struct GrpcJobHandler { connection: redis::aio::MultiplexedConnection, config: Arc, pipe: Arc>, + subscribe_tx: Arc>, } impl<'a> AsyncHandler>> @@ -49,6 +54,8 @@ impl<'a> AsyncHandler { @@ -98,6 +105,29 @@ impl<'a> AsyncHandler { + subscribe_tx + .lock() + .await + .send(SubscribeRequest { + ping: Some(SubscribeRequestPing { id: PING_ID }), + ..Default::default() + }) + .await + .map_err(|err| { + warn!(message = "Failed to send ping", ?err); + }) + .ok(); + + debug!(message = "Ping", id = PING_ID); + } + UpdateOneof::Pong(pong) => { + if pong.id == PING_ID { + debug!(message = "Pong", id = PING_ID); + } else { + warn!(message = "Unknown pong id", id = pong.id); + } + } var => warn!(message = "Unknown update variant", ?var), } } @@ -144,9 +174,10 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { .await .context("failed to connect to gRPC")?; - let (_subscribe_tx, stream) = dragon_mouth_client + let (subscribe_tx, stream) = dragon_mouth_client .subscribe_with_request(Some(request)) .await?; + tokio::pin!(stream); let exec = Executor::builder(Nonblock(Tokio)) @@ -155,6 +186,7 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { config: Arc::clone(&config), connection: connection.clone(), pipe: Arc::clone(&pipe), + subscribe_tx: Arc::new(Mutex::new(Box::pin(subscribe_tx))), })?; let deadline_config = Arc::clone(&config);