[mtg-1065] Keep metadata inside mutable storages up to date with occurring changes #339

Open · wants to merge 14 commits into base: new-main
6 changes: 3 additions & 3 deletions entities/src/models.rs
@@ -107,13 +107,13 @@ pub struct Creator {
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct OffChainData {
pub struct OffChainDataGrpc {
pub url: String,
pub metadata: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct CompleteAssetDetails {
pub struct AssetCompleteDetailsGrpc {
// From AssetStaticDetails
pub pubkey: Pubkey,
pub specification_asset_class: SpecificationAssetClass,
@@ -172,7 +172,7 @@ pub struct CompleteAssetDetails {
pub master_edition: Option<MasterEdition>,

// OffChainData
pub offchain_data: Option<OffChainData>,
pub offchain_data: Option<OffChainDataGrpc>,

// SplMint
pub spl_mint: Option<SplMint>,
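The rename frees the `OffChainData` name in the entities crate for gRPC use only; the storage-side model now lives in `rocks_db::columns::offchain_data` and, as the `asset.rs` changes further down show, carries cache bookkeeping. A minimal sketch of the two shapes — the storage-side fields are inferred from their usage later in this diff, not copied from the rocks_db source:

```rust
// Sketch only: `OffChainDataGrpc` matches this diff; the storage-side
// `OffChainData` fields are inferred from nft_ingester/src/api/dapi/asset.rs.

pub enum StorageMutability {
    Immutable,
    Mutable,
}

/// gRPC transport shape: plain required strings.
pub struct OffChainDataGrpc {
    pub url: String,
    pub metadata: String,
}

/// Storage shape: optional fields plus mutability and a last-read
/// timestamp, so mutable URLs can be re-fetched after a TTL expires.
pub struct OffChainData {
    pub url: Option<String>,
    pub metadata: Option<String>,
    pub storage_mutability: StorageMutability,
    pub last_read_at: i64, // unix seconds of the last successful fetch
}
```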
18 changes: 8 additions & 10 deletions grpc/src/mapper.rs
@@ -6,12 +6,12 @@ use crate::gapfiller::{
MasterEdition, OffchainData, OwnerType, RawBlock, RoyaltyTargetType, SpecificationAssetClass,
SpecificationVersions, SplMint, TokenStandard, UpdateVersionValue, UseMethod, Uses,
};
use entities::models::{CompleteAssetDetails, OffChainData, UpdateVersion, Updated};
use entities::models::{AssetCompleteDetailsGrpc, OffChainDataGrpc, UpdateVersion, Updated};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;

impl From<CompleteAssetDetails> for AssetDetails {
fn from(value: CompleteAssetDetails) -> Self {
impl From<AssetCompleteDetailsGrpc> for AssetDetails {
fn from(value: AssetCompleteDetailsGrpc) -> Self {
let delegate = value.delegate.value.map(|key| DynamicBytesField {
value: key.to_bytes().to_vec(),
slot_updated: value.delegate.slot_updated,
@@ -85,7 +85,7 @@ impl From<CompleteAssetDetails> for AssetDetails {
}
}

impl TryFrom<AssetDetails> for CompleteAssetDetails {
impl TryFrom<AssetDetails> for AssetCompleteDetailsGrpc {
type Error = GrpcError;

fn try_from(value: AssetDetails) -> Result<Self, Self::Error> {
@@ -136,9 +136,7 @@ impl TryFrom<AssetDetails> for CompleteAssetDetails {
.is_burnt
.map(Into::into)
.ok_or(GrpcError::MissingField("is_burnt".to_string()))?,
was_decompressed: value
.was_decompressed
.map(Into::into),
was_decompressed: value.was_decompressed.map(Into::into),
creators: value
.creators
.map(TryInto::try_into)
@@ -203,7 +201,7 @@ impl TryFrom<AssetDetails> for CompleteAssetDetails {
.collect::<Result<Vec<_>, _>>()?,
edition: value.edition.map(TryInto::try_into).transpose()?,
master_edition: value.master_edition.map(TryInto::try_into).transpose()?,
offchain_data: value.offchain_data.map(|e| OffChainData {
offchain_data: value.offchain_data.map(|e| OffChainDataGrpc {
url: e.url,
metadata: e.metadata,
}),
@@ -258,8 +256,8 @@ impl From<entities::models::SplMint> for SplMint {
}
}

impl From<entities::models::OffChainData> for OffchainData {
fn from(value: OffChainData) -> Self {
impl From<OffChainDataGrpc> for OffchainData {
fn from(value: OffChainDataGrpc) -> Self {
Self {
url: value.url,
metadata: value.metadata,
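For orientation, a hypothetical round trip through the conversions above; the literal values are illustrative, not from this PR:

```rust
use entities::models::OffChainDataGrpc;

// Illustrative only: exercises the From<OffChainDataGrpc> impl above,
// producing the gapfiller-generated `OffchainData` protobuf type.
fn example() {
    let entity = OffChainDataGrpc {
        url: "https://example.com/meta.json".to_string(),
        metadata: "{\"name\":\"demo\"}".to_string(),
    };
    let proto: OffchainData = entity.into();
    assert_eq!(proto.url, "https://example.com/meta.json");
}
```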
5 changes: 2 additions & 3 deletions integration_tests/src/common.rs
@@ -81,7 +81,7 @@ pub struct TestSetup {
pub message_parser: MessageParser,
pub acc_processor: Arc<AccountsProcessor<Buffer>>,
pub tx_processor: BubblegumTxProcessor,
pub synchronizer: Synchronizer<Storage, PgClient, Arc<PgClient>>,
pub synchronizer: Synchronizer<Storage, PgClient>,
pub das_api: DasApi<
MaybeProofChecker,
JsonWorker,
@@ -109,6 +109,7 @@ impl TestSetup {
red_metrics.clone(),
MIN_PG_CONNECTIONS,
POSTGRE_MIGRATIONS_PATH,
Some(PathBuf::from_str("./dump").unwrap()),
)
.await
.unwrap(),
@@ -185,12 +186,10 @@ impl TestSetup {
let synchronizer = Synchronizer::new(
storage.clone(),
index_storage.clone(),
index_storage.clone(),
DUMP_SYNCHRONIZER_BATCH_SIZE,
"./dump".to_string(),
metrics_state.synchronizer_metrics.clone(),
SYNCHRONIZER_PARALLEL_TASKS,
false,
);

TestSetup {
1 change: 1 addition & 0 deletions integrity_verification/src/main.rs
@@ -82,6 +82,7 @@ async fn main() {
&config.database_url.clone().unwrap(),
100,
500,
None,
metrics.red_metrics,
)
.await
4 changes: 2 additions & 2 deletions interface/src/asset_streaming_and_discovery.rs
@@ -1,11 +1,11 @@
use async_trait::async_trait;
use entities::models::{CompleteAssetDetails, RawBlock};
use entities::models::{AssetCompleteDetailsGrpc, RawBlock};
use futures::stream::Stream;
use mockall::automock;
use std::pin::Pin;

pub type AsyncError = Box<dyn std::error::Error + Send + Sync>;
type AssetResult = Result<CompleteAssetDetails, AsyncError>;
type AssetResult = Result<AssetCompleteDetailsGrpc, AsyncError>;
pub type AssetDetailsStream = Pin<Box<dyn Stream<Item = AssetResult> + Send + Sync>>;
pub type AssetDetailsStreamNonSync = Pin<Box<dyn Stream<Item = AssetResult> + Send>>;
type RawBlocksResult = Result<RawBlock, AsyncError>;
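The aliases keep their names, so stream consumers only see the new item type. A hypothetical consumer, for illustration (not part of this PR):

```rust
use futures::StreamExt;

// Illustrative: drains the renamed stream; each Ok item is now an
// AssetCompleteDetailsGrpc instead of the old CompleteAssetDetails.
async fn drain(mut stream: AssetDetailsStream) {
    while let Some(item) = stream.next().await {
        match item {
            Ok(_details) => { /* handle the asset details */ }
            Err(e) => eprintln!("stream error: {e}"),
        }
    }
}
```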
69 changes: 51 additions & 18 deletions nft_ingester/src/api/dapi/asset.rs
@@ -4,9 +4,10 @@ use std::sync::Arc;

use entities::api_req_params::{AssetSortDirection, Options};
use entities::enums::SpecificationAssetClass;
use entities::models::{AssetSignatureWithPagination, OffChainData};
use entities::models::AssetSignatureWithPagination;
use interface::asset_sigratures::AssetSignaturesGetter;
use interface::json::{JsonDownloadResult, JsonDownloader, JsonPersister};
use rocks_db::columns::offchain_data::{OffChainData, StorageMutability};
use rocks_db::errors::StorageError;
use solana_sdk::pubkey::Pubkey;
use tracing::error;
@@ -17,12 +18,13 @@ use interface::price_fetcher::TokenPriceFetcher;
use interface::processing_possibility::ProcessingPossibilityChecker;
use itertools::Itertools;
use metrics_utils::ApiMetricsConfig;
use rocks_db::asset::{AssetLeaf, AssetSelectedMaps};
use rocks_db::columns::asset::{AssetLeaf, AssetSelectedMaps};
use rocks_db::Storage;
use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};

pub const COLLECTION_GROUP_KEY: &str = "collection";
pub const METADATA_CACHE_TTL: i64 = 86400; // 1 day

fn convert_rocks_asset_model(
asset_pubkey: &Pubkey,
@@ -211,10 +213,7 @@ pub async fn get_by_ids<
.push(index);
}

let unique_asset_ids: Vec<_> = unique_asset_ids_map.keys().cloned().collect();
// request prices and symbols only for fungibles when the option is set. This will prolong the request at least an order of magnitude
let asset_selected_maps_fut =
rocks_db.get_asset_selected_maps_async(unique_asset_ids.clone(), owner_address, &options);
let unique_asset_ids: Vec<Pubkey> = unique_asset_ids_map.keys().cloned().collect();
let asset_ids_string = asset_ids
.clone()
.into_iter()
@@ -228,7 +227,7 @@
} else {
(Ok(HashMap::new()), Ok(HashMap::new()))
};
let mut asset_selected_maps = asset_selected_maps_fut.await?;

let token_prices = token_prices.unwrap_or_else(|e| {
error!("Fetch token prices: {}", e);
metrics.inc_token_info_fetch_errors("prices");
@@ -240,6 +239,10 @@
HashMap::new()
});

// request prices and symbols only for fungibles when the option is set. This will prolong the request at least an order of magnitude
let mut asset_selected_maps = rocks_db
.get_asset_selected_maps_async(unique_asset_ids.clone(), owner_address, &options)
.await?;
if let Some(json_downloader) = json_downloader {
let mut urls_to_download = Vec::new();

@@ -248,17 +251,44 @@
continue;
}
let offchain_data = asset_selected_maps.offchain_data.get(url);
if offchain_data.is_none() || offchain_data.unwrap().metadata.is_empty() {
let mut download_needed = false;
match offchain_data {
Some(offchain_data) => {
let curr_time = chrono::Utc::now().timestamp();
if offchain_data.storage_mutability.is_mutable()
&& curr_time > offchain_data.last_read_at + METADATA_CACHE_TTL
{
download_needed = true;
}

match &offchain_data.metadata {
Some(metadata) => {
if metadata.is_empty() {
download_needed = true;
}
}
None => {
download_needed = true;
}
}
}
None => {
download_needed = true;
}
}

if download_needed {
urls_to_download.push(url.clone());
}

if urls_to_download.len() >= max_json_to_download {
break;
}
}

let num_of_tasks = urls_to_download.len();

if num_of_tasks != 0 {
if num_of_tasks > 0 {
let download_results = stream::iter(urls_to_download)
.map(|url| {
let json_downloader = json_downloader.clone();
@@ -273,26 +303,30 @@
.await;

for (json_url, res) in download_results.iter() {
let last_read_at = chrono::Utc::now().timestamp();
match res {
Ok(JsonDownloadResult::JsonContent(metadata)) => {
asset_selected_maps.offchain_data.insert(
json_url.clone(),
OffChainData {
url: json_url.clone(),
metadata: metadata.clone(),
url: Some(json_url.clone()),
metadata: Some(metadata.clone()),
storage_mutability: StorageMutability::from(json_url.as_str()),
last_read_at,
},
);
}
Ok(JsonDownloadResult::MediaUrlAndMimeType { url, mime_type }) => {
asset_selected_maps.offchain_data.insert(
json_url.clone(),
OffChainData {
url: json_url.clone(),
metadata: format!(
"{{\"image\":\"{}\",\"type\":\"{}\"}}",
url, mime_type
)
.to_string(),
url: Some(json_url.clone()),
metadata: Some(
format!("{{\"image\":\"{}\",\"type\":\"{}\"}}", url, mime_type)
.to_string(),
),
storage_mutability: StorageMutability::from(json_url.as_str()),
last_read_at,
},
);
}
@@ -302,7 +336,6 @@

if let Some(json_persister) = json_persister {
if !download_results.is_empty() {
let download_results = download_results.clone();
tasks.lock().await.spawn(async move {
if let Err(e) = json_persister.persist_response(download_results).await {
error!("Could not persist downloaded JSONs: {:?}", e);
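The new branch logic above decides when cached JSON must be re-fetched: always when the entry is missing or its metadata is empty or absent, and additionally, for mutable storages, when the cached copy is older than `METADATA_CACHE_TTL`. The PR inlines this decision; factored out as a standalone predicate it would read roughly as follows (a sketch, with the type stubs assumed rather than taken from the rocks_db source):

```rust
pub const METADATA_CACHE_TTL: i64 = 86_400; // 1 day, in seconds

pub enum StorageMutability { Immutable, Mutable }

impl StorageMutability {
    pub fn is_mutable(&self) -> bool {
        matches!(self, StorageMutability::Mutable)
    }
}

pub struct OffChainData {
    pub metadata: Option<String>,
    pub storage_mutability: StorageMutability,
    pub last_read_at: i64, // unix seconds of the last successful fetch
}

/// True when the cached off-chain JSON should be downloaded again:
/// no usable cached copy, or a mutable-storage copy past its TTL.
fn needs_redownload(cached: Option<&OffChainData>, now: i64) -> bool {
    match cached {
        None => true,
        Some(data) => {
            let expired = data.storage_mutability.is_mutable()
                && now > data.last_read_at + METADATA_CACHE_TTL;
            let empty = data.metadata.as_deref().map_or(true, str::is_empty);
            expired || empty
        }
    }
}
```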
13 changes: 7 additions & 6 deletions nft_ingester/src/api/dapi/change_logs.rs
@@ -3,15 +3,15 @@ use std::{collections::HashMap, str::FromStr};

use interface::proofs::ProofChecker;
use metrics_utils::ApiMetricsConfig;
use rocks_db::cl_items::{ClItemKey, ClLeafKey};
use rocks_db::columns::cl_items::{ClItemKey, ClLeafKey};
use rocks_db::errors::StorageError;
use solana_sdk::pubkey::Pubkey;
use tracing::{debug, warn};

use crate::api::dapi::model;
use crate::api::dapi::rpc_asset_models::AssetProof;
use interface::processing_possibility::ProcessingPossibilityChecker;
use rocks_db::asset_streaming_client::get_required_nodes_for_proof;
use rocks_db::clients::asset_streaming_client::get_required_nodes_for_proof;
use rocks_db::Storage;
use spl_concurrent_merkle_tree::node::empty_node;

@@ -49,10 +49,11 @@ pub async fn get_proof_for_assets<
asset_ids.iter().map(|id| (id.to_string(), None)).collect();

// Instead of using a HashMap keyed by tree_id, we keep a Vec of (tree_id, pubkey, nonce).
let tree_pubkeys: Vec<(Pubkey, Pubkey, u64)> = fetch_asset_data!(rocks_db, asset_leaf_data, asset_ids)
.values()
.map(|asset| (asset.tree_id, asset.pubkey, asset.nonce.unwrap_or_default()))
.collect();
let tree_pubkeys: Vec<(Pubkey, Pubkey, u64)> =
fetch_asset_data!(rocks_db, asset_leaf_data, asset_ids)
.values()
.map(|asset| (asset.tree_id, asset.pubkey, asset.nonce.unwrap_or_default()))
.collect();

// Construct leaf keys for all requested assets
let leaf_keys: Vec<ClLeafKey> = tree_pubkeys