Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NFT Metadata Crawler] Fix not committing duplicate images and animations #12242

Merged
merged 8 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct NFTMetadataCrawlerURIsQuery {

impl NFTMetadataCrawlerURIsQuery {
pub fn get_by_asset_uri(
asset_uri: &str,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
asset_uri: &str,
) -> Option<Self> {
let mut op = || {
parsed_asset_uris::table
Expand All @@ -55,9 +55,9 @@ impl NFTMetadataCrawlerURIsQuery {
}

pub fn get_by_raw_image_uri(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
asset_uri: &str,
raw_image_uri: &str,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> Option<Self> {
let mut op = || {
parsed_asset_uris::table
Expand All @@ -81,9 +81,9 @@ impl NFTMetadataCrawlerURIsQuery {
}

pub fn get_by_raw_animation_uri(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
asset_uri: &str,
raw_animation_uri: &str,
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
) -> Option<Self> {
let mut op = || {
parsed_asset_uris::table
Expand Down
4 changes: 3 additions & 1 deletion ecosystem/nft-metadata-crawler-parser/src/utils/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn run_migrations(pool: &Pool<ConnectionManager<PgConnection>>) {
pub fn upsert_uris(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
entry: &NFTMetadataCrawlerURIs,
ltv: i64,
) -> anyhow::Result<usize> {
use schema::nft_metadata_crawler::parsed_asset_uris::dsl::*;

Expand All @@ -51,8 +52,9 @@ pub fn upsert_uris(
image_optimizer_retry_count.eq(excluded(image_optimizer_retry_count)),
json_parser_retry_count.eq(excluded(json_parser_retry_count)),
animation_optimizer_retry_count.eq(excluded(animation_optimizer_retry_count)),
inserted_at.eq(excluded(inserted_at)),
do_not_parse.eq(excluded(do_not_parse)),
last_transaction_version.eq(excluded(last_transaction_version)),
last_transaction_version.eq(ltv),
));

let debug_query = diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string();
Expand Down
126 changes: 61 additions & 65 deletions ecosystem/nft-metadata-crawler-parser/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl Worker {
last_transaction_timestamp: chrono::NaiveDateTime,
force: bool,
) -> Self {
let mut model = NFTMetadataCrawlerURIs::new(asset_uri);
model.set_last_transaction_version(last_transaction_version as i64);
let model = NFTMetadataCrawlerURIs::new(asset_uri);
let worker = Self {
config,
conn,
Expand All @@ -79,30 +78,28 @@ impl Worker {
// Deduplicate asset_uri
// Exit if not force or if asset_uri has already been parsed
let prev_model =
NFTMetadataCrawlerURIsQuery::get_by_asset_uri(&self.asset_uri, &mut self.conn);
NFTMetadataCrawlerURIsQuery::get_by_asset_uri(&mut self.conn, &self.asset_uri);
if let Some(pm) = prev_model {
DUPLICATE_ASSET_URI_COUNT.inc();
if !self.force && pm.do_not_parse {
self.model = pm.into();
if !self.force && self.model.get_do_not_parse() {
self.log_info("asset_uri has been marked as do_not_parse, skipping parse");
SKIP_URI_COUNT.with_label_values(&["do_not_parse"]).inc();
self.upsert();
return Ok(());
}
self.model = pm.into();
}

// Check asset_uri against the URI blacklist
if self.is_blacklisted_uri(&self.asset_uri.clone()) {
return Ok(());
}

// Skip if asset_uri is not a valid URI
// Skip if asset_uri is not a valid URI, do not write invalid URI to Postgres
if Url::parse(&self.asset_uri).is_err() {
self.log_info("URI is invalid, skipping parse, marking as do_not_parse");
self.model.set_do_not_parse(true);
SKIP_URI_COUNT.with_label_values(&["invalid"]).inc();
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
return Ok(());
}

Expand Down Expand Up @@ -161,32 +158,30 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing JSON to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
self.upsert();
}

// Deduplicate raw_image_uri
// Proceed with image optimization of force or if raw_image_uri has not been parsed
// Proceed with image optimization if force or if raw_image_uri has not been parsed
// Since we default to asset_uri, this check works if raw_image_uri is null because deduplication for asset_uri has already taken place
if (self.force || self.model.get_cdn_image_uri().is_none())
&& (self.model.get_cdn_image_uri().is_some()
|| self.model.get_raw_image_uri().map_or(true, |uri_option| {
match NFTMetadataCrawlerURIsQuery::get_by_raw_image_uri(
&self.asset_uri,
&uri_option,
&mut self.conn,
) {
Some(uris) => {
self.log_info("Duplicate raw_image_uri found");
DUPLICATE_RAW_IMAGE_URI_COUNT.inc();
self.model.set_cdn_image_uri(uris.cdn_image_uri);
false
},
None => true,
}
}))
{
let dupe_image_found = self.model.get_raw_image_uri().map_or(false, |uri| {
match NFTMetadataCrawlerURIsQuery::get_by_raw_image_uri(
&mut self.conn,
&self.asset_uri,
&uri,
) {
Some(uris) => {
self.log_info("Duplicate raw_image_uri found");
DUPLICATE_RAW_IMAGE_URI_COUNT.inc();
self.model.set_cdn_image_uri(uris.cdn_image_uri);
self.upsert();
true
},
None => false,
}
});

if self.force || self.model.get_cdn_image_uri().is_none() && !dupe_image_found {
Copy link
Contributor

@bowenyang007 bowenyang007 Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// if self force blabla
let should_continue = if self.force {
  true
} else {
  if self.model.get_cdn_image_uri().is_some() {
    false
  } else {
    // dupe logic
  }
};

// Parse raw_image_uri, use asset_uri if parsing fails
self.log_info("Parsing raw_image_uri");
let raw_image_uri = self
Expand Down Expand Up @@ -257,38 +252,35 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing image to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
self.upsert();
}

// Deduplicate raw_animation_uri
// Set raw_animation_uri_option to None if not force and raw_animation_uri already exists
// Proceed with animation optimization if force or if raw_animation_uri has not been parsed
let mut raw_animation_uri_option = self.model.get_raw_animation_uri();
if self.model.get_cdn_animation_uri().is_some()
|| !self.force
&& raw_animation_uri_option.clone().map_or(true, |uri| {
match NFTMetadataCrawlerURIsQuery::get_by_raw_animation_uri(
&self.asset_uri,
&uri,
&mut self.conn,
) {
Some(uris) => {
self.log_info("Duplicate raw_animation_uri found");
DUPLICATE_RAW_ANIMATION_URI_COUNT.inc();
self.model.set_cdn_animation_uri(uris.cdn_animation_uri);
true
},
None => true,
}
})
{
let dupe_animation_found = raw_animation_uri_option.clone().map_or(false, |uri| {
match NFTMetadataCrawlerURIsQuery::get_by_raw_animation_uri(
&mut self.conn,
&self.asset_uri,
&uri,
) {
Some(uris) => {
self.log_info("Duplicate raw_animation_uri found");
DUPLICATE_RAW_ANIMATION_URI_COUNT.inc();
self.model.set_cdn_animation_uri(uris.cdn_animation_uri);
self.upsert();
true
},
None => false,
}
});
if !(self.force || self.model.get_cdn_animation_uri().is_none() && !dupe_animation_found) {
raw_animation_uri_option = None;
}

// If raw_animation_uri_option is None, skip
if let Some(raw_animation_uri) = raw_animation_uri_option {
self.log_info("Starting animation optimization");
self.log_info("Parsing raw_animation_uri");
let animation_uri = URIParser::parse(
&self.config.ipfs_prefix,
&raw_animation_uri,
Expand Down Expand Up @@ -343,28 +335,34 @@ impl Worker {

// Commit model to Postgres
self.log_info("Committing animation to Postgres");
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
self.upsert();
}

self.model
.set_last_transaction_version(self.last_transaction_version as i64);
if self.model.get_json_parser_retry_count() >= self.config.max_num_parse_retries
|| self.model.get_image_optimizer_retry_count() >= self.config.max_num_parse_retries
|| self.model.get_animation_optimizer_retry_count() >= self.config.max_num_parse_retries
{
self.log_info("Retry count exceeded, marking as do_not_parse");
self.model.set_do_not_parse(true);
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
self.upsert();
}

PARSER_SUCCESSES_COUNT.inc();
Ok(())
}

/// Commits the worker's current `self.model` to Postgres via `upsert_uris`,
/// stamping the row with this worker's `last_transaction_version`
/// (cast from u64 to i64 to match the DB column type).
///
/// On failure: logs the error, then panics with no message.
/// NOTE(review): panicking here aborts the worker instead of continuing
/// with uncommitted state — presumably intentional (fail-fast so the
/// message is retried), but confirm; a bare `panic!()` also loses the
/// error detail from the panic message (it is only in the log).
fn upsert(&mut self) {
upsert_uris(
&mut self.conn,
&self.model,
self.last_transaction_version as i64,
)
.unwrap_or_else(|e| {
self.log_error("Commit to Postgres failed", &e);
panic!();
});
}

fn is_blacklisted_uri(&mut self, uri: &str) -> bool {
if self
.config
Expand All @@ -374,9 +372,7 @@ impl Worker {
{
self.log_info("Found match in URI blacklist, marking as do_not_parse");
self.model.set_do_not_parse(true);
if let Err(e) = upsert_uris(&mut self.conn, &self.model) {
self.log_error("Commit to Postgres failed", &e);
}
self.upsert();
SKIP_URI_COUNT.with_label_values(&["blacklist"]).inc();
return true;
}
Expand Down
Loading