Skip to content

Commit

Permalink
rename to get_asset_signatures
Browse files Browse the repository at this point in the history
  • Loading branch information
tahsintunan committed Jan 15, 2024
1 parent a150bc3 commit c762a95
Show file tree
Hide file tree
Showing 23 changed files with 142 additions and 74 deletions.
12 changes: 6 additions & 6 deletions das_api/src/api/api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use digital_asset_types::{
Cursor, PageOptions, SearchAssetsQuery,
},
dapi::{
get_asset, get_asset_proofs, get_assets, get_assets_by_authority, get_assets_by_creator,
get_assets_by_group, get_assets_by_owner, get_proof_for_asset, get_signatures_for_asset,
get_asset, get_asset_proofs, get_asset_signatures, get_assets, get_assets_by_authority,
get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_proof_for_asset,
search_assets,
},
rpc::{
Expand Down Expand Up @@ -462,11 +462,11 @@ impl ApiContract for DasApi {
.map_err(Into::into)
}

async fn get_signatures_for_asset(
async fn get_asset_signatures(
self: &DasApi,
payload: GetSignaturesForAsset,
payload: GetAssetSignatures,
) -> Result<TransactionSignatureList, DasApiError> {
let GetSignaturesForAsset {
let GetAssetSignatures {
id,
limit,
page,
Expand All @@ -489,7 +489,7 @@ impl ApiContract for DasApi {

let page_options = self.validate_pagination(limit, page, &before, &after, &cursor, None)?;

get_signatures_for_asset(&self.db_connection, id, tree, leaf_index, page_options)
get_asset_signatures(&self.db_connection, id, tree, leaf_index, page_options)
.await
.map_err(Into::into)
}
Expand Down
8 changes: 4 additions & 4 deletions das_api/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub struct GetGrouping {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
pub struct GetSignaturesForAsset {
pub struct GetAssetSignatures {
pub id: Option<String>,
pub limit: Option<u32>,
pub page: Option<u32>,
Expand Down Expand Up @@ -235,13 +235,13 @@ pub trait ApiContract: Send + Sync + 'static {
)]
async fn search_assets(&self, payload: SearchAssets) -> Result<AssetList, DasApiError>;
#[rpc(
name = "getSignaturesForAsset",
name = "getAssetSignatures",
params = "named",
summary = "Get transaction signatures for an asset"
)]
async fn get_signatures_for_asset(
async fn get_asset_signatures(
&self,
payload: GetSignaturesForAsset,
payload: GetAssetSignatures,
) -> Result<TransactionSignatureList, DasApiError>;
#[rpc(
name = "getGrouping",
Expand Down
7 changes: 4 additions & 3 deletions das_api/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,16 @@ impl RpcApiBuilder {
module.register_alias("getAssetsByGroup", "get_assets_by_group")?;

module.register_async_method(
"getSignaturesForAsset",
"getAssetSignatures",
|rpc_params, rpc_context| async move {
let payload = rpc_params.parse::<GetSignaturesForAsset>()?;
let payload = rpc_params.parse::<GetAssetSignatures>()?;
rpc_context
.get_signatures_for_asset(payload)
.get_asset_signatures(payload)
.await
.map_err(Into::into)
},
)?;
module.register_alias("getSignaturesForAsset", "getAssetSignatures")?;

module.register_async_method("search_assets", |rpc_params, rpc_context| async move {
let payload = rpc_params.parse::<SearchAssets>()?;
Expand Down
4 changes: 4 additions & 0 deletions digital_asset_types/src/dao/generated/sea_orm_active_enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ pub enum Instruction {
VerifyCollection,
#[sea_orm(string_value = "verify_creator")]
VerifyCreator,
#[sea_orm(string_value = "update_metadata")]
UpdateMetadata,
}
// Added manually for convenience.
impl Instruction {
Expand All @@ -176,6 +178,7 @@ impl Instruction {
"UnverifyCreator" => Instruction::UnverifyCreator,
"VerifyCollection" => Instruction::VerifyCollection,
"VerifyCreator" => Instruction::VerifyCreator,
"UpdateMetadata" => Instruction::UpdateMetadata,
_ => Instruction::Unknown,
}
}
Expand All @@ -196,6 +199,7 @@ impl Instruction {
Instruction::UnverifyCreator => "UnverifyCreator",
Instruction::VerifyCollection => "VerifyCollection",
Instruction::VerifyCreator => "VerifyCreator",
Instruction::UpdateMetadata => "UpdateMetadata",
_ => "Unknown",
}
}
Expand Down
2 changes: 1 addition & 1 deletion digital_asset_types/src/dao/scopes/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ pub async fn fetch_transactions(
Ok(transaction_list)
}

pub async fn get_signatures_for_asset(
pub async fn get_asset_signatures(
conn: &impl ConnectionTrait,
asset_id: Option<Vec<u8>>,
tree_id: Option<Vec<u8>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use sea_orm::DbErr;

use super::common::{build_transaction_signatures_response, create_pagination};

pub async fn get_signatures_for_asset(
pub async fn get_asset_signatures(
db: &DatabaseConnection,
asset_id: Option<Vec<u8>>,
tree: Option<Vec<u8>>,
leaf_idx: Option<i64>,
page_options: PageOptions,
) -> Result<TransactionSignatureList, DbErr> {
let pagination = create_pagination(&page_options)?;
let transactions = scopes::asset::get_signatures_for_asset(
let transactions = scopes::asset::get_asset_signatures(
db,
asset_id,
tree,
Expand Down
5 changes: 3 additions & 2 deletions digital_asset_types/src/dapi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ mod assets_by_group;
mod assets_by_owner;
mod change_logs;
mod get_asset;
mod get_asset_signatures;
mod search_assets;
mod signatures_for_asset;

pub mod common;

pub use assets_by_authority::*;
pub use assets_by_creator::*;
pub use assets_by_group::*;
pub use assets_by_owner::*;
pub use change_logs::*;
pub use get_asset::*;
pub use get_asset_signatures::*;
pub use search_assets::*;
pub use signatures_for_asset::*;
2 changes: 1 addition & 1 deletion nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn account_worker<T: Messenger>(
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
Expand Down
1 change: 1 addition & 0 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct IngesterConfig {
pub worker_config: Option<Vec<WorkerConfig>>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
Expand Down
6 changes: 2 additions & 4 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count};
use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
};
use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};

Expand Down Expand Up @@ -145,6 +142,7 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
stream_name,
);
}
Expand Down
4 changes: 3 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ pub async fn burn<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ pub async fn cancel_redeem<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
match le.schema {
LeafSchema::V1 {
id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -37,7 +38,8 @@ where
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
instruction: &str,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -57,7 +58,8 @@ where
"Handling creator verification event for creator {} (verify: {}): {}",
creator, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits)
.await?;

let asset_id_bytes = match le.schema {
LeafSchema::V1 {
Expand Down
100 changes: 69 additions & 31 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{
asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items,
asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items,
cl_audits_v2, cl_items,
sea_orm_active_enums::{
ChainMutability, Instruction, Mutability, OwnerType, RoyaltyTargetType,
SpecificationAssetClass, SpecificationVersions,
},
};
use log::{debug, error};
use log::{debug, error, info};
use mpl_bubblegum::types::{Collection, Creator};
use sea_orm::{query::*, sea_query::OnConflict, ActiveValue::Set, DbBackend, EntityTrait};
use sea_orm::{
query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
};
use spl_account_compression::events::ChangeLogEventV1;
use std::collections::HashSet;

Expand All @@ -18,11 +21,12 @@ pub async fn save_changelog_event<'c, T>(
txn_id: &str,
txn: &T,
instruction: &str,
cl_audits: bool,
) -> Result<u64, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
insert_change_log(change_log_event, slot, txn_id, txn, instruction).await?;
insert_change_log(change_log_event, slot, txn_id, txn, instruction, cl_audits).await?;
Ok(change_log_event.seq)
}

Expand All @@ -32,10 +36,11 @@ const fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 {

pub async fn insert_change_log<'c, T>(
change_log_event: &ChangeLogEventV1,
_slot: u64,
slot: u64,
txn_id: &str,
txn: &T,
instruction: &str,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -90,35 +95,68 @@ where
}

    // Insert the audit item after the insert into cl_items has been completed
let tx_id_bytes = bs58::decode(txn_id)
.into_vec()
.map_err(|_e| IngesterError::ChangeLogEventMalformed)?;
let audit_item_v2 = cl_audits_v2::ActiveModel {
tree: Set(tree_id.to_vec()),
leaf_idx: Set(change_log_event.index as i64),
seq: Set(change_log_event.seq as i64),
tx: Set(tx_id_bytes),
instruction: Set(Instruction::from_str(instruction)),
..Default::default()
};
let query = cl_audits_v2::Entity::insert(audit_item_v2)
.on_conflict(
OnConflict::columns([
cl_audits_v2::Column::Tree,
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
error!("Error while inserting into cl_audits_v2: {:?}", e);
if cl_audits {
let tx_id_bytes = bs58::decode(txn_id)
.into_vec()
.map_err(|_e| IngesterError::ChangeLogEventMalformed)?;
let audit_item_v2 = cl_audits_v2::ActiveModel {
tree: Set(tree_id.to_vec()),
leaf_idx: Set(change_log_event.index as i64),
seq: Set(change_log_event.seq as i64),
tx: Set(tx_id_bytes),
instruction: Set(Instruction::from_str(instruction)),
..Default::default()
};
let query = cl_audits_v2::Entity::insert(audit_item_v2)
.on_conflict(
OnConflict::columns([
cl_audits_v2::Column::Tree,
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
error!("Error while inserting into cl_audits_v2: {:?}", e);
}
}
}

// If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
// a single row into the `backfill_items` table. This way if an incomplete path was inserted
// into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
// fix it.
if i - 1 == depth as i64 {
// See if the tree already exists in the `backfill_items` table.
let rows = backfill_items::Entity::find()
.filter(backfill_items::Column::Tree.eq(tree_id))
.limit(1)
.all(txn)
.await?;

        // If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
        // then we know we will need to backfill the tree from sequence number 1 up to the current
        // sequence number. So in this case we set a flag to force checking the tree.
let force_chk = rows.is_empty() && change_log_event.seq > 1;

info!("Adding to backfill_items table at level {}", i - 1);
let item = backfill_items::ActiveModel {
tree: Set(tree_id.to_vec()),
seq: Set(change_log_event.seq as i64),
slot: Set(slot as i64),
force_chk: Set(force_chk),
backfilled: Set(false),
failed: Set(false),
..Default::default()
};

backfill_items::Entity::insert(item).exec(txn).await?;
}

Ok(())
}

Expand Down
Loading

0 comments on commit c762a95

Please sign in to comment.