diff --git a/digital_asset_types/src/dapi/change_logs.rs b/digital_asset_types/src/dapi/change_logs.rs index 7023ad12f..fdd19ddf9 100644 --- a/digital_asset_types/src/dapi/change_logs.rs +++ b/digital_asset_types/src/dapi/change_logs.rs @@ -200,7 +200,7 @@ fn build_asset_proof( tree_id: Vec, leaf_node_idx: i64, leaf_hash: Vec, - req_indexes: &Vec, + req_indexes: &[i64], required_nodes: &[SimpleChangeLog], ) -> AssetProof { let mut final_node_list = vec![SimpleChangeLog::default(); req_indexes.len()]; @@ -211,7 +211,7 @@ fn build_asset_proof( } for (i, (n, nin)) in final_node_list .iter_mut() - .zip(req_indexes.clone()) + .zip(req_indexes.to_owned()) .enumerate() { if *n == SimpleChangeLog::default() { diff --git a/grpc-ingest/config-ingester.yml b/grpc-ingest/config-ingester.yml index 0dde2dfd4..123d2e16f 100644 --- a/grpc-ingest/config-ingester.yml +++ b/grpc-ingest/config-ingester.yml @@ -13,9 +13,6 @@ redis: xack_batch_max_size: 100 xack_batch_max_idle_ms: 10 xack_max_in_process: 100 - - type: metadata_json - stream: METADATA_JSONS - data_key: data prefetch_queue_size: 1_000 # max number of messages available in the read queue for processing xpending_max: 250 # used for reading pending messages xpending_only: false # exit once all pending messages consumed (should be applied if you want downscale number of ingesters) diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs index af031318f..97ba2b406 100644 --- a/grpc-ingest/src/grpc.rs +++ b/grpc-ingest/src/grpc.rs @@ -14,6 +14,7 @@ use { Value as RedisValue, }, serde::de, + sqlx::pool, std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Duration}, tokio::{ spawn, @@ -37,11 +38,11 @@ use { }; enum GrpcJob { - FlushRedisPipe(Arc>, MultiplexedConnection), - ProcessSubscribeUpdate(Arc>, SubscribeUpdate), + FlushRedisPipe, + ProcessSubscribeUpdate(Box), } -pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { +pub async fn run_v2(config: ConfigGrpc) -> anyhow::Result<()> { let redis_client = redis::Client::open(config.redis.url.clone())?; let config = Arc::new(config); let connection = redis_client.get_multiplexed_tokio_connection().await?; @@ -74,99 +75,103 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { .await .context("failed to connect to gRPC")?; - let (_subscribe_tx, mut stream) = dragon_mouth_client + let (_subscribe_tx, stream) = dragon_mouth_client .subscribe_with_request(Some(request)) .await?; + tokio::pin!(stream); let pool_config = Arc::clone(&config); + let pool_connection = connection.clone(); + let pool_pipe = Arc::clone(&pipe); - let exec = Executor::builder(Nonblock(Tokio)) - .num_threads(None) - .build(move |update, _| { - let config = pool_config.clone(); + let exec = + Executor::builder(Nonblock(Tokio)) + .num_threads(None) + .build(move |update, _handle| { + let config = Arc::clone(&pool_config); + let connection = pool_connection.clone(); + let pipe = Arc::clone(&pool_pipe); - async move { - match update { - GrpcJob::FlushRedisPipe(pipe, connection) => { - let mut pipe = pipe.lock().await; - let mut connection = connection; + async move { + match update { + GrpcJob::FlushRedisPipe => { + let mut pipe = pipe.lock().await; + let mut connection = connection; - let flush = pipe.flush(&mut connection).await; + let flush = pipe.flush(&mut connection).await; - let status = flush.as_ref().map(|_| ()).map_err(|_| ()); - let counts = flush.as_ref().unwrap_or_else(|counts| counts); + let status = flush.as_ref().map(|_| ()).map_err(|_| ()); + let counts = 
flush.as_ref().unwrap_or_else(|counts| counts); + + for (stream, count) in counts.iter() { + redis_xadd_status_inc(stream, status, *count); + } - for (stream, count) in counts.iter() { - redis_xadd_status_inc(stream, status, count); + debug!(message = "Redis pipe flushed", ?status, ?counts); } + GrpcJob::ProcessSubscribeUpdate(update) => { + let accounts_stream = config.accounts.stream.clone(); + let accounts_stream_maxlen = config.accounts.stream_maxlen; + let accounts_stream_data_key = config.accounts.stream_data_key.clone(); + let transactions_stream = config.transactions.stream.clone(); + let transactions_stream_maxlen = config.transactions.stream_maxlen; + let transactions_stream_data_key = + config.transactions.stream_data_key.clone(); - debug!(message = "Redis pipe flushed", ?status, ?counts); - } - GrpcJob::ProcessSubscribeUpdate(pipe, update) => { - let accounts_stream = config.accounts.stream.clone(); - let accounts_stream_maxlen = config.accounts.stream_maxlen; - let accounts_stream_data_key = config.accounts.stream_data_key.clone(); - let transactions_stream = config.transactions.stream.clone(); - let transactions_stream_maxlen = config.transactions.stream_maxlen; - let transactions_stream_data_key = - config.transactions.stream_data_key.clone(); - - let SubscribeUpdate { update_oneof, .. } = update; - let mut pipe = pipe.lock().await; - - if let Some(update) = update_oneof { - match update { - UpdateOneof::Account(account) => { - pipe.xadd_maxlen( - &accounts_stream, - StreamMaxlen::Approx(accounts_stream_maxlen), - "*", - &[(&accounts_stream_data_key, account.encode_to_vec())], - ); - - debug!( - message = "Account update", - ?account, - ?accounts_stream, - ?accounts_stream_maxlen - ); - } - UpdateOneof::Transaction(transaction) => { - pipe.xadd_maxlen( - &transactions_stream, - StreamMaxlen::Approx(transactions_stream_maxlen), - "*", - &[( - &transactions_stream_data_key, - transaction.encode_to_vec(), - )], - ); - - debug!(message = "Transaction update", ?transaction,); + let SubscribeUpdate { update_oneof, .. } = *update; + + let mut pipe = pipe.lock().await; + + if let Some(update) = update_oneof { + match update { + UpdateOneof::Account(account) => { + pipe.xadd_maxlen( + &accounts_stream, + StreamMaxlen::Approx(accounts_stream_maxlen), + "*", + &[(&accounts_stream_data_key, account.encode_to_vec())], + ); + + debug!(message = "Account update", ?account,); + } + UpdateOneof::Transaction(transaction) => { + pipe.xadd_maxlen( + &transactions_stream, + StreamMaxlen::Approx(transactions_stream_maxlen), + "*", + &[( + &transactions_stream_data_key, + transaction.encode_to_vec(), + )], + ); + + debug!(message = "Transaction update", ?transaction); + } + var => warn!(message = "Unknown update variant", ?var), } - var => warn!(message = "Unknown update variant", ?var), + } + + if pipe.size() >= config.redis.pipeline_max_size { + // handle.push(GrpcJob::FlushRedisPipe); } } } } - } - })?; + })?; - let deadline_pipe = Arc::clone(&pipe); let deadline_config = Arc::clone(&config); - let deadline_connection = connection.clone(); loop { tokio::select! 
{ _ = sleep(deadline_config.redis.pipeline_max_idle) => { - exec.push(GrpcJob::FlushRedisPipe(deadline_pipe.clone(), deadline_connection.clone())); + exec.push(GrpcJob::FlushRedisPipe); } Some(Ok(msg)) = stream.next() => { debug!(message = "Received gRPC message", ?msg); - exec.push(GrpcJob::ProcessSubscribeUpdate(Arc::clone(&pipe), msg)); + exec.push(GrpcJob::ProcessSubscribeUpdate(Box::new(msg))); } _ = shutdown.next() => { - exec.push(GrpcJob::FlushRedisPipe(Arc::clone(&pipe), connection.clone())); + exec.push(GrpcJob::FlushRedisPipe); break; } } @@ -176,3 +181,182 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { Ok(()) } + +pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { + let config = Arc::new(config); + let (tx, mut rx) = mpsc::channel::(config.geyser_update_message_buffer_size); // Adjust buffer size as needed + + // Connect to Redis + let client = redis::Client::open(config.redis.url.clone())?; + let connection = client.get_multiplexed_tokio_connection().await?; + + // Check stream length for the metrics + let jh_metrics_xlen = spawn({ + let connection = connection.clone(); + let streams = vec![ + config.accounts.stream.clone(), + config.transactions.stream.clone(), + ]; + async move { metrics_xlen(connection, &streams).await } + }); + tokio::pin!(jh_metrics_xlen); + + // Spawn gRPC client connections + let config = Arc::clone(&config); + let mut tx = tx.clone(); + + let mut client = GeyserGrpcClient::build_from_shared(config.geyser_endpoint.clone())? + .x_token(config.x_token.clone())? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(10)) + .connect() + .await + .context("failed to connect to gRPC")?; + + let grc_config = Arc::clone(&config); + spawn(async move { + let mut accounts = HashMap::with_capacity(1); + let mut transactions = HashMap::with_capacity(1); + + accounts.insert( + "das".to_string(), + grc_config.accounts.filter.clone().to_proto(), + ); + transactions.insert( + "das".to_string(), + grc_config.transactions.filter.clone().to_proto(), + ); + + let request = SubscribeRequest { + accounts, + transactions, + ..Default::default() + }; + + let (_subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; + + while let Some(Ok(msg)) = stream.next().await { + if let Some(update) = msg.update_oneof { + tx.send(update) + .await + .expect("Failed to send update to management thread"); + } + } + Ok::<(), anyhow::Error>(()) + }); + + // Management thread + let mut shutdown = create_shutdown()?; + let mut tasks = JoinSet::new(); + let mut pipe = redis::pipe(); + let mut pipe_accounts = 0; + let mut pipe_transactions = 0; + + let pipeline_max_idle = config.redis.pipeline_max_idle; + let deadline = sleep(pipeline_max_idle); + tokio::pin!(deadline); + + let result = loop { + tokio::select! 
{ + result = &mut jh_metrics_xlen => match result { + Ok(Ok(_)) => unreachable!(), + Ok(Err(error)) => break Err(error), + Err(error) => break Err(error.into()), + }, + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + break Ok(()); + }, + Some(update) = rx.next() => { + match update { + UpdateOneof::Account(account) => { + + pipe.xadd_maxlen( + &config.accounts.stream, + StreamMaxlen::Approx(config.accounts.stream_maxlen), + "*", + &[(&config.accounts.stream_data_key, account.encode_to_vec())], + ); + + pipe_accounts += 1; + } + UpdateOneof::Transaction(transaction) => { + pipe.xadd_maxlen( + &config.transactions.stream, + StreamMaxlen::Approx(config.transactions.stream_maxlen), + "*", + &[(&config.transactions.stream_data_key, transaction.encode_to_vec())] + ); + + pipe_transactions += 1; + } + _ => continue, + } + if pipe_accounts + pipe_transactions >= config.redis.pipeline_max_size { + let mut pipe = std::mem::replace(&mut pipe, redis::pipe()); + let pipe_accounts = std::mem::replace(&mut pipe_accounts, 0); + let pipe_transactions = std::mem::replace(&mut pipe_transactions, 0); + deadline.as_mut().reset(Instant::now() + config.redis.pipeline_max_idle); + + tasks.spawn({ + let mut connection = connection.clone(); + let config = Arc::clone(&config); + async move { + let result: RedisResult = + pipe.atomic().query_async(&mut connection).await; + + let status = result.map(|_| ()).map_err(|_| ()); + redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); + redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); + + Ok::<(), anyhow::Error>(()) + } + }); + } + }, + _ = &mut deadline => { + if pipe_accounts + pipe_transactions > 0 { + let mut pipe = std::mem::replace(&mut pipe, redis::pipe()); + let pipe_accounts = std::mem::replace(&mut pipe_accounts, 0); + let pipe_transactions = std::mem::replace(&mut pipe_transactions, 0); + deadline.as_mut().reset(Instant::now() + config.redis.pipeline_max_idle); + + tasks.spawn({ + let mut connection = connection.clone(); + let config = Arc::clone(&config); + async move { + let result: RedisResult = + pipe.atomic().query_async(&mut connection).await; + + let status = result.map(|_| ()).map_err(|_| ()); + redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); + redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); + + Ok::<(), anyhow::Error>(()) + } + }); + } + }, + }; + + while tasks.len() >= config.redis.max_xadd_in_process { + if let Some(result) = tasks.join_next().await { + result??; + } + } + }; + + tokio::select! 
{ + Some(signal) = shutdown.next() => { + anyhow::bail!("{signal} received, force shutdown..."); + } + result = async move { + while let Some(result) = tasks.join_next().await { + result??; + } + Ok::<(), anyhow::Error>(()) + } => result?, + }; + + result +} diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs index d0fd4ab69..d79364ee3 100644 --- a/grpc-ingest/src/ingester.rs +++ b/grpc-ingest/src/ingester.rs @@ -7,7 +7,7 @@ use { download_metadata_inserted_total_inc, program_transformer_task_status_inc, program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind, }, - redis::{metrics_xlen, ProgramTransformerInfo, RedisStream}, + redis::{metrics_xlen, ProgramTransformerInfo, RedisStream, RedisStreamMessageInfo}, util::create_shutdown, }, chrono::Utc, @@ -18,6 +18,7 @@ use { future::{pending, BoxFuture, FusedFuture, FutureExt}, stream::StreamExt, }, + opentelemetry_sdk::trace::Config, program_transformers::{error::ProgramTransformerError, ProgramTransformer}, sea_orm::{ entity::{ActiveModelTrait, ActiveValue}, @@ -36,9 +37,125 @@ use { task::JoinSet, time::{sleep, Duration}, }, - tracing::warn, + topograph::{ + executor::{Executor, Nonblock, Tokio}, + prelude::*, + }, + tracing::{error, warn}, }; +enum IngestJob { + SaveMessage(RedisStreamMessageInfo), +} + +pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { + let client = redis::Client::open(config.redis.url.clone())?; + let connection = client.get_multiplexed_tokio_connection().await?; + let pool = pg_create_pool(config.postgres).await?; + + let (mut redis_messages, redis_tasks_fut) = RedisStream::new(config.redis, connection).await?; + tokio::pin!(redis_tasks_fut); + + let pt_accounts = Arc::new(ProgramTransformer::new( + pool.clone(), + create_download_metadata_notifier(pool.clone(), config.download_metadata)?, + )); + let pt_transactions = Arc::new(ProgramTransformer::new( + pool.clone(), + create_download_metadata_notifier(pool.clone(), config.download_metadata)?, + )); + + let exec = + Executor::builder(Nonblock(Tokio)) + .num_threads(None) + .build(move |update, _handle| { + let pt_accounts = Arc::clone(&pt_accounts); + let pt_transactions = Arc::clone(&pt_transactions); + + async move { + match update { + IngestJob::SaveMessage(msg) => { + let result = match &msg.get_data() { + ProgramTransformerInfo::Account(account) => { + pt_accounts.handle_account_update(account).await + } + ProgramTransformerInfo::Transaction(transaction) => { + pt_transactions.handle_transaction(transaction).await + } + }; + match result { + Ok(()) => program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::Success, + ), + Err(ProgramTransformerError::NotImplemented) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::NotImplemented, + ); + error!("not implemented") + } + Err(ProgramTransformerError::DeserializationError(error)) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::DeserializationError, + ); + + error!("failed to deserialize {:?}", error) + } + Err(ProgramTransformerError::ParsingError(error)) => { + program_transformer_task_status_inc( + ProgramTransformerTaskStatusKind::ParsingError, + ); + + error!("failed to parse {:?}", error) + } + Err(ProgramTransformerError::DatabaseError(error)) => { + error!("database error for {:?}", error) + } + Err(ProgramTransformerError::AssetIndexError(error)) => { + error!("indexing error for {:?}", error) + } + Err(error) => { + error!("failed to handle {:?}", error) + } + } + + let _ = 
msg.ack(); + + () + } + } + } + })?; + + let mut shutdown = create_shutdown()?; + + loop { + tokio::select! { + Some(msg) = redis_messages.recv() => { + exec.push(IngestJob::SaveMessage(msg)); + } + Some(signal) = shutdown.next() => { + warn!("{signal} received, waiting spawned tasks..."); + break; + } + result = &mut redis_tasks_fut => { + if let Err(error) = result { + error!("Error in redis_tasks_fut: {:?}", error); + } + break; + } + } + } + + redis_messages.shutdown(); + + exec.join_async().await; + + pool.close().await; + + Ok::<(), anyhow::Error>(()) +} + +#[allow(dead_code)] pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { // connect to Redis let client = redis::Client::open(config.redis.url.clone())?; @@ -83,6 +200,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { tokio::spawn({ let pt_tasks_len = Arc::clone(&pt_tasks_len); + async move { loop { program_transformer_tasks_total_set(pt_tasks_len.load(Ordering::Relaxed)); diff --git a/grpc-ingest/src/main.rs b/grpc-ingest/src/main.rs index 9ca97d111..25beb0ecd 100644 --- a/grpc-ingest/src/main.rs +++ b/grpc-ingest/src/main.rs @@ -71,14 +71,14 @@ async fn main() -> anyhow::Result<()> { let config = config_load::<ConfigGrpc>(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; - grpc::run(config).await + grpc::run_v2(config).await } ArgsAction::Ingester => { let config = config_load::<ConfigIngester>(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; config.check(); - ingester::run(config).await + ingester::run_v2(config).await } ArgsAction::DownloadMetadata => { let config = config_load::(&args.config) diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index bd7ee591f..e81de4255 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -127,10 +127,10 @@ pub fn redis_xlen_set(stream: &str, len: usize) { .set(len as i64); } -pub fn redis_xadd_status_inc(stream: &str, status: Result<(), ()>, delta: &usize) { +pub fn redis_xadd_status_inc(stream: &str, status: Result<(), ()>, delta: usize) { REDIS_XADD_STATUS .with_label_values(&[stream, if status.is_ok() { "success" } else { "failed" }]) - .inc_by(*delta as u64); + .inc_by(delta as u64); } pub fn redis_xack_inc(stream: &str, delta: usize) { diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs index 8cb599264..0b6dc7c18 100644 --- a/grpc-ingest/src/redis.rs +++ b/grpc-ingest/src/redis.rs @@ -499,4 +499,8 @@ impl TrackedPipeline { Err(_) => Err(counts), } } + + pub fn size(&self) -> usize { + self.counts.values().sum() + } }
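Note on the pipelining change above: the reworked grpc ingest paths batch XADD commands into a Redis pipeline and flush it either when it reaches `redis.pipeline_max_size` or when `redis.pipeline_max_idle` elapses, and the new `TrackedPipeline::size()` exposes the pending-command count that the size check needs. Below is a minimal, std-only sketch of that size-or-idle flush policy; `BatchedPipe`, its fields, and the sample limits are illustrative stand-ins, not code from this repository.

```rust
use std::time::{Duration, Instant};

/// Stand-in for the tracked Redis pipeline: it only counts queued commands,
/// which is all the size-or-idle flush policy needs.
struct BatchedPipe {
    queued: Vec<String>, // pretend these are XADD payloads
    last_flush: Instant,
    max_size: usize,     // plays the role of redis.pipeline_max_size
    max_idle: Duration,  // plays the role of redis.pipeline_max_idle
}

impl BatchedPipe {
    fn new(max_size: usize, max_idle: Duration) -> Self {
        Self { queued: Vec::new(), last_flush: Instant::now(), max_size, max_idle }
    }

    /// Queue one message and report whether the size threshold was hit.
    fn push(&mut self, msg: String) -> bool {
        self.queued.push(msg);
        self.size() >= self.max_size
    }

    /// Same role as TrackedPipeline::size(): number of commands waiting to be sent.
    fn size(&self) -> usize {
        self.queued.len()
    }

    /// Flush when the batch is full or has sat idle past the deadline;
    /// returns how many commands were flushed (0 if neither condition held).
    fn maybe_flush(&mut self) -> usize {
        if self.size() >= self.max_size || self.last_flush.elapsed() >= self.max_idle {
            let flushed = self.queued.len();
            self.queued.clear(); // a real implementation would send the pipeline here
            self.last_flush = Instant::now();
            flushed
        } else {
            0
        }
    }
}

fn main() {
    let mut pipe = BatchedPipe::new(10, Duration::from_millis(10));
    for i in 0..25 {
        if pipe.push(format!("account-update-{i}")) {
            println!("flushed {} commands (size threshold)", pipe.maybe_flush());
        }
    }
    std::thread::sleep(Duration::from_millis(15));
    println!("flushed {} commands (idle deadline)", pipe.maybe_flush());
}
```

In the ingest loop itself the flush is an atomic `query_async` on the multiplexed connection, and the per-stream counts are reported through `redis_xadd_status_inc`, which is why that metrics helper now takes `delta: usize` by value.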