Skip to content

Commit

Permalink
Add snapshot config in ingester
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Oct 7, 2024
1 parent a5f5c58 commit 87d6cbf
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 69 deletions.
100 changes: 42 additions & 58 deletions backfill/src/worker/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,71 +60,55 @@ impl TreeWorkerArgs {
let (signature_worker, signature_sender) =
signature_worker_args.start(signature_context, transaction_info_sender)?;

let tree_gap_siganture_sender = signature_sender.clone();
let (gap_worker, tree_gap_sender) =
gap_worker_args.start(context, tree_gap_siganture_sender)?;

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(db_pool);

let mut gaps = TreeGapModel::find(&conn, tree.pubkey)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;

let upper_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_desc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?
.filter(|_| !force);

let lower_known_seq = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_asc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?
.filter(|_| !force);

if let Some(upper_seq) = upper_known_seq {
let signature = Signature::try_from(upper_seq.tx.as_ref())?;
gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature)));
log::info!("Added gap with upper known sequence: {:?}", upper_seq);
// Reprocess the entire tree if force is true or if the tree has a seq of 0 to keep the current behavior
} else if force || tree.seq > 0 {
gaps.push(TreeGapFill::new(tree.pubkey, None, None));
log::info!(
"Added gap for entire tree reprocessing. Force: {}, Tree Seq: {}",
force,
tree.seq
);
}
let (gap_worker, tree_gap_sender) = gap_worker_args.start(context, signature_sender)?;

if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) {
let signature = Signature::try_from(lower_seq.tx.as_ref())?;
gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None));
log::info!("Added gap with lower known sequence: {:?}", lower_seq);
}
{
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(db_pool);

let mut gaps = TreeGapModel::find(&conn, tree.pubkey)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?;

for gap in gaps {
if let Err(e) = tree_gap_sender.send(gap).await {
error!("send gap: {:?}", e);
let upper_known_seq = if force {
None
} else {
cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_desc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?
};

let lower_known_seq = if force {
None
} else {
cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_asc(cl_audits_v2::Column::Seq)
.one(&conn)
.await?
};

if let Some(upper_seq) = upper_known_seq {
let signature = Signature::try_from(upper_seq.tx.as_ref())?;
gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature)));
// Reprocess the entire tree if force is true or if the tree has a seq of 0 to keep the current behavior
} else if force || tree.seq > 0 {
gaps.push(TreeGapFill::new(tree.pubkey, None, None));
}
}

// New block to handle reindexing based on cl_audit_v2 records ordered by seq desc
if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) {
let signature = Signature::try_from(lower_seq.tx.as_ref())?;

let audit_records = cl_audits_v2::Entity::find()
.filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec()))
.order_by_desc(cl_audits_v2::Column::Seq)
.all(&conn)
.await?;
gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None));
}

for record in audit_records {
let signature = Signature::try_from(record.tx.as_ref())?;
if let Err(e) = signature_sender.send(signature).await {
error!("send signature: {:?}", e);
for gap in gaps {
if let Err(e) = tree_gap_sender.send(gap).await {
error!("send gap: {:?}", e);
}
}
}

Expand Down
11 changes: 1 addition & 10 deletions grpc-ingest/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use {
},
};

pub const REDIS_STREAM_ACCOUNTS: &str = "ACCOUNTS";
pub const REDIS_STREAM_DATA_KEY: &str = "data";

pub async fn load<T>(path: impl AsRef<Path> + Copy) -> anyhow::Result<T>
Expand Down Expand Up @@ -139,7 +138,6 @@ impl ConfigGrpc {

#[derive(Debug, Clone, Deserialize)]
pub struct ConfigGrpcAccounts {
#[serde(default = "ConfigGrpcAccounts::default_stream")]
pub stream: String,
#[serde(
default = "ConfigGrpcAccounts::default_stream_maxlen",
Expand All @@ -153,10 +151,6 @@ pub struct ConfigGrpcAccounts {
}

impl ConfigGrpcAccounts {
pub fn default_stream() -> String {
REDIS_STREAM_ACCOUNTS.to_owned()
}

pub const fn default_stream_maxlen() -> usize {
100_000_000
}
Expand Down Expand Up @@ -232,6 +226,7 @@ pub struct ConfigIngester {
pub redis: String,
pub postgres: ConfigIngesterPostgres,
pub download_metadata: ConfigIngesterDownloadMetadata,
pub snapshots: ConfigIngestStream,
pub accounts: ConfigIngestStream,
pub transactions: ConfigIngestStream,
}
Expand Down Expand Up @@ -320,10 +315,6 @@ impl ConfigIngesterDownloadMetadata {
10
}

pub fn default_stream() -> String {
REDIS_STREAM_METADATA_JSON.to_owned()
}

pub const fn default_stream_maxlen() -> usize {
10_000_000
}
Expand Down
2 changes: 1 addition & 1 deletion grpc-ingest/src/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
})
.start()?;
let snapshot_stream = IngestStream::build()
.config(config.accounts.clone())
.config(config.snapshots.clone())
.connection(connection.clone())
.handler(move |info| {
let pt_snapshots = Arc::clone(&pt_snapshots);
Expand Down

0 comments on commit 87d6cbf

Please sign in to comment.