diff --git a/backfill/src/worker/tree.rs b/backfill/src/worker/tree.rs index 43bc7f54a..e6db2b8d9 100644 --- a/backfill/src/worker/tree.rs +++ b/backfill/src/worker/tree.rs @@ -60,71 +60,55 @@ impl TreeWorkerArgs { let (signature_worker, signature_sender) = signature_worker_args.start(signature_context, transaction_info_sender)?; - let tree_gap_siganture_sender = signature_sender.clone(); - let (gap_worker, tree_gap_sender) = - gap_worker_args.start(context, tree_gap_siganture_sender)?; - - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(db_pool); - - let mut gaps = TreeGapModel::find(&conn, tree.pubkey) - .await? - .into_iter() - .map(TryInto::try_into) - .collect::, _>>()?; - - let upper_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_desc(cl_audits_v2::Column::Seq) - .one(&conn) - .await? - .filter(|_| !force); - - let lower_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_asc(cl_audits_v2::Column::Seq) - .one(&conn) - .await? - .filter(|_| !force); - - if let Some(upper_seq) = upper_known_seq { - let signature = Signature::try_from(upper_seq.tx.as_ref())?; - gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature))); - log::info!("Added gap with upper known sequence: {:?}", upper_seq); - // Reprocess the entire tree if force is true or if the tree has a seq of 0 to keep the current behavior - } else if force || tree.seq > 0 { - gaps.push(TreeGapFill::new(tree.pubkey, None, None)); - log::info!( - "Added gap for entire tree reprocessing. Force: {}, Tree Seq: {}", - force, - tree.seq - ); - } + let (gap_worker, tree_gap_sender) = gap_worker_args.start(context, signature_sender)?; - if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) { - let signature = Signature::try_from(lower_seq.tx.as_ref())?; - gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None)); - log::info!("Added gap with lower known sequence: {:?}", lower_seq); - } + { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(db_pool); + + let mut gaps = TreeGapModel::find(&conn, tree.pubkey) + .await? + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?; - for gap in gaps { - if let Err(e) = tree_gap_sender.send(gap).await { - error!("send gap: {:?}", e); + let upper_known_seq = if force { + None } else { + cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_desc(cl_audits_v2::Column::Seq) + .one(&conn) + .await? + }; + + let lower_known_seq = if force { + None + } else { + cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_asc(cl_audits_v2::Column::Seq) + .one(&conn) + .await? + }; + + if let Some(upper_seq) = upper_known_seq { + let signature = Signature::try_from(upper_seq.tx.as_ref())?; + gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature))); + // Reprocess the entire tree if force is true or if the tree has a seq of 0 to keep the current behavior + } else if force || tree.seq > 0 { + gaps.push(TreeGapFill::new(tree.pubkey, None, None)); } - } - // New block to handle reindexing based on cl_audit_v2 records ordered by seq desc + if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) { + let signature = Signature::try_from(lower_seq.tx.as_ref())?; - let audit_records = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_desc(cl_audits_v2::Column::Seq) - .all(&conn) - .await?; + gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None)); + } - for record in audit_records { - let signature = Signature::try_from(record.tx.as_ref())?; - if let Err(e) = signature_sender.send(signature).await { - error!("send signature: {:?}", e); + for gap in gaps { + if let Err(e) = tree_gap_sender.send(gap).await { + error!("send gap: {:?}", e); + } } } diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index 365552160..5d998bb4f 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -9,7 +9,6 @@ use { }, }; -pub const REDIS_STREAM_ACCOUNTS: &str = "ACCOUNTS"; pub const REDIS_STREAM_DATA_KEY: &str = "data"; pub async fn load(path: impl AsRef + Copy) -> anyhow::Result @@ -139,7 +138,6 @@ impl ConfigGrpc { #[derive(Debug, Clone, Deserialize)] pub struct ConfigGrpcAccounts { - #[serde(default = "ConfigGrpcAccounts::default_stream")] pub stream: String, #[serde( default = "ConfigGrpcAccounts::default_stream_maxlen", @@ -153,10 +151,6 @@ pub struct ConfigGrpcAccounts { } impl ConfigGrpcAccounts { - pub fn default_stream() -> String { - REDIS_STREAM_ACCOUNTS.to_owned() - } - pub const fn default_stream_maxlen() -> usize { 100_000_000 } @@ -232,6 +226,7 @@ pub struct ConfigIngester { pub redis: String, pub postgres: ConfigIngesterPostgres, pub download_metadata: ConfigIngesterDownloadMetadata, + pub snapshots: ConfigIngestStream, pub accounts: ConfigIngestStream, pub transactions: ConfigIngestStream, } @@ -320,10 +315,6 @@ impl ConfigIngesterDownloadMetadata { 10 } - pub fn default_stream() -> String { - REDIS_STREAM_METADATA_JSON.to_owned() - } - pub const fn default_stream_maxlen() -> usize { 10_000_000 } diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs index fd40bab5b..000f747b4 100644 --- a/grpc-ingest/src/ingester.rs +++ b/grpc-ingest/src/ingester.rs @@ -142,7 +142,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { }) .start()?; let snapshot_stream = IngestStream::build() - .config(config.accounts.clone()) + .config(config.snapshots.clone()) .connection(connection.clone()) .handler(move |info| { let pt_snapshots = Arc::clone(&pt_snapshots);