From 3751934785846f0cedb6be7a9cb42c7f0c4823d9 Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Wed, 30 Oct 2024 15:11:32 +0530 Subject: [PATCH 1/7] WIP: restrict metadata_json refetching --- core/src/metadata_json.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index 36c3ebf0a..7c70a0cba 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -8,6 +8,7 @@ use { reqwest::{Client, Url as ReqwestUrl}, sea_orm::{entity::*, SqlxPostgresConnector}, serde::{Deserialize, Serialize}, + sqlx::{Pool, Postgres}, tokio::{ sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender}, task::JoinHandle, @@ -65,6 +66,25 @@ pub struct MetadataJsonDownloadWorkerArgs { metadata_json_download_worker_request_timeout: u64, } +async fn is_asset_data_fetch_req( + download_metadata_info: &DownloadMetadataInfo, + pool: Pool, +) -> bool { + let DownloadMetadataInfo { + asset_data_id, + slot: incoming_slot, + .. + } = download_metadata_info; + + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + + return asset_data::Entity::find_by_id(asset_data_id.clone()) + .one(&conn) + .await + .unwrap_or(None) + .is_some_and(|model| *incoming_slot > model.slot_updated); +} + impl MetadataJsonDownloadWorkerArgs { pub fn start( &self, @@ -90,9 +110,12 @@ impl MetadataJsonDownloadWorkerArgs { } let pool = pool.clone(); - let client = client.clone(); - handlers.push(spawn_task(client, pool, download_metadata_info)); + if is_asset_data_fetch_req(&download_metadata_info, pool.clone()).await { + println!("Fetching metadata for asset_data_id"); + let client = client.clone(); + handlers.push(spawn_task(client, pool, download_metadata_info)); + } } while handlers.next().await.is_some() {} From ed9356af2772c6763acebab95e7428cf05f518cd Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Wed, 30 Oct 2024 20:43:35 +0530 Subject: [PATCH 2/7] feat: track fetch_duration, retries, statuscode for metadata_json fetch task --- core/src/metadata_json.rs | 81 +++++++++++++++---- .../src/dao/generated/asset_data.rs | 10 +++ .../src/dao/generated/sea_orm_active_enums.rs | 9 +++ digital_asset_types/tests/common.rs | 3 + digital_asset_types/tests/json_parsing.rs | 3 + program_transformers/src/bubblegum/db.rs | 1 + .../src/mpl_core_program/v1_asset.rs | 1 + .../src/token_metadata/v1_asset.rs | 1 + 8 files changed, 94 insertions(+), 15 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index 7c70a0cba..3d5e1700b 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -1,7 +1,7 @@ use { backon::{ExponentialBuilder, Retryable}, clap::Parser, - digital_asset_types::dao::asset_data, + digital_asset_types::dao::{asset_data, sea_orm_active_enums::MetadataJsonFetchResult}, futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}, indicatif::HumanDuration, log::{debug, error}, @@ -9,6 +9,7 @@ use { sea_orm::{entity::*, SqlxPostgresConnector}, serde::{Deserialize, Serialize}, sqlx::{Pool, Postgres}, + std::sync::atomic::{AtomicU8, Ordering}, tokio::{ sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender}, task::JoinHandle, @@ -112,9 +113,7 @@ impl MetadataJsonDownloadWorkerArgs { let pool = pool.clone(); if is_asset_data_fetch_req(&download_metadata_info, pool.clone()).await { - println!("Fetching metadata for asset_data_id"); - let client = client.clone(); - handlers.push(spawn_task(client, pool, download_metadata_info)); + handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); } } @@ -182,20 +181,39 @@ pub enum StatusCode { Code(reqwest::StatusCode), } +pub struct MetadataJsonData { + value: serde_json::Value, + time_elapsed: u64, + retries: u8, +} + +pub struct MetadataJsonFetchError { + error: FetchMetadataJsonError, + time_elapsed: u64, + retries: u8, +} + async fn fetch_metadata_json( client: Client, metadata_json_url: &str, -) -> Result { - (|| async { +) -> Result { + let retries = AtomicU8::new(0); + let start = Instant::now(); + + let res = (|| async { let url = ReqwestUrl::parse(metadata_json_url)?; let response = client.get(url.clone()).send().await?; match response.error_for_status() { - Ok(res) => res - .json::() - .await - .map_err(|source| FetchMetadataJsonError::Parse { source, url }), + Ok(res) => { + let value = res + .json::() + .await + .map_err(|source| FetchMetadataJsonError::Parse { source, url })?; + + Ok(value) + } Err(source) => { let status = source .status() @@ -211,7 +229,25 @@ async fn fetch_metadata_json( } }) .retry(&ExponentialBuilder::default()) - .await + .notify(|_e, _d| { + retries.fetch_add(1, Ordering::Relaxed); + }) + .await; + + let time_elapsed = start.elapsed().as_secs(); + + let retries = retries.load(Ordering::Relaxed); + + res.map(|value| MetadataJsonData { + value, + time_elapsed, + retries, + }) + .map_err(|error| MetadataJsonFetchError { + error, + time_elapsed, + retries, + }) } #[derive(thiserror::Error, Debug)] @@ -229,22 +265,37 @@ pub async fn perform_metadata_json_task( pool: sqlx::PgPool, download_metadata_info: &DownloadMetadataInfo, ) -> Result { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); match fetch_metadata_json(client, &download_metadata_info.uri).await { Ok(metadata) => { let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), - metadata: Set(metadata), + metadata: Set(metadata.value), reindex: Set(Some(false)), + last_requested_status_code: Set(Some(MetadataJsonFetchResult::Success)), + fetch_duration_in_secs: Set(Some(metadata.time_elapsed)), + failed_fetch_attempts: Set(Some(metadata.retries)), ..Default::default() }; - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - let model = active_model.update(&conn).await?; Ok(model) } - Err(e) => Err(MetadataJsonTaskError::Fetch(e)), + Err(e) => { + let active_model = asset_data::ActiveModel { + id: Set(download_metadata_info.asset_data_id.clone()), + reindex: Set(Some(true)), + last_requested_status_code: Set(Some(MetadataJsonFetchResult::Failure)), + failed_fetch_attempts: Set(Some(e.retries)), + fetch_duration_in_secs: Set(Some(e.time_elapsed)), + ..Default::default() + }; + + active_model.update(&conn).await?; + + return Err(MetadataJsonTaskError::Fetch(e.error)); + } } } diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index c96c265e6..4ea6cd454 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -1,6 +1,7 @@ //! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 use super::sea_orm_active_enums::ChainMutability; +use super::sea_orm_active_enums::MetadataJsonFetchResult; use super::sea_orm_active_enums::Mutability; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -27,6 +28,9 @@ pub struct Model { pub raw_name: Option>, pub raw_symbol: Option>, pub base_info_seq: Option, + pub fetch_duration_in_secs: Option, + pub last_requested_status_code: Option, + pub failed_fetch_attempts: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -42,6 +46,9 @@ pub enum Column { RawName, RawSymbol, BaseInfoSeq, + FetchDurationInSecs, + LastRequestedStatusCode, + FailedFetchAttempts, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -74,6 +81,9 @@ impl ColumnTrait for Column { Self::RawName => ColumnType::Binary.def().null(), Self::RawSymbol => ColumnType::Binary.def().null(), Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), + Self::FetchDurationInSecs => ColumnType::Unsigned.def().null(), + Self::LastRequestedStatusCode => ColumnType::Unsigned.def().null(), + Self::FailedFetchAttempts => ColumnType::Unsigned.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index e4d0e012d..eae72a25d 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -163,3 +163,12 @@ pub enum Instruction { #[sea_orm(string_value = "verify_creator")] VerifyCreator, } + +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "fetch_result")] +pub enum MetadataJsonFetchResult { + #[sea_orm(string_value = "success")] + Success, + #[sea_orm(string_value = "failure")] + Failure, +} diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 0dcad71ef..2bc85012f 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -86,6 +86,9 @@ pub fn create_asset_data( raw_name: Some(metadata.name.into_bytes().to_vec().clone()), raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), base_info_seq: Some(0), + fetch_duration_in_secs: None, + last_requested_status_code: None, + failed_fetch_attempts: None, }, ) } diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index e630c91ff..57c58c4d3 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -37,6 +37,9 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { raw_name: Some(String::from("Handalf").into_bytes().to_vec()), raw_symbol: Some(String::from("").into_bytes().to_vec()), base_info_seq: Some(0), + fetch_duration_in_secs: None, + last_requested_status_code: None, + failed_fetch_attempts: None, }; v1_content_from_json(&asset_data).unwrap() diff --git a/program_transformers/src/bubblegum/db.rs b/program_transformers/src/bubblegum/db.rs index 4c8c33f4a..507aca3fb 100644 --- a/program_transformers/src/bubblegum/db.rs +++ b/program_transformers/src/bubblegum/db.rs @@ -390,6 +390,7 @@ where raw_name: ActiveValue::Set(Some(raw_name)), raw_symbol: ActiveValue::Set(Some(raw_symbol)), base_info_seq: ActiveValue::Set(Some(seq)), + ..Default::default() }; let mut query = asset_data::Entity::insert(model) diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index fd7d84f07..16f974c35 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -195,6 +195,7 @@ pub async fn save_v1_asset( raw_name: ActiveValue::Set(Some(name.to_vec())), raw_symbol: ActiveValue::Set(None), base_info_seq: ActiveValue::Set(Some(0)), + ..Default::default() }; let mut query = asset_data::Entity::insert(asset_data_model) diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 2c3a38149..5857badde 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -233,6 +233,7 @@ pub async fn save_v1_asset( raw_name: ActiveValue::Set(Some(name.to_vec())), raw_symbol: ActiveValue::Set(Some(symbol.to_vec())), base_info_seq: ActiveValue::Set(Some(0)), + ..Default::default() }; let txn = conn.begin().await?; From ccacf6ffc439ce49b8b857b748a11284cf27c75e Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Wed, 30 Oct 2024 20:53:48 +0530 Subject: [PATCH 3/7] chore:cleanup --- core/src/metadata_json.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index 3d5e1700b..fd466a00b 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -79,11 +79,11 @@ async fn is_asset_data_fetch_req( let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - return asset_data::Entity::find_by_id(asset_data_id.clone()) + asset_data::Entity::find_by_id(asset_data_id.clone()) .one(&conn) .await .unwrap_or(None) - .is_some_and(|model| *incoming_slot > model.slot_updated); + .is_some_and(|model| *incoming_slot > model.slot_updated) } impl MetadataJsonDownloadWorkerArgs { @@ -294,7 +294,7 @@ pub async fn perform_metadata_json_task( active_model.update(&conn).await?; - return Err(MetadataJsonTaskError::Fetch(e.error)); + Err(MetadataJsonTaskError::Fetch(e.error)) } } } From d3cf166af65b70d5c64b9832de15c4e21595051f Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Thu, 31 Oct 2024 20:28:17 +0530 Subject: [PATCH 4/7] chore: optimize --- core/src/metadata_json.rs | 68 +++++-------------- .../src/dao/generated/asset_data.rs | 9 +-- digital_asset_types/tests/common.rs | 3 +- digital_asset_types/tests/json_parsing.rs | 3 +- 4 files changed, 22 insertions(+), 61 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index fd466a00b..1681b541f 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -67,14 +67,11 @@ pub struct MetadataJsonDownloadWorkerArgs { metadata_json_download_worker_request_timeout: u64, } -async fn is_asset_data_fetch_req( - download_metadata_info: &DownloadMetadataInfo, - pool: Pool, -) -> bool { +async fn skip_index(download_metadata_info: &DownloadMetadataInfo, pool: Pool) -> bool { let DownloadMetadataInfo { asset_data_id, slot: incoming_slot, - .. + uri, } = download_metadata_info; let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); @@ -83,7 +80,7 @@ async fn is_asset_data_fetch_req( .one(&conn) .await .unwrap_or(None) - .is_some_and(|model| *incoming_slot > model.slot_updated) + .is_some_and(|model| model.metadata_url.eq(uri) && model.slot_updated >= *incoming_slot) } impl MetadataJsonDownloadWorkerArgs { @@ -112,9 +109,10 @@ impl MetadataJsonDownloadWorkerArgs { let pool = pool.clone(); - if is_asset_data_fetch_req(&download_metadata_info, pool.clone()).await { - handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); + if skip_index(&download_metadata_info, pool.clone()).await { + continue; } + handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); } while handlers.next().await.is_some() {} @@ -181,26 +179,11 @@ pub enum StatusCode { Code(reqwest::StatusCode), } -pub struct MetadataJsonData { - value: serde_json::Value, - time_elapsed: u64, - retries: u8, -} - -pub struct MetadataJsonFetchError { - error: FetchMetadataJsonError, - time_elapsed: u64, - retries: u8, -} - async fn fetch_metadata_json( client: Client, metadata_json_url: &str, -) -> Result { - let retries = AtomicU8::new(0); - let start = Instant::now(); - - let res = (|| async { +) -> Result { + (|| async { let url = ReqwestUrl::parse(metadata_json_url)?; let response = client.get(url.clone()).send().await?; @@ -229,25 +212,7 @@ async fn fetch_metadata_json( } }) .retry(&ExponentialBuilder::default()) - .notify(|_e, _d| { - retries.fetch_add(1, Ordering::Relaxed); - }) - .await; - - let time_elapsed = start.elapsed().as_secs(); - - let retries = retries.load(Ordering::Relaxed); - - res.map(|value| MetadataJsonData { - value, - time_elapsed, - retries, - }) - .map_err(|error| MetadataJsonFetchError { - error, - time_elapsed, - retries, - }) + .await } #[derive(thiserror::Error, Debug)] @@ -266,15 +231,17 @@ pub async fn perform_metadata_json_task( download_metadata_info: &DownloadMetadataInfo, ) -> Result { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - match fetch_metadata_json(client, &download_metadata_info.uri).await { + let start = Instant::now(); + let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await; + let time_elapsed = start.elapsed().as_millis() as u64; + match fetch_res { Ok(metadata) => { let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), - metadata: Set(metadata.value), + metadata: Set(metadata), reindex: Set(Some(false)), last_requested_status_code: Set(Some(MetadataJsonFetchResult::Success)), - fetch_duration_in_secs: Set(Some(metadata.time_elapsed)), - failed_fetch_attempts: Set(Some(metadata.retries)), + fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; @@ -287,14 +254,13 @@ pub async fn perform_metadata_json_task( id: Set(download_metadata_info.asset_data_id.clone()), reindex: Set(Some(true)), last_requested_status_code: Set(Some(MetadataJsonFetchResult::Failure)), - failed_fetch_attempts: Set(Some(e.retries)), - fetch_duration_in_secs: Set(Some(e.time_elapsed)), + fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; active_model.update(&conn).await?; - Err(MetadataJsonTaskError::Fetch(e.error)) + Err(MetadataJsonTaskError::Fetch(e)) } } } diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index 4ea6cd454..997f6ea7b 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -28,9 +28,8 @@ pub struct Model { pub raw_name: Option>, pub raw_symbol: Option>, pub base_info_seq: Option, - pub fetch_duration_in_secs: Option, + pub fetch_duration_in_ms: Option, pub last_requested_status_code: Option, - pub failed_fetch_attempts: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -46,9 +45,8 @@ pub enum Column { RawName, RawSymbol, BaseInfoSeq, - FetchDurationInSecs, + FetchDurationInMs, LastRequestedStatusCode, - FailedFetchAttempts, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -81,9 +79,8 @@ impl ColumnTrait for Column { Self::RawName => ColumnType::Binary.def().null(), Self::RawSymbol => ColumnType::Binary.def().null(), Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), - Self::FetchDurationInSecs => ColumnType::Unsigned.def().null(), + Self::FetchDurationInMs => ColumnType::Unsigned.def().null(), Self::LastRequestedStatusCode => ColumnType::Unsigned.def().null(), - Self::FailedFetchAttempts => ColumnType::Unsigned.def().null(), } } } diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 2bc85012f..417376fb0 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -86,9 +86,8 @@ pub fn create_asset_data( raw_name: Some(metadata.name.into_bytes().to_vec().clone()), raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), base_info_seq: Some(0), - fetch_duration_in_secs: None, + fetch_duration_in_ms: None, last_requested_status_code: None, - failed_fetch_attempts: None, }, ) } diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index 57c58c4d3..3660c41c3 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -37,9 +37,8 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { raw_name: Some(String::from("Handalf").into_bytes().to_vec()), raw_symbol: Some(String::from("").into_bytes().to_vec()), base_info_seq: Some(0), - fetch_duration_in_secs: None, + fetch_duration_in_ms: None, last_requested_status_code: None, - failed_fetch_attempts: None, }; v1_content_from_json(&asset_data).unwrap() From 1415cc945144e909860ed5b22677eaccb744454b Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Mon, 4 Nov 2024 14:07:15 +0530 Subject: [PATCH 5/7] update : add skip_metadata_json to top level and add migrations for db --- core/src/metadata_json.rs | 33 ++----- .../src/dao/generated/asset_data.rs | 6 +- .../src/dao/generated/sea_orm_active_enums.rs | 8 +- migration/src/lib.rs | 2 + ..._metadata_json_fetch_heuristics_columns.rs | 92 +++++++++++++++++++ migration/src/model/table.rs | 2 + program_transformers/src/bubblegum/mod.rs | 8 +- program_transformers/src/lib.rs | 25 +++++ .../src/mpl_core_program/mod.rs | 5 +- .../src/token_metadata/mod.rs | 4 + 10 files changed, 151 insertions(+), 34 deletions(-) create mode 100644 migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index 1681b541f..52ba85b3f 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -1,15 +1,13 @@ use { backon::{ExponentialBuilder, Retryable}, clap::Parser, - digital_asset_types::dao::{asset_data, sea_orm_active_enums::MetadataJsonFetchResult}, + digital_asset_types::dao::{asset_data, sea_orm_active_enums::LastRequestedStatusCode}, futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}, indicatif::HumanDuration, log::{debug, error}, reqwest::{Client, Url as ReqwestUrl}, sea_orm::{entity::*, SqlxPostgresConnector}, serde::{Deserialize, Serialize}, - sqlx::{Pool, Postgres}, - std::sync::atomic::{AtomicU8, Ordering}, tokio::{ sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender}, task::JoinHandle, @@ -19,9 +17,9 @@ use { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DownloadMetadataInfo { - asset_data_id: Vec, - uri: String, - slot: i64, + pub asset_data_id: Vec, + pub uri: String, + pub slot: i64, } impl DownloadMetadataInfo { @@ -67,22 +65,6 @@ pub struct MetadataJsonDownloadWorkerArgs { metadata_json_download_worker_request_timeout: u64, } -async fn skip_index(download_metadata_info: &DownloadMetadataInfo, pool: Pool) -> bool { - let DownloadMetadataInfo { - asset_data_id, - slot: incoming_slot, - uri, - } = download_metadata_info; - - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - - asset_data::Entity::find_by_id(asset_data_id.clone()) - .one(&conn) - .await - .unwrap_or(None) - .is_some_and(|model| model.metadata_url.eq(uri) && model.slot_updated >= *incoming_slot) -} - impl MetadataJsonDownloadWorkerArgs { pub fn start( &self, @@ -109,9 +91,6 @@ impl MetadataJsonDownloadWorkerArgs { let pool = pool.clone(); - if skip_index(&download_metadata_info, pool.clone()).await { - continue; - } handlers.push(spawn_task(client.clone(), pool, download_metadata_info)); } @@ -240,7 +219,7 @@ pub async fn perform_metadata_json_task( id: Set(download_metadata_info.asset_data_id.clone()), metadata: Set(metadata), reindex: Set(Some(false)), - last_requested_status_code: Set(Some(MetadataJsonFetchResult::Success)), + last_requested_status_code: Set(Some(LastRequestedStatusCode::Success)), fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; @@ -253,7 +232,7 @@ pub async fn perform_metadata_json_task( let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), reindex: Set(Some(true)), - last_requested_status_code: Set(Some(MetadataJsonFetchResult::Failure)), + last_requested_status_code: Set(Some(LastRequestedStatusCode::Failure)), fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index 997f6ea7b..a7b24a56c 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -1,7 +1,7 @@ //! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 use super::sea_orm_active_enums::ChainMutability; -use super::sea_orm_active_enums::MetadataJsonFetchResult; +use super::sea_orm_active_enums::LastRequestedStatusCode; use super::sea_orm_active_enums::Mutability; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -29,7 +29,7 @@ pub struct Model { pub raw_symbol: Option>, pub base_info_seq: Option, pub fetch_duration_in_ms: Option, - pub last_requested_status_code: Option, + pub last_requested_status_code: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -80,7 +80,7 @@ impl ColumnTrait for Column { Self::RawSymbol => ColumnType::Binary.def().null(), Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), Self::FetchDurationInMs => ColumnType::Unsigned.def().null(), - Self::LastRequestedStatusCode => ColumnType::Unsigned.def().null(), + Self::LastRequestedStatusCode => LastRequestedStatusCode::db_type().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index eae72a25d..045c5b1cb 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -165,8 +165,12 @@ pub enum Instruction { } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "fetch_result")] -pub enum MetadataJsonFetchResult { +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "last_requested_status_code" +)] +pub enum LastRequestedStatusCode { #[sea_orm(string_value = "success")] Success, #[sea_orm(string_value = "failure")] diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 7cee77f5e..29480a4c6 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -43,6 +43,7 @@ mod m20240319_120101_add_mpl_core_enum_vals; mod m20240320_120101_add_mpl_core_info_items; mod m20240520_120101_add_mpl_core_external_plugins_columns; mod m20240718_161232_change_supply_columns_to_numeric; +mod m20241104_093312_add_metadata_json_fetch_heuristics_columns; pub mod model; @@ -95,6 +96,7 @@ impl MigratorTrait for Migrator { Box::new(m20240320_120101_add_mpl_core_info_items::Migration), Box::new(m20240520_120101_add_mpl_core_external_plugins_columns::Migration), Box::new(m20240718_161232_change_supply_columns_to_numeric::Migration), + Box::new(m20241104_093312_add_metadata_json_fetch_heuristics_columns::Migration), ] } } diff --git a/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs b/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs new file mode 100644 index 000000000..2e8d4a405 --- /dev/null +++ b/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs @@ -0,0 +1,92 @@ +use enum_iterator::{all, Sequence}; +use extension::postgres::Type; +use sea_orm_migration::prelude::*; + +use crate::model::table::AssetData; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(AssetData::Table) + .add_column( + ColumnDef::new(AssetData::FetchDurationInMs) + .integer() + .null(), + ) + .to_owned(), + ) + .await?; + + manager + .create_type( + Type::create() + .as_enum(AssetData::LastRequestedStatusCode) + .values(vec![ + LastRequestedStatusCode::Success, + LastRequestedStatusCode::Failure, + ]) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(AssetData::Table) + .add_column( + ColumnDef::new(AssetData::LastRequestedStatusCode) + .enumeration( + AssetData::LastRequestedStatusCode, + all::().collect::>(), + ) + .not_null(), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(AssetData::Table) + .drop_column(AssetData::FetchDurationInMs) + .to_owned(), + ) + .await?; + + manager + .alter_table( + Table::alter() + .table(AssetData::Table) + .drop_column(AssetData::LastRequestedStatusCode) + .to_owned(), + ) + .await?; + + manager + .drop_type( + Type::drop() + .name(AssetData::LastRequestedStatusCode) + .to_owned(), + ) + .await?; + + Ok(()) + } +} + +#[derive(Iden, Sequence)] +pub enum LastRequestedStatusCode { + Success, + Failure, +} diff --git a/migration/src/model/table.rs b/migration/src/model/table.rs index eec2313b5..a53612dcb 100644 --- a/migration/src/model/table.rs +++ b/migration/src/model/table.rs @@ -107,6 +107,8 @@ pub enum AssetData { RawName, RawSymbol, BaseInfoSeq, + FetchDurationInMs, + LastRequestedStatusCode, } #[derive(Copy, Clone, Iden)] diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs index 03a800858..af4266cf4 100644 --- a/program_transformers/src/bubblegum/mod.rs +++ b/program_transformers/src/bubblegum/mod.rs @@ -1,7 +1,7 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, - DownloadMetadataNotifier, + skip_metadata_json_download, DownloadMetadataNotifier, }, blockbuster::{ instruction::InstructionBundle, @@ -72,6 +72,9 @@ where } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? { + if skip_metadata_json_download(&info, txn).await { + return Ok(()); + } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; @@ -99,6 +102,9 @@ where if let Some(info) = update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await? { + if skip_metadata_json_download(&info, txn).await { + return Ok(()); + } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index c34fc82cf..cd93915a0 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -16,6 +16,7 @@ use { }, }, das_core::{DownloadMetadataInfo, DownloadMetadataNotifier}, + digital_asset_types::dao::asset_data, sea_orm::{ entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, @@ -254,3 +255,27 @@ fn record_metric(metric_name: &str, success: bool, retries: u32) { cadence_macros::statsd_count!(metric_name, 1, "success" => success, "retry_count" => retry_count); } } + +pub async fn skip_metadata_json_download( + download_metadata_info: &DownloadMetadataInfo, + conn: &T, +) -> bool +where + T: ConnectionTrait + TransactionTrait, +{ + let DownloadMetadataInfo { + asset_data_id, + slot: incoming_slot, + uri, + } = download_metadata_info; + + asset_data::Entity::find_by_id(asset_data_id.clone()) + .one(conn) + .await + .unwrap_or(None) + .is_some_and(|model| { + !model.reindex.unwrap_or(false) + && model.metadata_url.eq(uri) + && model.slot_updated >= *incoming_slot + }) +} diff --git a/program_transformers/src/mpl_core_program/mod.rs b/program_transformers/src/mpl_core_program/mod.rs index b01252d4b..d610cdd0a 100644 --- a/program_transformers/src/mpl_core_program/mod.rs +++ b/program_transformers/src/mpl_core_program/mod.rs @@ -2,7 +2,7 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, mpl_core_program::v1_asset::{burn_v1_asset, save_v1_asset}, - AccountInfo, DownloadMetadataNotifier, + skip_metadata_json_download, AccountInfo, DownloadMetadataNotifier, }, blockbuster::programs::mpl_core_program::{MplCoreAccountData, MplCoreAccountState}, sea_orm::DatabaseConnection, @@ -30,6 +30,9 @@ pub async fn handle_mpl_core_account<'a, 'b, 'c>( ) .await? { + if skip_metadata_json_download(&info, db).await { + return Ok(()); + } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; diff --git a/program_transformers/src/token_metadata/mod.rs b/program_transformers/src/token_metadata/mod.rs index cbeb94171..e7b7b3760 100644 --- a/program_transformers/src/token_metadata/mod.rs +++ b/program_transformers/src/token_metadata/mod.rs @@ -1,6 +1,7 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, + skip_metadata_json_download, token_metadata::{ master_edition::{save_v1_master_edition, save_v2_master_edition}, v1_asset::{burn_v1_asset, save_v1_asset}, @@ -33,6 +34,9 @@ pub async fn handle_token_metadata_account<'a, 'b>( } TokenMetadataAccountData::MetadataV1(m) => { if let Some(info) = save_v1_asset(db, m, account_info.slot).await? { + if skip_metadata_json_download(&info, db).await { + return Ok(()); + } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; From 8b8cfa6855e6af7f1cff33aa1b17b0b12bd1d856 Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Mon, 4 Nov 2024 14:10:56 +0530 Subject: [PATCH 6/7] chore: cleanup --- core/src/metadata_json.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index 52ba85b3f..ce5c8cac6 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -168,14 +168,10 @@ async fn fetch_metadata_json( let response = client.get(url.clone()).send().await?; match response.error_for_status() { - Ok(res) => { - let value = res - .json::() - .await - .map_err(|source| FetchMetadataJsonError::Parse { source, url })?; - - Ok(value) - } + Ok(res) => Ok(res + .json::() + .await + .map_err(|source| FetchMetadataJsonError::Parse { source, url })?), Err(source) => { let status = source .status() From fb17d1a3cfec8f3a1a4e33b8b69860449efe73ba Mon Sep 17 00:00:00 2001 From: Nagaprasadvr Date: Mon, 4 Nov 2024 20:50:34 +0530 Subject: [PATCH 7/7] refactor: revert db changes and reimplement skip_metdata_json_download logic --- core/src/metadata_json.rs | 30 +++--- .../src/dao/generated/asset_data.rs | 7 -- .../src/dao/generated/sea_orm_active_enums.rs | 13 --- digital_asset_types/tests/common.rs | 2 - digital_asset_types/tests/json_parsing.rs | 2 - migration/src/lib.rs | 2 - ..._metadata_json_fetch_heuristics_columns.rs | 92 ------------------- migration/src/model/table.rs | 2 - program_transformers/src/bubblegum/db.rs | 63 +++++++------ program_transformers/src/bubblegum/mod.rs | 8 +- program_transformers/src/lib.rs | 25 ----- .../src/mpl_core_program/mod.rs | 5 +- .../src/mpl_core_program/v1_asset.rs | 50 ++++++---- .../src/token_metadata/mod.rs | 4 - .../src/token_metadata/v1_asset.rs | 50 +++++++--- 15 files changed, 123 insertions(+), 232 deletions(-) delete mode 100644 migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs index ce5c8cac6..3eecbd462 100644 --- a/core/src/metadata_json.rs +++ b/core/src/metadata_json.rs @@ -1,12 +1,12 @@ use { backon::{ExponentialBuilder, Retryable}, clap::Parser, - digital_asset_types::dao::{asset_data, sea_orm_active_enums::LastRequestedStatusCode}, + digital_asset_types::dao::asset_data::{self}, futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}, indicatif::HumanDuration, log::{debug, error}, reqwest::{Client, Url as ReqwestUrl}, - sea_orm::{entity::*, SqlxPostgresConnector}, + sea_orm::{entity::*, ConnectionTrait, SqlxPostgresConnector, TransactionTrait}, serde::{Deserialize, Serialize}, tokio::{ sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender}, @@ -17,9 +17,9 @@ use { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct DownloadMetadataInfo { - pub asset_data_id: Vec, - pub uri: String, - pub slot: i64, + asset_data_id: Vec, + uri: String, + slot: i64, } impl DownloadMetadataInfo { @@ -206,17 +206,12 @@ pub async fn perform_metadata_json_task( download_metadata_info: &DownloadMetadataInfo, ) -> Result { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - let start = Instant::now(); - let fetch_res = fetch_metadata_json(client, &download_metadata_info.uri).await; - let time_elapsed = start.elapsed().as_millis() as u64; - match fetch_res { + match fetch_metadata_json(client, &download_metadata_info.uri).await { Ok(metadata) => { let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), metadata: Set(metadata), reindex: Set(Some(false)), - last_requested_status_code: Set(Some(LastRequestedStatusCode::Success)), - fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; @@ -228,8 +223,6 @@ pub async fn perform_metadata_json_task( let active_model = asset_data::ActiveModel { id: Set(download_metadata_info.asset_data_id.clone()), reindex: Set(Some(true)), - last_requested_status_code: Set(Some(LastRequestedStatusCode::Failure)), - fetch_duration_in_ms: Set(Some(time_elapsed)), ..Default::default() }; @@ -263,3 +256,14 @@ impl DownloadMetadata { .map(|_| ()) } } + +pub async fn skip_metadata_json_download(asset_data_id: &[u8], uri: &str, conn: &T) -> bool +where + T: ConnectionTrait + TransactionTrait, +{ + asset_data::Entity::find_by_id(asset_data_id.to_vec()) + .one(conn) + .await + .unwrap_or(None) + .is_some_and(|model| model.metadata_url.eq(uri)) +} diff --git a/digital_asset_types/src/dao/generated/asset_data.rs b/digital_asset_types/src/dao/generated/asset_data.rs index a7b24a56c..c96c265e6 100644 --- a/digital_asset_types/src/dao/generated/asset_data.rs +++ b/digital_asset_types/src/dao/generated/asset_data.rs @@ -1,7 +1,6 @@ //! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 use super::sea_orm_active_enums::ChainMutability; -use super::sea_orm_active_enums::LastRequestedStatusCode; use super::sea_orm_active_enums::Mutability; use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; @@ -28,8 +27,6 @@ pub struct Model { pub raw_name: Option>, pub raw_symbol: Option>, pub base_info_seq: Option, - pub fetch_duration_in_ms: Option, - pub last_requested_status_code: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -45,8 +42,6 @@ pub enum Column { RawName, RawSymbol, BaseInfoSeq, - FetchDurationInMs, - LastRequestedStatusCode, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -79,8 +74,6 @@ impl ColumnTrait for Column { Self::RawName => ColumnType::Binary.def().null(), Self::RawSymbol => ColumnType::Binary.def().null(), Self::BaseInfoSeq => ColumnType::BigInteger.def().null(), - Self::FetchDurationInMs => ColumnType::Unsigned.def().null(), - Self::LastRequestedStatusCode => LastRequestedStatusCode::db_type().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 045c5b1cb..e4d0e012d 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -163,16 +163,3 @@ pub enum Instruction { #[sea_orm(string_value = "verify_creator")] VerifyCreator, } - -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "last_requested_status_code" -)] -pub enum LastRequestedStatusCode { - #[sea_orm(string_value = "success")] - Success, - #[sea_orm(string_value = "failure")] - Failure, -} diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 417376fb0..0dcad71ef 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -86,8 +86,6 @@ pub fn create_asset_data( raw_name: Some(metadata.name.into_bytes().to_vec().clone()), raw_symbol: Some(metadata.symbol.into_bytes().to_vec().clone()), base_info_seq: Some(0), - fetch_duration_in_ms: None, - last_requested_status_code: None, }, ) } diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index 3660c41c3..e630c91ff 100644 --- a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -37,8 +37,6 @@ pub async fn parse_onchain_json(json: serde_json::Value) -> Content { raw_name: Some(String::from("Handalf").into_bytes().to_vec()), raw_symbol: Some(String::from("").into_bytes().to_vec()), base_info_seq: Some(0), - fetch_duration_in_ms: None, - last_requested_status_code: None, }; v1_content_from_json(&asset_data).unwrap() diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 29480a4c6..7cee77f5e 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -43,7 +43,6 @@ mod m20240319_120101_add_mpl_core_enum_vals; mod m20240320_120101_add_mpl_core_info_items; mod m20240520_120101_add_mpl_core_external_plugins_columns; mod m20240718_161232_change_supply_columns_to_numeric; -mod m20241104_093312_add_metadata_json_fetch_heuristics_columns; pub mod model; @@ -96,7 +95,6 @@ impl MigratorTrait for Migrator { Box::new(m20240320_120101_add_mpl_core_info_items::Migration), Box::new(m20240520_120101_add_mpl_core_external_plugins_columns::Migration), Box::new(m20240718_161232_change_supply_columns_to_numeric::Migration), - Box::new(m20241104_093312_add_metadata_json_fetch_heuristics_columns::Migration), ] } } diff --git a/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs b/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs deleted file mode 100644 index 2e8d4a405..000000000 --- a/migration/src/m20241104_093312_add_metadata_json_fetch_heuristics_columns.rs +++ /dev/null @@ -1,92 +0,0 @@ -use enum_iterator::{all, Sequence}; -use extension::postgres::Type; -use sea_orm_migration::prelude::*; - -use crate::model::table::AssetData; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager - .alter_table( - Table::alter() - .table(AssetData::Table) - .add_column( - ColumnDef::new(AssetData::FetchDurationInMs) - .integer() - .null(), - ) - .to_owned(), - ) - .await?; - - manager - .create_type( - Type::create() - .as_enum(AssetData::LastRequestedStatusCode) - .values(vec![ - LastRequestedStatusCode::Success, - LastRequestedStatusCode::Failure, - ]) - .to_owned(), - ) - .await?; - - manager - .alter_table( - Table::alter() - .table(AssetData::Table) - .add_column( - ColumnDef::new(AssetData::LastRequestedStatusCode) - .enumeration( - AssetData::LastRequestedStatusCode, - all::().collect::>(), - ) - .not_null(), - ) - .to_owned(), - ) - .await?; - - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - manager - .alter_table( - Table::alter() - .table(AssetData::Table) - .drop_column(AssetData::FetchDurationInMs) - .to_owned(), - ) - .await?; - - manager - .alter_table( - Table::alter() - .table(AssetData::Table) - .drop_column(AssetData::LastRequestedStatusCode) - .to_owned(), - ) - .await?; - - manager - .drop_type( - Type::drop() - .name(AssetData::LastRequestedStatusCode) - .to_owned(), - ) - .await?; - - Ok(()) - } -} - -#[derive(Iden, Sequence)] -pub enum LastRequestedStatusCode { - Success, - Failure, -} diff --git a/migration/src/model/table.rs b/migration/src/model/table.rs index a53612dcb..eec2313b5 100644 --- a/migration/src/model/table.rs +++ b/migration/src/model/table.rs @@ -107,8 +107,6 @@ pub enum AssetData { RawName, RawSymbol, BaseInfoSeq, - FetchDurationInMs, - LastRequestedStatusCode, } #[derive(Copy, Clone, Iden)] diff --git a/program_transformers/src/bubblegum/db.rs b/program_transformers/src/bubblegum/db.rs index 507aca3fb..ef5c6c3c2 100644 --- a/program_transformers/src/bubblegum/db.rs +++ b/program_transformers/src/bubblegum/db.rs @@ -1,6 +1,6 @@ use { crate::error::{ProgramTransformerError, ProgramTransformerResult}, - das_core::DownloadMetadataInfo, + das_core::{skip_metadata_json_download, DownloadMetadataInfo}, digital_asset_types::dao::{ asset, asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, cl_items, sea_orm_active_enums::{ @@ -378,36 +378,45 @@ pub async fn upsert_asset_data( where T: ConnectionTrait + TransactionTrait, { - let model = asset_data::ActiveModel { + let skip_metadata_json_download = skip_metadata_json_download(&id, &metadata_url, txn).await; + + let mut model = asset_data::ActiveModel { id: ActiveValue::Set(id.clone()), chain_data_mutability: ActiveValue::Set(chain_data_mutability), chain_data: ActiveValue::Set(chain_data), - metadata_url: ActiveValue::Set(metadata_url.clone()), metadata_mutability: ActiveValue::Set(metadata_mutability), - metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), slot_updated: ActiveValue::Set(slot_updated), - reindex: ActiveValue::Set(Some(true)), raw_name: ActiveValue::Set(Some(raw_name)), raw_symbol: ActiveValue::Set(Some(raw_symbol)), base_info_seq: ActiveValue::Set(Some(seq)), ..Default::default() }; + let mut columns_to_update = vec![ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataMutability, + asset_data::Column::SlotUpdated, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]; + if !skip_metadata_json_download { + model.metadata_url = ActiveValue::Set(metadata_url.clone()); + model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string())); + model.reindex = ActiveValue::Set(Some(true)); + + columns_to_update.extend_from_slice(&[ + asset_data::Column::MetadataUrl, + asset_data::Column::Metadata, + asset_data::Column::Reindex, + ]); + } + let mut query = asset_data::Entity::insert(model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::Metadata, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - asset_data::Column::BaseInfoSeq, - ]) + .update_columns(columns_to_update) .to_owned(), ) .build(DbBackend::Postgres); @@ -421,20 +430,20 @@ where query.sql ); - let result = txn - .execute(query) + txn.execute(query) .await .map_err(|db_err| ProgramTransformerError::StorageWriteError(db_err.to_string()))?; - if result.rows_affected() > 0 { - Ok(Some(DownloadMetadataInfo::new( - id, - metadata_url, - slot_updated, - ))) - } else { - Ok(None) + // If the metadata JSON already exists, skip the download. + if skip_metadata_json_download { + return Ok(None); } + + Ok(Some(DownloadMetadataInfo::new( + id, + metadata_url, + slot_updated, + ))) } #[allow(clippy::too_many_arguments)] diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs index af4266cf4..03a800858 100644 --- a/program_transformers/src/bubblegum/mod.rs +++ b/program_transformers/src/bubblegum/mod.rs @@ -1,7 +1,7 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, - skip_metadata_json_download, DownloadMetadataNotifier, + DownloadMetadataNotifier, }, blockbuster::{ instruction::InstructionBundle, @@ -72,9 +72,6 @@ where } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { if let Some(info) = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str).await? { - if skip_metadata_json_download(&info, txn).await { - return Ok(()); - } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; @@ -102,9 +99,6 @@ where if let Some(info) = update_metadata::update_metadata(parsing_result, bundle, txn, ix_str).await? { - if skip_metadata_json_download(&info, txn).await { - return Ok(()); - } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index cd93915a0..c34fc82cf 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -16,7 +16,6 @@ use { }, }, das_core::{DownloadMetadataInfo, DownloadMetadataNotifier}, - digital_asset_types::dao::asset_data, sea_orm::{ entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, @@ -255,27 +254,3 @@ fn record_metric(metric_name: &str, success: bool, retries: u32) { cadence_macros::statsd_count!(metric_name, 1, "success" => success, "retry_count" => retry_count); } } - -pub async fn skip_metadata_json_download( - download_metadata_info: &DownloadMetadataInfo, - conn: &T, -) -> bool -where - T: ConnectionTrait + TransactionTrait, -{ - let DownloadMetadataInfo { - asset_data_id, - slot: incoming_slot, - uri, - } = download_metadata_info; - - asset_data::Entity::find_by_id(asset_data_id.clone()) - .one(conn) - .await - .unwrap_or(None) - .is_some_and(|model| { - !model.reindex.unwrap_or(false) - && model.metadata_url.eq(uri) - && model.slot_updated >= *incoming_slot - }) -} diff --git a/program_transformers/src/mpl_core_program/mod.rs b/program_transformers/src/mpl_core_program/mod.rs index d610cdd0a..b01252d4b 100644 --- a/program_transformers/src/mpl_core_program/mod.rs +++ b/program_transformers/src/mpl_core_program/mod.rs @@ -2,7 +2,7 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, mpl_core_program::v1_asset::{burn_v1_asset, save_v1_asset}, - skip_metadata_json_download, AccountInfo, DownloadMetadataNotifier, + AccountInfo, DownloadMetadataNotifier, }, blockbuster::programs::mpl_core_program::{MplCoreAccountData, MplCoreAccountState}, sea_orm::DatabaseConnection, @@ -30,9 +30,6 @@ pub async fn handle_mpl_core_account<'a, 'b, 'c>( ) .await? { - if skip_metadata_json_download(&info, db).await { - return Ok(()); - } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; diff --git a/program_transformers/src/mpl_core_program/v1_asset.rs b/program_transformers/src/mpl_core_program/v1_asset.rs index 16f974c35..cc5230257 100644 --- a/program_transformers/src/mpl_core_program/v1_asset.rs +++ b/program_transformers/src/mpl_core_program/v1_asset.rs @@ -12,6 +12,7 @@ use { mpl_core::types::{Plugin, PluginAuthority, PluginType, UpdateAuthority}, programs::mpl_core_program::MplCoreAccountData, }, + das_core::skip_metadata_json_download, digital_asset_types::{ dao::{ asset, asset_authority, asset_creators, asset_data, asset_grouping, @@ -26,8 +27,7 @@ use { entity::{ActiveValue, ColumnTrait, EntityTrait}, prelude::*, query::{JsonValue, QueryFilter, QueryTrait}, - sea_query::query::OnConflict, - sea_query::Expr, + sea_query::{query::OnConflict, Expr}, ConnectionTrait, CursorTrait, DbBackend, Statement, TransactionTrait, }, serde_json::{value::Value, Map}, @@ -183,14 +183,13 @@ pub async fn save_v1_asset( _ => ChainMutability::Mutable, }; - let asset_data_model = asset_data::ActiveModel { + let skip_metadata_json_download = skip_metadata_json_download(&id_vec, &uri, &txn).await; + + let mut asset_data_model = asset_data::ActiveModel { chain_data_mutability: ActiveValue::Set(chain_mutability), chain_data: ActiveValue::Set(chain_data_json), - metadata_url: ActiveValue::Set(uri.clone()), - metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), metadata_mutability: ActiveValue::Set(Mutability::Mutable), slot_updated: ActiveValue::Set(slot_i), - reindex: ActiveValue::Set(Some(true)), id: ActiveValue::Set(id_vec.clone()), raw_name: ActiveValue::Set(Some(name.to_vec())), raw_symbol: ActiveValue::Set(None), @@ -198,20 +197,32 @@ pub async fn save_v1_asset( ..Default::default() }; + let mut columns_to_update = vec![ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataMutability, + asset_data::Column::SlotUpdated, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]; + + if !skip_metadata_json_download { + asset_data_model.metadata_url = ActiveValue::Set(uri.clone()); + asset_data_model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string())); + asset_data_model.reindex = ActiveValue::Set(Some(true)); + + columns_to_update.extend_from_slice(&[ + asset_data::Column::MetadataUrl, + asset_data::Column::Metadata, + asset_data::Column::Reindex, + ]); + } + let mut query = asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - asset_data::Column::BaseInfoSeq, - ]) + .update_columns(columns_to_update) .to_owned(), ) .build(DbBackend::Postgres); @@ -471,6 +482,11 @@ pub async fn save_v1_asset( return Ok(None); } + // If the metadata JSON exists, skip downloading it. + if skip_metadata_json_download { + return Ok(None); + } + // Otherwise return with info for background downloading. Ok(Some(DownloadMetadataInfo::new(id_vec.clone(), uri, slot_i))) } diff --git a/program_transformers/src/token_metadata/mod.rs b/program_transformers/src/token_metadata/mod.rs index e7b7b3760..cbeb94171 100644 --- a/program_transformers/src/token_metadata/mod.rs +++ b/program_transformers/src/token_metadata/mod.rs @@ -1,7 +1,6 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, - skip_metadata_json_download, token_metadata::{ master_edition::{save_v1_master_edition, save_v2_master_edition}, v1_asset::{burn_v1_asset, save_v1_asset}, @@ -34,9 +33,6 @@ pub async fn handle_token_metadata_account<'a, 'b>( } TokenMetadataAccountData::MetadataV1(m) => { if let Some(info) = save_v1_asset(db, m, account_info.slot).await? { - if skip_metadata_json_download(&info, db).await { - return Ok(()); - } download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 5857badde..b5480ee64 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -12,6 +12,7 @@ use { accounts::{MasterEdition, Metadata}, types::TokenStandard, }, + das_core::skip_metadata_json_download, digital_asset_types::{ dao::{ asset, asset_authority, asset_creators, asset_data, asset_grouping, @@ -221,20 +222,44 @@ pub async fn save_v1_asset( true => ChainMutability::Mutable, false => ChainMutability::Immutable, }; - let asset_data_model = asset_data::ActiveModel { + + let skip_metadata_json_download = + skip_metadata_json_download(&mint_pubkey_vec, &uri, conn).await; + + let mut asset_data_model = asset_data::ActiveModel { chain_data_mutability: ActiveValue::Set(chain_mutability), chain_data: ActiveValue::Set(chain_data_json), - metadata_url: ActiveValue::Set(uri.clone()), - metadata: ActiveValue::Set(JsonValue::String("processing".to_string())), metadata_mutability: ActiveValue::Set(Mutability::Mutable), slot_updated: ActiveValue::Set(slot_i), - reindex: ActiveValue::Set(Some(true)), id: ActiveValue::Set(mint_pubkey_vec.clone()), raw_name: ActiveValue::Set(Some(name.to_vec())), raw_symbol: ActiveValue::Set(Some(symbol.to_vec())), base_info_seq: ActiveValue::Set(Some(0)), ..Default::default() }; + + let mut columns_to_update = vec![ + asset_data::Column::ChainDataMutability, + asset_data::Column::ChainData, + asset_data::Column::MetadataMutability, + asset_data::Column::SlotUpdated, + asset_data::Column::RawName, + asset_data::Column::RawSymbol, + asset_data::Column::BaseInfoSeq, + ]; + + if !skip_metadata_json_download { + asset_data_model.reindex = ActiveValue::Set(Some(true)); + asset_data_model.metadata = ActiveValue::Set(JsonValue::String("processing".to_string())); + asset_data_model.metadata_url = ActiveValue::Set(uri.clone()); + + columns_to_update.extend_from_slice(&[ + asset_data::Column::Reindex, + asset_data::Column::Metadata, + asset_data::Column::MetadataUrl, + ]); + } + let txn = conn.begin().await?; let set_lock_timeout = "SET LOCAL lock_timeout = '5s';"; @@ -250,17 +275,7 @@ pub async fn save_v1_asset( let mut query = asset_data::Entity::insert(asset_data_model) .on_conflict( OnConflict::columns([asset_data::Column::Id]) - .update_columns([ - asset_data::Column::ChainDataMutability, - asset_data::Column::ChainData, - asset_data::Column::MetadataUrl, - asset_data::Column::MetadataMutability, - asset_data::Column::SlotUpdated, - asset_data::Column::Reindex, - asset_data::Column::RawName, - asset_data::Column::RawSymbol, - asset_data::Column::BaseInfoSeq, - ]) + .update_columns(columns_to_update) .to_owned(), ) .build(DbBackend::Postgres); @@ -421,6 +436,11 @@ pub async fn save_v1_asset( return Ok(None); } + // If the metadata JSON exists, skip downloading it. + if skip_metadata_json_download { + return Ok(None); + } + Ok(Some(DownloadMetadataInfo::new( mint_pubkey_vec, uri,