diff --git a/ecosystem/nft-metadata-crawler-parser/src/models/nft_metadata_crawler_uris_query.rs b/ecosystem/nft-metadata-crawler-parser/src/models/nft_metadata_crawler_uris_query.rs index 963065659448c..f076ba81fc9e5 100644 --- a/ecosystem/nft-metadata-crawler-parser/src/models/nft_metadata_crawler_uris_query.rs +++ b/ecosystem/nft-metadata-crawler-parser/src/models/nft_metadata_crawler_uris_query.rs @@ -32,8 +32,8 @@ pub struct NFTMetadataCrawlerURIsQuery { impl NFTMetadataCrawlerURIsQuery { pub fn get_by_asset_uri( - asset_uri: &str, conn: &mut PooledConnection>, + asset_uri: &str, ) -> Option { let mut op = || { parsed_asset_uris::table @@ -55,9 +55,9 @@ impl NFTMetadataCrawlerURIsQuery { } pub fn get_by_raw_image_uri( + conn: &mut PooledConnection>, asset_uri: &str, raw_image_uri: &str, - conn: &mut PooledConnection>, ) -> Option { let mut op = || { parsed_asset_uris::table @@ -81,9 +81,9 @@ impl NFTMetadataCrawlerURIsQuery { } pub fn get_by_raw_animation_uri( + conn: &mut PooledConnection>, asset_uri: &str, raw_animation_uri: &str, - conn: &mut PooledConnection>, ) -> Option { let mut op = || { parsed_asset_uris::table diff --git a/ecosystem/nft-metadata-crawler-parser/src/utils/database.rs b/ecosystem/nft-metadata-crawler-parser/src/utils/database.rs index 063876db2f925..9466748d2d91d 100644 --- a/ecosystem/nft-metadata-crawler-parser/src/utils/database.rs +++ b/ecosystem/nft-metadata-crawler-parser/src/utils/database.rs @@ -35,6 +35,7 @@ pub fn run_migrations(pool: &Pool>) { pub fn upsert_uris( conn: &mut PooledConnection>, entry: &NFTMetadataCrawlerURIs, + ltv: i64, ) -> anyhow::Result { use schema::nft_metadata_crawler::parsed_asset_uris::dsl::*; @@ -51,8 +52,9 @@ pub fn upsert_uris( image_optimizer_retry_count.eq(excluded(image_optimizer_retry_count)), json_parser_retry_count.eq(excluded(json_parser_retry_count)), animation_optimizer_retry_count.eq(excluded(animation_optimizer_retry_count)), + inserted_at.eq(excluded(inserted_at)), 
do_not_parse.eq(excluded(do_not_parse)), - last_transaction_version.eq(excluded(last_transaction_version)), + last_transaction_version.eq(ltv), )); let debug_query = diesel::debug_query::(&query).to_string(); diff --git a/ecosystem/nft-metadata-crawler-parser/src/worker.rs b/ecosystem/nft-metadata-crawler-parser/src/worker.rs index 316ebb990ce01..1c985910af4e9 100644 --- a/ecosystem/nft-metadata-crawler-parser/src/worker.rs +++ b/ecosystem/nft-metadata-crawler-parser/src/worker.rs @@ -56,8 +56,7 @@ impl Worker { last_transaction_timestamp: chrono::NaiveDateTime, force: bool, ) -> Self { - let mut model = NFTMetadataCrawlerURIs::new(asset_uri); - model.set_last_transaction_version(last_transaction_version as i64); + let model = NFTMetadataCrawlerURIs::new(asset_uri); let worker = Self { config, conn, @@ -79,30 +78,32 @@ impl Worker { // Deduplicate asset_uri // Exit if not force or if asset_uri has already been parsed let prev_model = - NFTMetadataCrawlerURIsQuery::get_by_asset_uri(&self.asset_uri, &mut self.conn); + NFTMetadataCrawlerURIsQuery::get_by_asset_uri(&mut self.conn, &self.asset_uri); if let Some(pm) = prev_model { DUPLICATE_ASSET_URI_COUNT.inc(); - if !self.force && pm.do_not_parse { + self.model = pm.into(); + if !self.force && self.model.get_do_not_parse() { self.log_info("asset_uri has been marked as do_not_parse, skipping parse"); SKIP_URI_COUNT.with_label_values(&["do_not_parse"]).inc(); + self.upsert(); return Ok(()); } - self.model = pm.into(); } // Check asset_uri against the URI blacklist if self.is_blacklisted_uri(&self.asset_uri.clone()) { + self.log_info("Found match in URI blacklist, marking as do_not_parse"); + self.model.set_do_not_parse(true); + self.upsert(); + SKIP_URI_COUNT.with_label_values(&["blacklist"]).inc(); return Ok(()); } - // Skip if asset_uri is not a valid URI + // Skip if asset_uri is not a valid URI, do not write invalid URI to Postgres if Url::parse(&self.asset_uri).is_err() { self.log_info("URI is invalid, skipping 
parse, marking as do_not_parse"); self.model.set_do_not_parse(true); SKIP_URI_COUNT.with_label_values(&["invalid"]).inc(); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } return Ok(()); } @@ -161,32 +162,39 @@ impl Worker { // Commit model to Postgres self.log_info("Committing JSON to Postgres"); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } + self.upsert(); } - // Deduplicate raw_image_uri - // Proceed with image optimization of force or if raw_image_uri has not been parsed - // Since we default to asset_uri, this check works if raw_image_uri is null because deduplication for asset_uri has already taken place - if (self.force || self.model.get_cdn_image_uri().is_none()) - && (self.model.get_cdn_image_uri().is_some() - || self.model.get_raw_image_uri().map_or(true, |uri_option| { - match NFTMetadataCrawlerURIsQuery::get_by_raw_image_uri( - &self.asset_uri, - &uri_option, - &mut self.conn, - ) { - Some(uris) => { - self.log_info("Duplicate raw_image_uri found"); - DUPLICATE_RAW_IMAGE_URI_COUNT.inc(); - self.model.set_cdn_image_uri(uris.cdn_image_uri); - false - }, - None => true, - } - })) - { + // Should I optimize image? 
+ // if force: true + // else if cdn_image_uri already exists: false + // else: perform lookup + // if found: set cdn_image_uri, false + // else: true + let should_optimize_image = if self.force { + true + } else if self.model.get_cdn_image_uri().is_some() { + false + } else { + self.model.get_raw_image_uri().map_or(true, |uri| { + match NFTMetadataCrawlerURIsQuery::get_by_raw_image_uri( + &mut self.conn, + &self.asset_uri, + &uri, + ) { + Some(uris) => { + self.log_info("Duplicate raw_image_uri found"); + DUPLICATE_RAW_IMAGE_URI_COUNT.inc(); + self.model.set_cdn_image_uri(uris.cdn_image_uri); + self.upsert(); + false + }, + None => true, + } + }) + }; + + if should_optimize_image { // Parse raw_image_uri, use asset_uri if parsing fails self.log_info("Parsing raw_image_uri"); let raw_image_uri = self @@ -196,6 +204,10 @@ impl Worker { // Check raw_image_uri against the URI blacklist if self.is_blacklisted_uri(&raw_image_uri) { + self.log_info("Found match in URI blacklist, marking as do_not_parse"); + self.model.set_do_not_parse(true); + self.upsert(); + SKIP_URI_COUNT.with_label_values(&["blacklist"]).inc(); return Ok(()); } @@ -257,38 +269,41 @@ impl Worker { // Commit model to Postgres self.log_info("Committing image to Postgres"); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } + self.upsert(); } - // Deduplicate raw_animation_uri - // Set raw_animation_uri_option to None if not force and raw_animation_uri already exists - let mut raw_animation_uri_option = self.model.get_raw_animation_uri(); - if self.model.get_cdn_animation_uri().is_some() - || !self.force - && raw_animation_uri_option.clone().map_or(true, |uri| { - match NFTMetadataCrawlerURIsQuery::get_by_raw_animation_uri( - &self.asset_uri, - &uri, - &mut self.conn, - ) { - Some(uris) => { - self.log_info("Duplicate raw_animation_uri found"); - DUPLICATE_RAW_ANIMATION_URI_COUNT.inc(); - 
self.model.set_cdn_animation_uri(uris.cdn_animation_uri); - true - }, - None => true, - } - }) - { - raw_animation_uri_option = None; - } + // Should I optimize animation? + // if force: true + // else if cdn_animation_uri already exists: false + // else: perform lookup + // if found: set cdn_animation_uri, false + // else: true + let raw_animation_uri_option = if self.force { + self.model.get_raw_animation_uri() + } else if self.model.get_cdn_animation_uri().is_some() { + None + } else { + self.model.get_raw_animation_uri().and_then(|uri| { + match NFTMetadataCrawlerURIsQuery::get_by_raw_animation_uri( + &mut self.conn, + &self.asset_uri, + &uri, + ) { + Some(uris) => { + self.log_info("Duplicate raw_animation_uri found"); + DUPLICATE_RAW_ANIMATION_URI_COUNT.inc(); + self.model.set_cdn_animation_uri(uris.cdn_animation_uri); + self.upsert(); + None + }, + None => Some(uri), + } + }) + }; // If raw_animation_uri_option is None, skip if let Some(raw_animation_uri) = raw_animation_uri_option { - self.log_info("Starting animation optimization"); + self.log_info("Parsing raw_animation_uri"); let animation_uri = URIParser::parse( &self.config.ipfs_prefix, &raw_animation_uri, @@ -343,44 +358,39 @@ impl Worker { // Commit model to Postgres self.log_info("Committing animation to Postgres"); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } + self.upsert(); } - self.model - .set_last_transaction_version(self.last_transaction_version as i64); if self.model.get_json_parser_retry_count() >= self.config.max_num_parse_retries || self.model.get_image_optimizer_retry_count() >= self.config.max_num_parse_retries || self.model.get_animation_optimizer_retry_count() >= self.config.max_num_parse_retries { self.log_info("Retry count exceeded, marking as do_not_parse"); self.model.set_do_not_parse(true); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } + 
self.upsert(); } PARSER_SUCCESSES_COUNT.inc(); Ok(()) } + fn upsert(&mut self) { + upsert_uris( + &mut self.conn, + &self.model, + self.last_transaction_version as i64, + ) + .unwrap_or_else(|e| { + self.log_error("Commit to Postgres failed", &e); + panic!(); + }); + } + fn is_blacklisted_uri(&mut self, uri: &str) -> bool { - if self - .config + self.config .uri_blacklist .iter() .any(|blacklist_uri| uri.contains(blacklist_uri)) - { - self.log_info("Found match in URI blacklist, marking as do_not_parse"); - self.model.set_do_not_parse(true); - if let Err(e) = upsert_uris(&mut self.conn, &self.model) { - self.log_error("Commit to Postgres failed", &e); - } - SKIP_URI_COUNT.with_label_values(&["blacklist"]).inc(); - return true; - } - false } fn log_info(&self, message: &str) {