Skip to content

Commit

Permalink
refactor: drop exculsion clauses on cl_items to allow for reindexing (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Sep 16, 2024
1 parent 4c3f649 commit 4e3c076
Showing 1 changed file with 8 additions and 43 deletions.
51 changes: 8 additions & 43 deletions program_transformers/src/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,22 @@ use {
crate::error::{ProgramTransformerError, ProgramTransformerResult},
das_core::DownloadMetadataInfo,
digital_asset_types::dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items,
cl_audits_v2, cl_items,
asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items,
sea_orm_active_enums::{
ChainMutability, Instruction, Mutability, OwnerType, RoyaltyTargetType,
SpecificationAssetClass, SpecificationVersions,
},
},
mpl_bubblegum::types::{Collection, Creator},
sea_orm::{
entity::{ActiveValue, ColumnTrait, EntityTrait},
entity::{ActiveValue, EntityTrait},
prelude::*,
query::{JsonValue, QueryFilter, QuerySelect, QueryTrait},
query::{JsonValue, QueryTrait},
sea_query::query::OnConflict,
ConnectionTrait, DbBackend, TransactionTrait,
},
spl_account_compression::events::ChangeLogEventV1,
tracing::{debug, error, info},
tracing::{debug, error},
};

pub async fn save_changelog_event<'c, T>(
Expand All @@ -41,18 +40,17 @@ const fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 {

pub async fn insert_change_log<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
_slot: u64,
txn_id: &str,
txn: &T,
instruction: &str,
) -> ProgramTransformerResult<()>
where
T: ConnectionTrait + TransactionTrait,
{
let mut i: i64 = 0;
let depth = change_log_event.path.len() - 1;
let tree_id = change_log_event.id.as_ref();
for p in change_log_event.path.iter() {
for (i, p) in (0_i64..).zip(change_log_event.path.iter()) {
let node_idx = p.index as i64;
debug!(
"seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}",
Expand All @@ -79,8 +77,7 @@ where
..Default::default()
};

i += 1;
let mut query = cl_items::Entity::insert(item)
let query = cl_items::Entity::insert(item)
.on_conflict(
OnConflict::columns([cl_items::Column::Tree, cl_items::Column::NodeIdx])
.update_columns([
Expand All @@ -92,7 +89,6 @@ where
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql);
txn.execute(query)
.await
.map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?;
Expand Down Expand Up @@ -121,7 +117,7 @@ where
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.update_columns([cl_audits_v2::Column::Tx, cl_audits_v2::Column::Instruction])
.to_owned(),
)
.build(DbBackend::Postgres);
Expand All @@ -132,37 +128,6 @@ where
}
}

// If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
// a single row into the `backfill_items` table. This way if an incomplete path was inserted
// into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
// fix it.
if i - 1 == depth as i64 {
// See if the tree already exists in the `backfill_items` table.
let rows = backfill_items::Entity::find()
.filter(backfill_items::Column::Tree.eq(tree_id))
.limit(1)
.all(txn)
.await?;

// If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
// then we know we will need to backfill the tree from sequence number 1 up to the current
// sequence number. So in this case we set at flag to force checking the tree.
let force_chk = rows.is_empty() && change_log_event.seq > 1;

info!("Adding to backfill_items table at level {}", i - 1);
let item = backfill_items::ActiveModel {
tree: ActiveValue::Set(tree_id.to_vec()),
seq: ActiveValue::Set(change_log_event.seq as i64),
slot: ActiveValue::Set(slot as i64),
force_chk: ActiveValue::Set(force_chk),
backfilled: ActiveValue::Set(false),
failed: ActiveValue::Set(false),
..Default::default()
};

backfill_items::Entity::insert(item).exec(txn).await?;
}

Ok(())
}

Expand Down

0 comments on commit 4e3c076

Please sign in to comment.