diff --git a/program_transformers/src/asset_upserts.rs b/program_transformers/src/asset_upserts.rs index 68f8fc238..ee0b62713 100644 --- a/program_transformers/src/asset_upserts.rs +++ b/program_transformers/src/asset_upserts.rs @@ -6,8 +6,8 @@ use { }, }, sea_orm::{ - sea_query::{Alias, Expr, OnConflict}, - Condition, ConnectionTrait, DbErr, EntityTrait, Set, TransactionTrait, + sea_query::OnConflict, ConnectionTrait, DbBackend, DbErr, EntityTrait, QueryTrait, Set, + TransactionTrait, }, serde_json::value::Value, sqlx::types::Decimal, @@ -33,7 +33,7 @@ pub async fn upsert_assets_token_account_columns= asset.slot_updated_token_account OR asset.slot_updated_token_account IS NULL) AND asset.owner_type = 'single'", + query.sql); + txn_or_conn.execute(query).await?; Ok(()) } @@ -107,8 +78,7 @@ pub async fn upsert_assets_mint_account_columns= asset.slot_updated_mint_account OR asset.slot_updated_mint_account IS NULL", + query.sql); + txn_or_conn.execute(query).await?; Ok(()) } @@ -192,15 +144,26 @@ pub async fn upsert_assets_metadata_account_columns= asset.slot_updated_metadata_account OR asset.slot_updated_metadata_account IS NULL", + query.sql); + txn_or_conn.execute(query).await?; Ok(()) } diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs index a8d6a2780..64a83d300 100644 --- a/program_transformers/src/token/mod.rs +++ b/program_transformers/src/token/mod.rs @@ -10,9 +10,8 @@ use { blockbuster::programs::token_account::TokenProgramAccount, digital_asset_types::dao::{token_accounts, tokens}, sea_orm::{ - entity::ActiveValue, - sea_query::{query::OnConflict, Alias, Expr}, - Condition, DatabaseConnection, EntityTrait, TransactionTrait, + entity::ActiveValue, query::QueryTrait, sea_query::query::OnConflict, ConnectionTrait, + DatabaseConnection, DbBackend, EntityTrait, TransactionTrait, }, solana_sdk::program_option::COption, spl_token::state::AccountState, @@ -50,7 +49,7 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - token_accounts::Entity::insert(model) + let mut query = token_accounts::Entity::insert(model) .on_conflict( OnConflict::columns([token_accounts::Column::Pubkey]) .update_columns([ @@ -61,109 +60,17 @@ pub async fn handle_token_program_account<'a, 'b>( token_accounts::Column::Frozen, token_accounts::Column::TokenProgram, token_accounts::Column::Owner, + token_accounts::Column::CloseAuthority, token_accounts::Column::SlotUpdated, ]) - .action_cond_where( - Condition::all() - .add( - Condition::any() - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::Mint, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::Mint, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::DelegatedAmount, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::DelegatedAmount, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::Delegate, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::Delegate, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::Amount, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::Amount, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::Frozen, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::Frozen, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::TokenProgram, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::TokenProgram, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - token_accounts::Column::Owner, - ) - .ne( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::Owner, - ), - ), - ), - ) - .add( - Expr::tbl( - token_accounts::Entity, - token_accounts::Column::SlotUpdated, - ) - .lte(account_info.slot as i64), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) - .await?; + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > token_accounts.slot_updated", + query.sql + ); + txn.execute(query).await?; if ta.amount == 1 { upsert_assets_token_account_columns( @@ -205,83 +112,29 @@ pub async fn handle_token_program_account<'a, 'b>( let txn = db.begin().await?; - tokens::Entity::insert(model) + let mut query = tokens::Entity::insert(model) .on_conflict( OnConflict::columns([tokens::Column::Mint]) .update_columns([ tokens::Column::Supply, tokens::Column::TokenProgram, tokens::Column::MintAuthority, + tokens::Column::CloseAuthority, + tokens::Column::ExtensionData, tokens::Column::SlotUpdated, tokens::Column::Decimals, tokens::Column::FreezeAuthority, ]) - .action_cond_where( - Condition::all() - .add( - Condition::any() - .add( - Expr::tbl( - Alias::new("excluded"), - tokens::Column::Supply, - ) - .ne(Expr::tbl(tokens::Entity, tokens::Column::Supply)), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - tokens::Column::TokenProgram, - ) - .ne( - Expr::tbl( - tokens::Entity, - tokens::Column::TokenProgram, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - tokens::Column::MintAuthority, - ) - .ne( - Expr::tbl( - tokens::Entity, - tokens::Column::MintAuthority, - ), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - tokens::Column::Decimals, - ) - .ne( - Expr::tbl(tokens::Entity, tokens::Column::Decimals), - ), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - tokens::Column::FreezeAuthority, - ) - .ne( - Expr::tbl( - tokens::Entity, - tokens::Column::FreezeAuthority, - ), - ), - ), - ) - .add( - Expr::tbl(tokens::Entity, tokens::Column::SlotUpdated) - .lte(account_info.slot as i64), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) - .await?; + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE excluded.slot_updated >= tokens.slot_updated", + query.sql + ); + + txn.execute(query).await?; upsert_assets_mint_account_columns( AssetMintAccountColumns { diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs index a617e7c2f..a95304e5f 100644 --- a/program_transformers/src/token_metadata/master_edition.rs +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -9,8 +9,9 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - sea_query::{query::OnConflict, Alias, Condition, Expr}, - DatabaseTransaction, + query::QueryTrait, + sea_query::query::OnConflict, + ConnectionTrait, DatabaseTransaction, DbBackend, }, solana_sdk::pubkey::Pubkey, }; @@ -73,7 +74,7 @@ pub async fn save_master_edition( ..Default::default() }; - asset_v1_account_attachments::Entity::insert(model) + let query = asset_v1_account_attachments::Entity::insert(model) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .update_columns([ @@ -81,40 +82,9 @@ pub async fn save_master_edition( asset_v1_account_attachments::Column::Data, asset_v1_account_attachments::Column::SlotUpdated, ]) - .action_cond_where( - Condition::all() - .add( - Expr::tbl( - Alias::new("excluded"), - asset_v1_account_attachments::Column::AttachmentType, - ) - .ne(Expr::tbl( - asset_v1_account_attachments::Entity, - asset_v1_account_attachments::Column::AttachmentType, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_v1_account_attachments::Column::Data, - ) - .ne(Expr::tbl( - asset_v1_account_attachments::Entity, - asset_v1_account_attachments::Column::Data, - )), - ) - .add( - Expr::tbl( - asset_v1_account_attachments::Entity, - asset_v1_account_attachments::Column::SlotUpdated, - ) - .lte(slot as i64), - ), - ) .to_owned(), ) - .exec_without_returning(txn) - .await - .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; + .build(DbBackend::Postgres); + txn.execute(query).await?; Ok(()) } diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index c737431d5..13b6e3c32 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -21,9 +21,9 @@ use { }, sea_orm::{ entity::{ActiveValue, EntityTrait}, - query::JsonValue, - sea_query::{query::OnConflict, Alias, Expr}, - Condition, ConnectionTrait, Statement, TransactionTrait, + query::{JsonValue, QueryTrait}, + sea_query::query::OnConflict, + ConnectionTrait, DbBackend, Statement, TransactionTrait, }, solana_sdk::pubkey, tracing::warn, @@ -41,24 +41,18 @@ pub async fn burn_v1_asset( burnt: ActiveValue::Set(true), ..Default::default() }; - - asset::Entity::insert(model) + let mut query = asset::Entity::insert(model) .on_conflict( OnConflict::columns([asset::Column::Id]) .update_columns([asset::Column::SlotUpdated, asset::Column::Burnt]) - .action_cond_where( - Condition::all() - .add( - Expr::tbl(Alias::new("excluded"), asset::Column::Burnt) - .ne(Expr::tbl(asset::Entity, asset::Column::Burnt)), - ) - .add(Expr::tbl(asset::Entity, asset::Column::SlotUpdated).lte(slot_i)), - ) .to_owned(), ) - .exec_without_returning(conn) - .await?; - + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset.slot_updated", + query.sql + ); + conn.execute(query).await?; Ok(()) } @@ -140,7 +134,7 @@ pub async fn save_v1_asset( txn.execute(set_lock_timeout_stmt).await?; txn.execute(set_local_app_name_stmt).await?; - asset_data::Entity::insert(asset_data_model) + let mut query = asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) .update_columns([ @@ -154,93 +148,14 @@ pub async fn save_v1_asset( asset_data::Column::RawSymbol, asset_data::Column::BaseInfoSeq, ]) - .action_cond_where( - Condition::all() - .add( - Condition::any() - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::ChainDataMutability, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::ChainDataMutability, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::ChainData, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::ChainData, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::MetadataUrl, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::MetadataUrl, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::MetadataMutability, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::MetadataMutability, - )), - ) - .add( - Expr::tbl(Alias::new("excluded"), asset_data::Column::Reindex) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::Reindex, - )), - ) - .add( - Expr::tbl(Alias::new("excluded"), asset_data::Column::RawName) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::RawName, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::RawSymbol, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::RawSymbol, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_data::Column::BaseInfoSeq, - ) - .ne(Expr::tbl( - asset_data::Entity, - asset_data::Column::BaseInfoSeq, - )), - ), - ) - .add( - Expr::tbl(asset_data::Entity, asset_data::Column::SlotUpdated) - .lte(slot_i), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_data.slot_updated", + query.sql + ); + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -269,14 +184,14 @@ pub async fn save_v1_asset( attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV2), ..Default::default() }; - - asset_v1_account_attachments::Entity::insert(attachment) + let query = asset_v1_account_attachments::Entity::insert(attachment) .on_conflict( OnConflict::columns([asset_v1_account_attachments::Column::Id]) .do_nothing() .to_owned(), ) - .exec_without_returning(&txn) + .build(DbBackend::Postgres); + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -287,34 +202,21 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(slot_i), ..Default::default() }; - - asset_authority::Entity::insert(model) + let mut query = asset_authority::Entity::insert(model) .on_conflict( - OnConflict::column(asset_authority::Column::AssetId) + OnConflict::columns([asset_authority::Column::AssetId]) .update_columns([ asset_authority::Column::Authority, asset_authority::Column::SlotUpdated, ]) - .action_cond_where( - Condition::all() - .add( - Expr::tbl(Alias::new("excluded"), asset_authority::Column::Authority) - .ne(Expr::tbl( - asset_authority::Entity, - asset_authority::Column::Authority, - )), - ) - .add( - Expr::tbl( - asset_authority::Entity, - asset_authority::Column::SlotUpdated, - ) - .lte(slot_i), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_authority.slot_updated AND excluded.authority != asset_authority.authority", + query.sql + ); + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; @@ -328,8 +230,7 @@ pub async fn save_v1_asset( slot_updated: ActiveValue::Set(Some(slot_i)), ..Default::default() }; - - asset_grouping::Entity::insert(model) + let mut query = asset_grouping::Entity::insert(model) .on_conflict( OnConflict::columns([ asset_grouping::Column::AssetId, @@ -339,39 +240,16 @@ pub async fn save_v1_asset( asset_grouping::Column::GroupValue, asset_grouping::Column::Verified, asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, ]) - .action_cond_where( - Condition::all() - .add( - Condition::any().add( - Expr::tbl( - Alias::new("excluded"), - asset_grouping::Column::GroupValue, - ) - .ne(Expr::tbl( - asset_grouping::Entity, - asset_grouping::Column::GroupValue, - )) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_grouping::Column::Verified, - ) - .ne(Expr::tbl( - asset_grouping::Entity, - asset_grouping::Column::Verified, - )), - ), - ), - ) - .add( - Expr::tbl(asset_grouping::Entity, asset_grouping::Column::SlotUpdated) - .lte(slot_i), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated > asset_grouping.slot_updated", + query.sql + ); + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; } @@ -394,7 +272,7 @@ pub async fn save_v1_asset( .collect::>(); if !creators.is_empty() { - asset_creators::Entity::insert_many(creators) + let mut query = asset_creators::Entity::insert_many(creators) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, @@ -404,51 +282,17 @@ pub async fn save_v1_asset( asset_creators::Column::Creator, asset_creators::Column::Share, asset_creators::Column::Verified, + asset_creators::Column::Seq, asset_creators::Column::SlotUpdated, ]) - .action_cond_where( - Condition::all() - .add( - Condition::any() - .add( - Expr::tbl( - Alias::new("excluded"), - asset_creators::Column::Creator, - ) - .ne(Expr::tbl( - asset_creators::Entity, - asset_creators::Column::Creator, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_creators::Column::Share, - ) - .ne(Expr::tbl( - asset_creators::Entity, - asset_creators::Column::Share, - )), - ) - .add( - Expr::tbl( - Alias::new("excluded"), - asset_creators::Column::Verified, - ) - .ne(Expr::tbl( - asset_creators::Entity, - asset_creators::Column::Verified, - )), - ), - ) - .add( - Expr::tbl(asset_creators::Entity, asset_creators::Column::SlotUpdated) - .lte(slot_i), - ), - ) .to_owned(), ) - .exec_without_returning(&txn) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", + query.sql + ); + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::AssetIndexError(db_err.to_string()))?; }