diff --git a/grpc/src/mapper.rs b/grpc/src/mapper.rs index 6f6ce524..552c4ba5 100644 --- a/grpc/src/mapper.rs +++ b/grpc/src/mapper.rs @@ -6,7 +6,7 @@ use crate::gapfiller::{ MasterEdition, OffchainData, OwnerType, RawBlock, RoyaltyTargetType, SpecificationAssetClass, SpecificationVersions, SplMint, TokenStandard, UpdateVersionValue, UseMethod, Uses, }; -use entities::models::{AssetCompleteDetailsGrpc, UpdateVersion, Updated}; +use entities::models::{AssetCompleteDetailsGrpc, OffChainDataGrpc, UpdateVersion, Updated}; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -201,12 +201,10 @@ impl TryFrom for AssetCompleteDetailsGrpc { .collect::, _>>()?, edition: value.edition.map(TryInto::try_into).transpose()?, master_edition: value.master_edition.map(TryInto::try_into).transpose()?, - offchain_data: value - .offchain_data - .map(|e| entities::models::OffChainDataGrpc { - url: e.url, - metadata: e.metadata, - }), + offchain_data: value.offchain_data.map(|e| OffChainDataGrpc { + url: e.url, + metadata: e.metadata, + }), spl_mint: value.spl_mint.map(TryInto::try_into).transpose()?, }) } @@ -258,8 +256,8 @@ impl From for SplMint { } } -impl From for OffchainData { - fn from(value: entities::models::OffChainDataGrpc) -> Self { +impl From for OffchainData { + fn from(value: OffChainDataGrpc) -> Self { Self { url: value.url, metadata: value.metadata, diff --git a/nft_ingester/src/api/dapi/asset.rs b/nft_ingester/src/api/dapi/asset.rs index 019e454b..2a4c3a08 100644 --- a/nft_ingester/src/api/dapi/asset.rs +++ b/nft_ingester/src/api/dapi/asset.rs @@ -219,7 +219,6 @@ pub async fn get_by_ids< .into_iter() .map(|id| id.to_string()) .collect_vec(); - // request prices and symbols only for fungibles when the option is set. 
This will prolong the request at least an order of magnitude let (token_prices, token_symbols) = if options.show_fungible { let token_prices_fut = token_price_fetcher.fetch_token_prices(asset_ids_string.as_slice()); let token_symbols_fut = @@ -240,6 +239,7 @@ pub async fn get_by_ids< HashMap::new() }); + // request prices and symbols only for fungibles when the option is set. This will prolong the request at least an order of magnitude let mut asset_selected_maps = rocks_db .get_asset_selected_maps_async(unique_asset_ids.clone(), owner_address, &options) .await?; @@ -254,6 +254,13 @@ pub async fn get_by_ids< let mut download_needed = false; match offchain_data { Some(offchain_data) => { + let curr_time = chrono::Utc::now().timestamp(); + if offchain_data.storage_mutability.is_mutable() + && curr_time > offchain_data.last_read_at + METADATA_CACHE_TTL + { + download_needed = true; + } + match &offchain_data.metadata { Some(metadata) => { if metadata.is_empty() { @@ -264,24 +271,6 @@ pub async fn get_by_ids< download_needed = true; } } - - match &offchain_data.url { - Some(url) => { - if url.is_empty() { - download_needed = true; - } - } - None => { - download_needed = true; - } - } - - let curr_time = chrono::Utc::now().timestamp(); - if offchain_data.storage_mutability.is_mutable() - && curr_time > offchain_data.last_read_at + METADATA_CACHE_TTL - { - download_needed = true; - } } None => { download_needed = true; @@ -317,20 +306,17 @@ pub async fn get_by_ids< let last_read_at = chrono::Utc::now().timestamp(); match res { Ok(JsonDownloadResult::JsonContent(metadata)) => { - let storage_mutability = StorageMutability::from(json_url.as_str()); - asset_selected_maps.offchain_data.insert( json_url.clone(), OffChainData { url: Some(json_url.clone()), metadata: Some(metadata.clone()), - storage_mutability, + storage_mutability: StorageMutability::from(json_url.as_str()), last_read_at, }, ); } Ok(JsonDownloadResult::MediaUrlAndMimeType { url, mime_type }) => { - let 
storage_mutability = StorageMutability::from(json_url.as_str()); asset_selected_maps.offchain_data.insert( json_url.clone(), OffChainData { @@ -339,7 +325,7 @@ pub async fn get_by_ids< format!("{{\"image\":\"{}\",\"type\":\"{}\"}}", url, mime_type) .to_string(), ), - storage_mutability, + storage_mutability: StorageMutability::from(json_url.as_str()), last_read_at, }, ); diff --git a/nft_ingester/src/api/dapi/rpc_asset_convertors.rs b/nft_ingester/src/api/dapi/rpc_asset_convertors.rs index e55b5c13..7b6f8211 100644 --- a/nft_ingester/src/api/dapi/rpc_asset_convertors.rs +++ b/nft_ingester/src/api/dapi/rpc_asset_convertors.rs @@ -91,8 +91,8 @@ pub fn get_content( offchain_data: &OffChainData, ) -> Result { let json_uri = asset_dynamic.url.value.clone(); - let metadata = offchain_data.metadata.clone().unwrap_or_default(); - let metadata: Value = serde_json::from_str(&metadata).unwrap_or(Value::Null); + let metadata = serde_json::from_str(&offchain_data.metadata.clone().unwrap_or_default()) + .unwrap_or(Value::Null); let chain_data: Value = serde_json::from_str( asset_dynamic .onchain_data @@ -235,8 +235,8 @@ fn extract_collection_metadata( asset_dynamic: &AssetDynamicDetails, offchain_data: &OffChainData, ) -> MetadataMap { - let metadata = offchain_data.metadata.clone().unwrap_or_default(); - let metadata: Value = serde_json::from_str(&metadata).unwrap_or(Value::Null); + let metadata = serde_json::from_str(&offchain_data.metadata.clone().unwrap_or_default()) + .unwrap_or(Value::Null); let chain_data: Value = serde_json::from_str( asset_dynamic .onchain_data diff --git a/nft_ingester/src/json_worker.rs b/nft_ingester/src/json_worker.rs index 47199b15..5e1b555b 100644 --- a/nft_ingester/src/json_worker.rs +++ b/nft_ingester/src/json_worker.rs @@ -275,7 +275,6 @@ impl JsonDownloader for JsonWorker { JsonDownloaderError::ErrorDownloading(format!("Failed to create client: {:?}", e)) })?; - // TODO: maybe IPFS/Arweave stuff might be done here // Detect if the URL is an 
IPFS link let parsed_url = if url.starts_with("ipfs://") { // Extract the IPFS hash or path @@ -369,7 +368,6 @@ impl JsonPersister for JsonWorker { results: Vec<(String, Result)>, ) -> Result<(), JsonDownloaderError> { let mut pg_updates = Vec::new(); - // TODO: store updates here let mut rocks_updates = HashMap::new(); let curr_time = chrono::Utc::now().timestamp(); @@ -482,7 +480,9 @@ impl JsonPersister for JsonWorker { if !rocks_updates.is_empty() { let urls_to_download = rocks_updates .values() - .filter(|data| data.metadata.is_some()) + .filter(|data| { + data.metadata.is_some() && !data.metadata.clone().unwrap().is_empty() + }) .filter_map(|data| parse_files(data.metadata.clone().unwrap().as_str())) .flat_map(|files| files.into_iter()) .filter_map(|file| file.uri) diff --git a/nft_ingester/src/processors/transaction_based/bubblegum_updates_processor.rs b/nft_ingester/src/processors/transaction_based/bubblegum_updates_processor.rs index 9840b5f5..4445d888 100644 --- a/nft_ingester/src/processors/transaction_based/bubblegum_updates_processor.rs +++ b/nft_ingester/src/processors/transaction_based/bubblegum_updates_processor.rs @@ -1143,15 +1143,13 @@ impl BubblegumTxProcessor { if let Some(dynamic_info) = &update.update { if let Some(data) = &dynamic_info.dynamic_data { let url = data.url.value.clone(); - let storage_mutability = url.as_str().into(); - let last_read_at = Utc::now().timestamp(); if let Some(metadata) = batch_mint.raw_metadata_map.get(&url) { update.offchain_data_update = Some(OffChainData { - url: Some(url), + url: Some(url.clone()), metadata: Some(metadata.to_string()), - storage_mutability, - last_read_at, + storage_mutability: url.as_str().into(), + last_read_at: Utc::now().timestamp(), }); } } diff --git a/rocks-db/src/columns/offchain_data.rs b/rocks-db/src/columns/offchain_data.rs index 4b040b8e..7546a127 100644 --- a/rocks-db/src/columns/offchain_data.rs +++ b/rocks-db/src/columns/offchain_data.rs @@ -24,7 +24,10 @@ impl StorageMutability 
{ impl From<&str> for StorageMutability { fn from(storage_mutability: &str) -> Self { - if storage_mutability.starts_with("ipfs") || storage_mutability.starts_with("arweave") { + if storage_mutability.is_empty() + || storage_mutability.starts_with("ipfs") + || storage_mutability.starts_with("arweave") + { return StorageMutability::Immutable; } else { return StorageMutability::Mutable; diff --git a/rocks-db/src/lib.rs b/rocks-db/src/lib.rs index 52b01c72..c8a8e1ef 100644 --- a/rocks-db/src/lib.rs +++ b/rocks-db/src/lib.rs @@ -810,8 +810,13 @@ impl Storage { } } +#[allow(unused_variables)] pub trait ToFlatbuffersConverter<'a> { type Target: 'a; - fn convert_to_fb(&self, builder: &mut FlatBufferBuilder<'a>) -> WIPOffset; - fn convert_to_fb_bytes(&self) -> Vec; + fn convert_to_fb(&self, builder: &mut FlatBufferBuilder<'a>) -> WIPOffset { + todo!() + } + fn convert_to_fb_bytes(&self) -> Vec { + todo!() + } } diff --git a/rocks-db/src/migrations/clean_update_authorities.rs b/rocks-db/src/migrations/clean_update_authorities.rs index 907ee490..c40f8e9a 100644 --- a/rocks-db/src/migrations/clean_update_authorities.rs +++ b/rocks-db/src/migrations/clean_update_authorities.rs @@ -26,22 +26,12 @@ impl From for AssetCollection { impl<'a> ToFlatbuffersConverter<'a> for AssetCollection { type Target = AssetCollection; - - fn convert_to_fb( - &self, - builder: &mut flatbuffers::FlatBufferBuilder<'a>, - ) -> flatbuffers::WIPOffset { - todo!() - } - - fn convert_to_fb_bytes(&self) -> Vec { - todo!() - } } pub(crate) struct CleanCollectionAuthoritiesMigration; impl RocksMigration for CleanCollectionAuthoritiesMigration { const VERSION: u64 = 2; + const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; const SERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; type NewDataType = AssetCollection; type OldDataType = AssetCollectionBeforeCleanUp; diff --git a/rocks-db/src/migrations/collection_authority.rs 
b/rocks-db/src/migrations/collection_authority.rs index 591c71fd..d40538f5 100644 --- a/rocks-db/src/migrations/collection_authority.rs +++ b/rocks-db/src/migrations/collection_authority.rs @@ -99,6 +99,7 @@ impl AssetCollectionVersion0 { pub(crate) struct CollectionAuthorityMigration; impl RocksMigration for CollectionAuthorityMigration { const VERSION: u64 = 0; + const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; const SERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; type NewDataType = AssetCollection; type OldDataType = AssetCollectionVersion0; diff --git a/rocks-db/src/migrations/offchain_data.rs b/rocks-db/src/migrations/offchain_data.rs index 80b47ebd..7be9c56c 100644 --- a/rocks-db/src/migrations/offchain_data.rs +++ b/rocks-db/src/migrations/offchain_data.rs @@ -16,6 +16,7 @@ impl From for OffChainData { pub(crate) struct OffChainDataMigration; impl RocksMigration for OffChainDataMigration { const VERSION: u64 = 4; + const DESERIALIZATION_TYPE: SerializationType = SerializationType::Flatbuffers; const SERIALIZATION_TYPE: SerializationType = SerializationType::Flatbuffers; type NewDataType = OffChainData; type OldDataType = OffChainDataDeprecated; diff --git a/rocks-db/src/migrations/spl2022.rs b/rocks-db/src/migrations/spl2022.rs index 1a161075..8be7c211 100644 --- a/rocks-db/src/migrations/spl2022.rs +++ b/rocks-db/src/migrations/spl2022.rs @@ -45,22 +45,12 @@ impl From for TokenAccount { impl<'a> ToFlatbuffersConverter<'a> for TokenAccount { type Target = TokenAccount; - - fn convert_to_fb( - &self, - builder: &mut flatbuffers::FlatBufferBuilder<'a>, - ) -> flatbuffers::WIPOffset { - todo!() - } - - fn convert_to_fb_bytes(&self) -> Vec { - todo!() - } } pub(crate) struct TokenAccounts2022ExtentionsMigration; impl RocksMigration for TokenAccounts2022ExtentionsMigration { const VERSION: u64 = 3; + const DESERIALIZATION_TYPE: SerializationType = SerializationType::Bincode; const SERIALIZATION_TYPE: 
SerializationType = SerializationType::Bincode; type NewDataType = TokenAccount; type OldDataType = TokenAccountWithoutExtentions; diff --git a/rocks-db/src/migrator.rs b/rocks-db/src/migrator.rs index 0de219ec..2c88ee60 100644 --- a/rocks-db/src/migrator.rs +++ b/rocks-db/src/migrator.rs @@ -33,6 +33,7 @@ pub enum SerializationType { pub trait RocksMigration { const VERSION: u64; + const DESERIALIZATION_TYPE: SerializationType; const SERIALIZATION_TYPE: SerializationType; type NewDataType: Sync + Serialize @@ -350,7 +351,7 @@ impl<'a> MigrationApplier<'a> { <::NewDataType as TypedColumn>::ValueType: 'static + Clone, <::NewDataType as TypedColumn>::KeyType: 'static + Hash + Eq, { - match M::SERIALIZATION_TYPE { + match M::DESERIALIZATION_TYPE { SerializationType::Bincode => deserialize::(value).map_err(|e| { error!("migration data deserialize: {:?}, {}", key_decoded, e); e.into() }) } SerializationType::Flatbuffers => { unreachable!( "Deserialization from Flatbuffers in terms of migration is not supported yet" ) } } }