diff --git a/program_transformers/src/asset_upserts.rs b/program_transformers/src/asset_upserts.rs index ee0b62713..41a9d022c 100644 --- a/program_transformers/src/asset_upserts.rs +++ b/program_transformers/src/asset_upserts.rs @@ -6,8 +6,8 @@ use { }, }, sea_orm::{ - sea_query::OnConflict, ConnectionTrait, DbBackend, DbErr, EntityTrait, QueryTrait, Set, - TransactionTrait, + sea_query::{Alias, Expr, OnConflict}, + Condition, ConnectionTrait, DbErr, EntityTrait, Set, TransactionTrait, }, serde_json::value::Value, sqlx::types::Decimal, @@ -33,7 +33,7 @@ pub async fn upsert_assets_token_account_columns= asset.slot_updated_token_account OR asset.slot_updated_token_account IS NULL) AND asset.owner_type = 'single'", - query.sql); - txn_or_conn.execute(query).await?; Ok(()) } @@ -78,7 +107,8 @@ pub async fn upsert_assets_mint_account_columns= asset.slot_updated_mint_account OR asset.slot_updated_mint_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; + .exec_without_returning(txn_or_conn) + .await?; Ok(()) } @@ -144,26 +192,15 @@ pub async fn upsert_assets_metadata_account_columns= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", - query.sql); - txn_or_conn.execute(query).await?; Ok(()) } diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs index 64a83d300..a8d6a2780 100644 --- a/program_transformers/src/token/mod.rs +++ b/program_transformers/src/token/mod.rs @@ -10,8 +10,9 @@ use { blockbuster::programs::token_account::TokenProgramAccount, digital_asset_types::dao::{token_accounts, tokens}, sea_orm::{ - entity::ActiveValue, query::QueryTrait, sea_query::query::OnConflict, ConnectionTrait, - DatabaseConnection, DbBackend, EntityTrait, TransactionTrait, + entity::ActiveValue, + sea_query::{query::OnConflict, Alias, Expr}, + Condition, DatabaseConnection, EntityTrait, TransactionTrait, }, solana_sdk::program_option::COption, spl_token::state::AccountState, @@ -49,7 +50,7 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - let mut query = token_accounts::Entity::insert(model) + token_accounts::Entity::insert(model) .on_conflict( OnConflict::columns([token_accounts::Column::Pubkey]) .update_columns([ @@ -60,17 +61,109 @@ pub async fn handle_token_program_account<'a, 'b>( token_accounts::Column::Frozen, token_accounts::Column::TokenProgram, token_accounts::Column::Owner, - token_accounts::Column::CloseAuthority, token_accounts::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Mint, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Mint, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::DelegatedAmount, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::DelegatedAmount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Delegate, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Delegate, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Amount, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Amount, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Frozen, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Frozen, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::TokenProgram, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::TokenProgram, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + token_accounts::Column::Owner, + ) + .ne( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::Owner, + ), + ), + ), + ) + .add( + Expr::tbl( + token_accounts::Entity, + token_accounts::Column::SlotUpdated, + ) + .lte(account_info.slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > token_accounts.slot_updated", - query.sql - ); - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; if ta.amount == 1 { upsert_assets_token_account_columns( @@ -112,29 +205,83 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - let mut query = tokens::Entity::insert(model) + tokens::Entity::insert(model) .on_conflict( OnConflict::columns([tokens::Column::Mint]) .update_columns([ tokens::Column::Supply, tokens::Column::TokenProgram, tokens::Column::MintAuthority, - tokens::Column::CloseAuthority, - tokens::Column::ExtensionData, tokens::Column::SlotUpdated, tokens::Column::Decimals, tokens::Column::FreezeAuthority, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::Supply, + ) + .ne(Expr::tbl(tokens::Entity, tokens::Column::Supply)), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::TokenProgram, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::TokenProgram, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::MintAuthority, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::MintAuthority, + ), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::Decimals, + ) + .ne( + Expr::tbl(tokens::Entity, tokens::Column::Decimals), + ), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + tokens::Column::FreezeAuthority, + ) + .ne( + Expr::tbl( + tokens::Entity, + tokens::Column::FreezeAuthority, + ), + ), + ), + ) + .add( + Expr::tbl(tokens::Entity, tokens::Column::SlotUpdated) + .lte(account_info.slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - - query.sql = format!( - "{} WHERE excluded.slot_updated >= tokens.slot_updated", - query.sql - ); - - txn.execute(query).await?; + .exec_without_returning(&txn) + .await?; upsert_assets_mint_account_columns( AssetMintAccountColumns { diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs index a95304e5f..a617e7c2f 100644 --- a/program_transformers/src/token_metadata/master_edition.rs +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -9,9 +9,8 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - query::QueryTrait, - sea_query::query::OnConflict, - ConnectionTrait, DatabaseTransaction, DbBackend, + sea_query::{query::OnConflict, Alias, Condition, Expr}, + DatabaseTransaction, }, solana_sdk::pubkey::Pubkey, }; @@ -74,7 +73,7 @@ pub async fn save_master_edition( ..Default::default() }; - let query = asset_v1_account_attachments::Entity::insert(model) + asset_v1_account_attachments::Entity::insert(model) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .update_columns([ @@ -82,9 +81,40 @@ pub async fn save_master_edition( asset_v1_account_attachments::Column::Data, asset_v1_account_attachments::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::AttachmentType, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::AttachmentType, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_v1_account_attachments::Column::Data, + ) + .ne(Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::Data, + )), + ) + .add( + Expr::tbl( + asset_v1_account_attachments::Entity, + asset_v1_account_attachments::Column::SlotUpdated, + ) + .lte(slot as i64), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - txn.execute(query).await?; + .exec_without_returning(txn) + .await + .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; Ok(()) } diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 13b6e3c32..c74184e06 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -21,9 +21,9 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - query::{JsonValue, QueryTrait}, - sea_query::query::OnConflict, - ConnectionTrait, DbBackend, Statement, TransactionTrait, + query::JsonValue, + sea_query::{query::OnConflict, Alias, Expr}, + Condition, ConnectionTrait, Statement, TransactionTrait, }, solana_sdk::pubkey, tracing::warn, @@ -41,18 +41,24 @@ pub async fn burn_v1_asset( burnt: ActiveValue::Set(true), ..Default::default() }; - let mut query = asset::Entity::insert(model) + + asset::Entity::insert(model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset::Column::Burnt) + .ne(Expr::tbl(asset::Entity, asset::Column::Burnt)), + ) + .add(Expr::tbl(asset::Entity, asset::Column::SlotUpdated).lte(slot_i)), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset.slot_updated", - query.sql - ); - conn.execute(query).await?; + .exec_without_returning(conn) + .await?; + Ok(()) } @@ -134,7 +140,7 @@ pub async fn save_v1_asset( txn.execute(set_lock_timeout_stmt).await?; txn.execute(set_local_app_name_stmt).await?; - let mut query = asset_data::Entity::insert(asset_data_model) + asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) .update_columns([ @@ -148,14 +154,93 @@ pub async fn save_v1_asset( asset_data::Column::RawSymbol, asset_data::Column::BaseInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainDataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainDataMutability, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::ChainData, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::ChainData, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataUrl, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataUrl, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::MetadataMutability, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::MetadataMutability, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::Reindex) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::Reindex, + )), + ) + .add( + Expr::tbl(Alias::new("excluded"), asset_data::Column::RawName) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawName, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::RawSymbol, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::RawSymbol, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_data::Column::BaseInfoSeq, + ) + .ne(Expr::tbl( + asset_data::Entity, + asset_data::Column::BaseInfoSeq, + )), + ), + ) + .add( + Expr::tbl(asset_data::Entity, asset_data::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_data.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -184,14 +269,14 @@ pub async fn save_v1_asset( attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV2), ..Default::default() }; - let query = asset_v1_account_attachments::Entity::insert(attachment) + + asset_v1_account_attachments::Entity::insert(attachment) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .do_nothing() .to_owned(), ) - .build(DbBackend::Postgres); - txn.execute(query) + .exec(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -202,21 +287,34 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(slot_i), ..Default::default() }; - let mut query = asset_authority::Entity::insert(model) + + asset_authority::Entity::insert(model) .on_conflict( - OnConflict::columns([asset_authority::Column::AssetId]) + OnConflict::column(asset_authority::Column::AssetId) .update_columns([ asset_authority::Column::Authority, asset_authority::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Expr::tbl(Alias::new("excluded"), asset_authority::Column::Authority) + .ne(Expr::tbl( + asset_authority::Entity, + asset_authority::Column::Authority, + )), + ) + .add( + Expr::tbl( + asset_authority::Entity, + asset_authority::Column::SlotUpdated, + ) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_authority.slot_updated AND excluded.authority != asset_authority.authority", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -230,7 +328,8 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - let mut query = asset_grouping::Entity::insert(model) + + asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ asset_grouping::Column::AssetId, @@ -240,16 +339,39 @@ pub async fn save_v1_asset( asset_grouping::Column::GroupValue, asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, - asset_grouping::Column::GroupInfoSeq, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any().add( + Expr::tbl( + Alias::new("excluded"), + asset_grouping::Column::GroupValue, + ) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::GroupValue, + )) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_grouping::Column::Verified, + ) + .ne(Expr::tbl( + asset_grouping::Entity, + asset_grouping::Column::Verified, + )), + ), + ), + ) + .add( + Expr::tbl(asset_grouping::Entity, asset_grouping::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } @@ -272,7 +394,7 @@ pub async fn save_v1_asset( .collect::>(); if !creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(creators) + asset_creators::Entity::insert_many(creators) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -282,17 +404,51 @@ pub async fn save_v1_asset( asset_creators::Column::Creator, asset_creators::Column::Share, asset_creators::Column::Verified, - asset_creators::Column::Seq, asset_creators::Column::SlotUpdated, ]) + .action_cond_where( + Condition::all() + .add( + Condition::any() + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Creator, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Creator, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Share, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Share, + )), + ) + .add( + Expr::tbl( + Alias::new("excluded"), + asset_creators::Column::Verified, + ) + .ne(Expr::tbl( + asset_creators::Entity, + asset_creators::Column::Verified, + )), + ), + ) + .add( + Expr::tbl(asset_creators::Entity, asset_creators::Column::SlotUpdated) + .lte(slot_i), + ), + ) .to_owned(), ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", - query.sql - ); - txn.execute(query) + .exec_without_returning(&txn) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; }