From 5313d710c096a2e2657fb693d854084192e72b99 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Wed, 9 Oct 2024 03:36:06 -0700 Subject: [PATCH 1/8] yay --- Cargo.lock | 1 + ecosystem/nft-metadata-crawler/Cargo.toml | 1 + .../src/asset_uploader/mod.rs | 1 + .../src/asset_uploader/throttler/config.rs | 33 +++ .../src/asset_uploader/throttler/mod.rs | 245 ++++++++++++++++++ .../src/asset_uploader/worker/mod.rs | 10 +- ecosystem/nft-metadata-crawler/src/config.rs | 10 + .../models/asset_uploader_request_statuses.rs | 37 ++- .../asset_uploader_request_statuses_query.rs | 15 +- 9 files changed, 338 insertions(+), 15 deletions(-) create mode 100644 ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs create mode 100644 ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e7cd870fd0a89..0240b49a6a329 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3103,6 +3103,7 @@ dependencies = [ "google-cloud-storage", "image", "once_cell", + "parking_lot 0.12.1", "regex", "reqwest 0.11.23", "serde", diff --git a/ecosystem/nft-metadata-crawler/Cargo.toml b/ecosystem/nft-metadata-crawler/Cargo.toml index 35f20f5f83c6f..2fb34094d2be9 100644 --- a/ecosystem/nft-metadata-crawler/Cargo.toml +++ b/ecosystem/nft-metadata-crawler/Cargo.toml @@ -37,6 +37,7 @@ futures = { workspace = true } google-cloud-storage = { workspace = true } image = { workspace = true } once_cell = { workspace = true } +parking_lot = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs index e528f38d1ad41..182efb465f8b4 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs @@ -2,4 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod api; +pub mod throttler; pub mod worker; diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs new file mode 100644 index 0000000000000..3481bf4ebf32d --- /dev/null +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct AssetUploaderThrottlerConfig { + /// URI for the Asset Uploader Worker + pub asset_uploader_worker_uri: String, + /// Interval in seconds to poll Postgres to update upload queue + #[serde(default = "AssetUploaderThrottlerConfig::default_poll_interval_seconds")] + pub poll_interval_seconds: u64, + /// Maximum number of rows to poll from Postgres + #[serde(default = "AssetUploaderThrottlerConfig::default_poll_rows_limit")] + pub poll_rows_limit: u64, + /// Cloudflare Account Hash provided at the images home page used for generating the CDN image URLs + pub cloudflare_account_hash: String, + /// Cloudflare Image Delivery URL prefix provided at the images home page used for generating the CDN image URLs + pub cloudflare_image_delivery_prefix: String, + /// In addition to on the fly transformations, Cloudflare images can be returned in preset variants. This is the default variant used with the saved CDN image URLs. 
+ pub cloudflare_default_variant: String, +} + +impl AssetUploaderThrottlerConfig { + pub const fn default_poll_interval_seconds() -> u64 { + 10 + } + + pub const fn default_poll_rows_limit() -> u64 { + 600 + } +} diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs new file mode 100644 index 0000000000000..1e0e1933a84bd --- /dev/null +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -0,0 +1,245 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + asset_uploader::worker::UploadRequest, + config::Server, + models::{ + asset_uploader_request_statuses::AssetUploaderRequestStatuses, + asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, + }, + schema, +}; +use ahash::AHashSet; +use anyhow::Context; +use axum::{http::StatusCode as AxumStatusCode, response::IntoResponse, routing::post, Extension}; +use config::AssetUploaderThrottlerConfig; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, +}; +use parking_lot::Mutex; +use reqwest::{Client, StatusCode as ReqwestStatusCode}; +use serde::Deserialize; +use std::{ + collections::BTreeSet, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::sync::Notify; +use tracing::{debug, error}; +use url::Url; + +pub mod config; + +const FIVE_MINUTES: Duration = Duration::from_secs(60 * 5); + +// Structs below are for accessing relevant data in a typed way for Cloudflare API calls +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponseResult { + id: String, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponse { + errors: Vec, + result: CloudflareImageUploadResponseResult, +} + +#[derive(Clone)] +pub struct AssetUploaderThrottlerContext { + config: AssetUploaderThrottlerConfig, + pool: Pool>, + asset_queue: Arc>>, + in_progress_assets: Arc>>, + inserted_notify: Arc, + is_rate_limited: Arc, + rate_limit_over_notify: Arc, + client: Arc, +} + +impl AssetUploaderThrottlerContext { + pub fn new( + config: AssetUploaderThrottlerConfig, + pool: Pool>, + ) -> Self { + Self { + config, + pool, + asset_queue: Arc::new(Mutex::new(BTreeSet::new())), + in_progress_assets: Arc::new(Mutex::new(AHashSet::new())), + inserted_notify: Arc::new(Notify::new()), + is_rate_limited: Arc::new(AtomicBool::new(false)), + rate_limit_over_notify: Arc::new(Notify::new()), + client: Arc::new(Client::new()), + } + } + + async fn upload_asset( + &self, + asset: &AssetUploaderRequestStatusesQuery, + ) -> anyhow::Result { + use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; + + // Make a request to the worker to upload the asset + let res = self + .client + .post(self.config.asset_uploader_worker_uri.clone()) + .json(&UploadRequest { + url: Url::parse(&asset.asset_uri)?, + }) + .send() + .await + .context("Error sending upload request to worker")?; + + let status = res.status(); + let body = res.text().await?; + let body = serde_json::from_str::(&body)?; + + // Update the request in Postgres with the response + let mut asset: AssetUploaderRequestStatuses = asset.into(); + asset.status_code = status.as_u16() as i64; + if status == ReqwestStatusCode::OK { + asset.cdn_image_uri = Some(format!( + "{}/{}/{}/{}", + self.config.cloudflare_image_delivery_prefix, + self.config.cloudflare_account_hash, + body.result.id, + self.config.cloudflare_default_variant, + )); + } else 
{ + asset.num_failures += 1; + asset.error_message = Some(body.errors.join(", ")); + } + + let query = diesel::update( + asset_uploader_request_statuses.find((&asset.request_id, &asset.asset_uri)), + ) + .set(&asset); + + let debug_query = diesel::debug_query::(&query).to_string(); + debug!("Executing Query: {}", debug_query); + query.execute(&mut self.pool.get()?)?; + + Ok(status) + } + + async fn handle_upload_assets(&self) { + let self_arc = Arc::new(self.clone()); + loop { + // Wait until notified if rate limited + if self.is_rate_limited.load(Ordering::Relaxed) { + self.rate_limit_over_notify.notified().await; + self.is_rate_limited.store(false, Ordering::Relaxed); + } + + // Wait until notified if queue is empty + let is_empty = self.asset_queue.lock().is_empty(); + if is_empty { + self.inserted_notify.notified().await; + } + + // Pop the first asset from the queue and add it to the in-progress set + let mut queue = self.asset_queue.lock(); + let asset = queue.pop_first().unwrap(); // Safe to unwrap because we checked if the queue is empty + let mut in_progress = self.in_progress_assets.lock(); + in_progress.insert(asset.clone()); + drop(in_progress); + drop(queue); + + // Upload the asset in a separate task + // If successful, remove the asset from the in-progress set and continue to next asset + // If unsuccessful, add the asset back to the queue + let self_clone = self_arc.clone(); + tokio::spawn(async move { + if let Ok(res) = self_clone.upload_asset(&asset).await { + if res.is_success() { + let mut in_progress = self_clone.in_progress_assets.lock(); + in_progress.remove(&asset); + } else { + // If rate limited, sleep for 5 minutes then notify + if res == ReqwestStatusCode::TOO_MANY_REQUESTS { + self_clone.is_rate_limited.store(true, Ordering::Relaxed); + tokio::time::sleep(FIVE_MINUTES).await; + self_clone.rate_limit_over_notify.notify_one(); + } + + let mut queue = self_clone.asset_queue.lock(); + queue.insert(asset); + }; + } else { + error!(asset_uri = ?asset.asset_uri, "[Asset Uploader Throttler] Error uploading asset"); + let mut queue = self_clone.asset_queue.lock(); + queue.insert(asset); + } + }); + } + } + + async fn update_queue(&self) -> anyhow::Result<()> { + use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; + + let query = asset_uploader_request_statuses + .filter(status_code.ne(ReqwestStatusCode::OK.as_u16() as i64)) + .order_by(inserted_at.asc()) + .limit(self.config.poll_rows_limit as i64); + + let debug_query = diesel::debug_query::(&query).to_string(); + debug!("Executing Query: {}", debug_query); + let rows = query.load(&mut self.pool.get()?)?; + + let mut queue = self.asset_queue.lock(); + let in_progress = self.in_progress_assets.lock(); + for row in rows { + if !queue.contains(&row) && !in_progress.contains(&row) { + queue.insert(row); + } + } + + Ok(()) + } + + async fn start_update_loop(&self) { + let poll_interval_seconds = Duration::from_secs(self.config.poll_interval_seconds); + loop { + if let Err(e) = self.update_queue().await { + error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + } + self.inserted_notify.notify_one(); + tokio::time::sleep(poll_interval_seconds).await; + } + } + + async fn handle_update_queue(Extension(context): Extension>) -> impl IntoResponse { + match context.update_queue().await { + Ok(_) => AxumStatusCode::OK, + Err(e) => { + error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + AxumStatusCode::INTERNAL_SERVER_ERROR + }, + } + } +} + +impl Server for 
AssetUploaderThrottlerContext { + fn build_router(&self) -> axum::Router { + let self_arc = Arc::new(self.clone()); + + let self_arc_clone = self_arc.clone(); + tokio::spawn(async move { + self_arc_clone.handle_upload_assets().await; + }); + + let self_arc_clone = self_arc.clone(); + tokio::spawn(async move { + self_arc_clone.start_update_loop().await; + }); + + axum::Router::new() + .route("/update_queue", post(Self::handle_update_queue)) + .layer(Extension(self_arc.clone())) + } +} diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs index 3e8bb27a089ab..1d991fb1f6b03 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs @@ -10,7 +10,7 @@ use axum::{ body::Body, http::StatusCode, response::IntoResponse, routing::post, Extension, Json, Router, }; use reqwest::{multipart::Form, Client}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; use tracing::{error, info}; use url::Url; @@ -22,9 +22,9 @@ pub struct AssetUploaderWorkerContext { config: Arc, } -#[derive(Debug, Deserialize)] -struct UploadRequest { - url: Url, +#[derive(Debug, Deserialize, Serialize)] +pub struct UploadRequest { + pub url: Url, } impl AssetUploaderWorkerContext { @@ -99,7 +99,7 @@ impl Server for AssetUploaderWorkerContext { /// Converts a reqwest response to an axum response /// Only copies the response status, response body, and Content-Type header -async fn reqwest_response_to_axum_response( +pub async fn reqwest_response_to_axum_response( response: reqwest::Response, ) -> anyhow::Result { let status = response.status(); diff --git a/ecosystem/nft-metadata-crawler/src/config.rs b/ecosystem/nft-metadata-crawler/src/config.rs index 3de5d96d83568..55ff3f38b6734 100644 --- a/ecosystem/nft-metadata-crawler/src/config.rs +++ b/ecosystem/nft-metadata-crawler/src/config.rs @@ -4,6 +4,7 @@ use crate::{ asset_uploader::{ api::AssetUploaderApiContext, + throttler::{config::AssetUploaderThrottlerConfig, AssetUploaderThrottlerContext}, worker::{config::AssetUploaderWorkerConfig, AssetUploaderWorkerContext}, }, parser::{config::ParserConfig, ParserContext}, @@ -32,6 +33,7 @@ pub enum ServerConfig { Parser(ParserConfig), AssetUploaderWorker(AssetUploaderWorkerConfig), AssetUploaderApi, + AssetUploaderThrottler(AssetUploaderThrottlerConfig), } /// Structs to hold config from YAML @@ -49,6 +51,7 @@ pub enum ServerContext { Parser(ParserContext), AssetUploaderWorker(AssetUploaderWorkerContext), AssetUploaderApi(AssetUploaderApiContext), + AssetUploaderThrottler(AssetUploaderThrottlerContext), } impl ServerConfig { @@ -68,6 +71,12 @@ impl ServerConfig { ServerConfig::AssetUploaderApi => { ServerContext::AssetUploaderApi(AssetUploaderApiContext::new(pool)) }, + ServerConfig::AssetUploaderThrottler(asset_uploader_throttler_config) => { + ServerContext::AssetUploaderThrottler(AssetUploaderThrottlerContext::new( + asset_uploader_throttler_config.clone(), + pool, + )) + }, } } } @@ -99,6 +108,7 @@ impl RunnableConfig for NFTMetadataCrawlerConfig { ServerConfig::Parser(_) => "parser", ServerConfig::AssetUploaderWorker(_) => "asset_uploader_worker", ServerConfig::AssetUploaderApi => "asset_uploader_api", + ServerConfig::AssetUploaderThrottler(_) => "asset_uploader_throttler", } .to_string() } diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs 
b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs index fd190ac41f25c..36dd1a5c4fb77 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs @@ -1,24 +1,29 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::schema::nft_metadata_crawler::asset_uploader_request_statuses; +use crate::{ + models::asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, + schema::nft_metadata_crawler::asset_uploader_request_statuses, +}; use axum::http::StatusCode; use diesel::prelude::*; use field_count::FieldCount; use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive( + Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, AsChangeset, +)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatuses { - request_id: Uuid, - asset_uri: String, - application_id: Uuid, - status_code: i64, - error_message: Option, - cdn_image_uri: Option, - num_failures: i64, + pub request_id: Uuid, + pub asset_uri: String, + pub application_id: Uuid, + pub status_code: i64, + pub error_message: Option, + pub cdn_image_uri: Option, + pub num_failures: i64, } impl AssetUploaderRequestStatuses { @@ -51,3 +56,17 @@ impl AssetUploaderRequestStatuses { } } } + +impl From<&AssetUploaderRequestStatusesQuery> for AssetUploaderRequestStatuses { + fn from(query: &AssetUploaderRequestStatusesQuery) -> Self { + Self { + request_id: query.request_id, + asset_uri: query.asset_uri.clone(), + application_id: query.application_id, + status_code: query.status_code, + error_message: query.error_message.clone(), + cdn_image_uri: query.cdn_image_uri.clone(), + num_failures: query.num_failures, + } + } +} diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs index 0b7ed72e9f3cb..bf1959bd5e141 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs @@ -7,7 +7,20 @@ use field_count::FieldCount; use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Queryable, Serialize)] +#[derive( + Clone, + Debug, + Deserialize, + FieldCount, + Identifiable, + Queryable, + Serialize, + Hash, + PartialEq, + Eq, + PartialOrd, + Ord, // TODO: Custom Ord implementation for fairness +)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatusesQuery { From 39760287e4f0b5383c685ce756459af746f63258 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Wed, 9 Oct 2024 03:53:04 -0700 Subject: [PATCH 2/8] upsert --- .../src/asset_uploader/throttler/mod.rs | 16 ++++++++++++---- .../models/asset_uploader_request_statuses.rs | 4 +--- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index 1e0e1933a84bd..2a8600766370b 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ 
b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -16,6 +16,7 @@ use axum::{http::StatusCode as AxumStatusCode, response::IntoResponse, routing:: use config::AssetUploaderThrottlerConfig; use diesel::{ r2d2::{ConnectionManager, Pool}, + upsert::excluded, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; use parking_lot::Mutex; @@ -115,10 +116,17 @@ impl AssetUploaderThrottlerContext { asset.error_message = Some(body.errors.join(", ")); } - let query = diesel::update( - asset_uploader_request_statuses.find((&asset.request_id, &asset.asset_uri)), - ) - .set(&asset); + let query = diesel::insert_into(asset_uploader_request_statuses) + .values(asset) + .on_conflict((request_id, asset_uri)) + .do_update() + .set(( + status_code.eq(excluded(status_code)), + error_message.eq(excluded(error_message)), + cdn_image_uri.eq(excluded(cdn_image_uri)), + num_failures.eq(excluded(num_failures)), + inserted_at.eq(excluded(inserted_at)), + )); let debug_query = diesel::debug_query::(&query).to_string(); debug!("Executing Query: {}", debug_query); diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs index 36dd1a5c4fb77..23bcef6d5d35e 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs @@ -11,9 +11,7 @@ use field_count::FieldCount; use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive( - Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize, AsChangeset, -)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatuses { From 8da706e469c1c684f90ec57e0a7f10ddc55a5839 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Wed, 9 Oct 2024 04:00:13 -0700 Subject: [PATCH 3/8] awesome --- ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs index 1d991fb1f6b03..4414bfe337e36 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs @@ -99,7 +99,7 @@ impl Server for AssetUploaderWorkerContext { /// Converts a reqwest response to an axum response /// Only copies the response status, response body, and Content-Type header -pub async fn reqwest_response_to_axum_response( +async fn reqwest_response_to_axum_response( response: reqwest::Response, ) -> anyhow::Result { let status = response.status(); From 6f9df9d1dce4256d3b0ab30c1cbc2368d2225bb4 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Fri, 18 Oct 2024 02:38:04 -0700 Subject: [PATCH 4/8] AHHHHHHHHHHHHHHHHHHHHHHH --- Cargo.lock | 1 + ecosystem/nft-metadata-crawler/Cargo.toml | 1 + .../up.sql | 2 +- .../src/asset_uploader/api/get_status.rs | 2 +- .../src/asset_uploader/api/mod.rs | 2 +- .../src/asset_uploader/throttler/mod.rs | 281 +++++++++++++----- .../src/asset_uploader/worker/mod.rs | 136 ++++++++- .../models/asset_uploader_request_statuses.rs | 30 +- .../asset_uploader_request_statuses_query.rs | 17 +- ecosystem/nft-metadata-crawler/src/schema.rs | 5 +- 10 files changed, 378 insertions(+), 99 deletions(-) diff --git 
a/Cargo.lock b/Cargo.lock index 0240b49a6a329..03f1223887714 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3104,6 +3104,7 @@ dependencies = [ "image", "once_cell", "parking_lot 0.12.1", + "rayon", "regex", "reqwest 0.11.23", "serde", diff --git a/ecosystem/nft-metadata-crawler/Cargo.toml b/ecosystem/nft-metadata-crawler/Cargo.toml index 2fb34094d2be9..79bc223999905 100644 --- a/ecosystem/nft-metadata-crawler/Cargo.toml +++ b/ecosystem/nft-metadata-crawler/Cargo.toml @@ -38,6 +38,7 @@ google-cloud-storage = { workspace = true } image = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } +rayon = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql b/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql index 384f15fc6c7c4..6c014e2bab40d 100644 --- a/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql +++ b/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql @@ -3,7 +3,7 @@ CREATE TABLE nft_metadata_crawler.asset_uploader_request_statuses ( asset_uri VARCHAR NOT NULL, application_id UUID NOT NULL, status_code BIGINT NOT NULL DEFAULT 202, - error_message VARCHAR, + error_messages TEXT[], cdn_image_uri VARCHAR, num_failures BIGINT NOT NULL DEFAULT 0, request_received_at TIMESTAMP NOT NULL DEFAULT NOW(), diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs index badb30c22d66c..608cf6d7a63d6 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs @@ -32,7 +32,7 @@ pub fn get_status( } else { status_response.insert(row.asset_uri, GetStatusResponseSuccess::Error { status_code: row.status_code as u16, - error_message: row.error_message, + error_message: row.error_messages, }); }; } diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs index df6b16d5808b7..3b817778dd3c6 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs @@ -50,7 +50,7 @@ pub enum GetStatusResponseSuccess { }, Error { status_code: u16, - error_message: Option, + error_message: Option>>, }, } diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index 2a8600766370b..1d576e40cc8cc 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -2,15 +2,17 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - asset_uploader::worker::UploadRequest, + asset_uploader::worker::{GetExistingResponse, UploadRequest}, config::Server, models::{ asset_uploader_request_statuses::AssetUploaderRequestStatuses, asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, + parsed_asset_uris::ParsedAssetUris, parsed_asset_uris_query::ParsedAssetUrisQuery, }, - schema, + schema::{self}, + utils::database::upsert_uris, }; -use ahash::AHashSet; +use ahash::{AHashMap, AHashSet}; use anyhow::Context; use axum::{http::StatusCode as AxumStatusCode, 
response::IntoResponse, routing::post, Extension}; use config::AssetUploaderThrottlerConfig; @@ -19,19 +21,19 @@ use diesel::{ upsert::excluded, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, }; -use parking_lot::Mutex; use reqwest::{Client, StatusCode as ReqwestStatusCode}; use serde::Deserialize; use std::{ collections::BTreeSet, + fmt::Display, sync::{ atomic::{AtomicBool, Ordering}, Arc, }, time::Duration, }; -use tokio::sync::Notify; -use tracing::{debug, error}; +use tokio::sync::{Mutex, Notify}; +use tracing::{debug, error, info}; use url::Url; pub mod config; @@ -44,18 +46,35 @@ struct CloudflareImageUploadResponseResult { id: String, } +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponseError { + code: i64, + message: String, +} + +impl Display for CloudflareImageUploadResponseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: {}", self.code, self.message) + } +} + #[derive(Debug, Deserialize)] struct CloudflareImageUploadResponse { - errors: Vec, - result: CloudflareImageUploadResponseResult, + errors: Vec, + result: Option, +} + +#[derive(Clone)] +pub struct UploadQueue { + asset_queue: BTreeSet, + in_progress_assets: AHashSet, } #[derive(Clone)] pub struct AssetUploaderThrottlerContext { config: AssetUploaderThrottlerConfig, pool: Pool>, - asset_queue: Arc>>, - in_progress_assets: Arc>>, + upload_queue: Arc>, inserted_notify: Arc, is_rate_limited: Arc, rate_limit_over_notify: Arc, @@ -70,8 +89,10 @@ impl AssetUploaderThrottlerContext { Self { config, pool, - asset_queue: Arc::new(Mutex::new(BTreeSet::new())), - in_progress_assets: Arc::new(Mutex::new(AHashSet::new())), + upload_queue: Arc::new(Mutex::new(UploadQueue { + asset_queue: BTreeSet::new(), + in_progress_assets: AHashSet::new(), + })), inserted_notify: Arc::new(Notify::new()), is_rate_limited: Arc::new(AtomicBool::new(false)), rate_limit_over_notify: Arc::new(Notify::new()), @@ -81,11 +102,10 @@ impl AssetUploaderThrottlerContext { async fn upload_asset( &self, - asset: &AssetUploaderRequestStatusesQuery, - ) -> anyhow::Result { - use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; - + asset: AssetUploaderRequestStatuses, + ) -> anyhow::Result { // Make a request to the worker to upload the asset + info!(asset_uri = ?asset.asset_uri, "Requesting worker to upload asset"); let res = self .client .post(self.config.asset_uploader_worker_uri.clone()) @@ -101,38 +121,85 @@ impl AssetUploaderThrottlerContext { let body = serde_json::from_str::(&body)?; // Update the request in Postgres with the response - let mut asset: AssetUploaderRequestStatuses = asset.into(); + let mut asset = asset; asset.status_code = status.as_u16() as i64; if status == ReqwestStatusCode::OK { - asset.cdn_image_uri = Some(format!( + let cdn_image_uri = Some(format!( "{}/{}/{}/{}", self.config.cloudflare_image_delivery_prefix, self.config.cloudflare_account_hash, - body.result.id, + body.result.context("Result not found")?.id, self.config.cloudflare_default_variant, )); + + asset.cdn_image_uri = cdn_image_uri.clone(); + + // Update the asset URI in the parsed_asset_uris table + let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri); + parsed_asset_uri.set_cdn_image_uri(cdn_image_uri); + upsert_uris(&mut self.pool.get()?, &parsed_asset_uri, 0)?; } else { asset.num_failures += 1; - asset.error_message = Some(body.errors.join(", ")); + asset.error_messages = Some( + body.errors + .iter() + .map(|err| Some(err.to_string())) + .collect::>(), + ); } - let 
query = diesel::insert_into(asset_uploader_request_statuses) - .values(asset) - .on_conflict((request_id, asset_uri)) - .do_update() - .set(( - status_code.eq(excluded(status_code)), - error_message.eq(excluded(error_message)), - cdn_image_uri.eq(excluded(cdn_image_uri)), - num_failures.eq(excluded(num_failures)), - inserted_at.eq(excluded(inserted_at)), + self.update_request_status(&asset)?; + Ok(asset) + } + + async fn get_from_cloudflare( + &self, + asset: AssetUploaderRequestStatuses, + ) -> anyhow::Result { + // Make a request to the worker to look up the asset + info!(asset_uri = ?asset.asset_uri, "Requesting worker to look up asset"); + let mut asset_uploader_worker_uri = Url::parse(&self.config.asset_uploader_worker_uri)?; + asset_uploader_worker_uri.set_path("get_existing"); + let res = self + .client + .get(asset_uploader_worker_uri) + .query(&AHashMap::from_iter(vec![( + "url".to_string(), + asset.asset_uri.clone(), + )])) + .send() + .await + .context("Error sending upload request to worker")?; + + let status = res.status(); + let body = res.text().await?; + let body = serde_json::from_str::(&body)?; + + // Update the request in Postgres with the response + let mut asset = asset; + asset.status_code = status.as_u16() as i64; + if status == ReqwestStatusCode::OK { + let cdn_image_uri = Some(format!( + "{}/{}/{}/{}", + self.config.cloudflare_image_delivery_prefix, + self.config.cloudflare_account_hash, + body.id, + self.config.cloudflare_default_variant, )); - let debug_query = diesel::debug_query::(&query).to_string(); - debug!("Executing Query: {}", debug_query); - query.execute(&mut self.pool.get()?)?; + asset.cdn_image_uri = cdn_image_uri.clone(); - Ok(status) + // Update the asset URI in the parsed_asset_uris table + let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri); + parsed_asset_uri.set_cdn_image_uri(cdn_image_uri); + upsert_uris(&mut self.pool.get()?, &parsed_asset_uri, 0)?; + } else { + asset.num_failures += 1; + asset.error_messages = Some(vec![Some("Asset not found in Cloudflare".to_string())]); + } + + self.update_request_status(&asset)?; + Ok(asset) } async fn handle_upload_assets(&self) { @@ -145,49 +212,93 @@ impl AssetUploaderThrottlerContext { } // Wait until notified if queue is empty - let is_empty = self.asset_queue.lock().is_empty(); + let is_empty = self.upload_queue.lock().await.asset_queue.is_empty(); if is_empty { self.inserted_notify.notified().await; } // Pop the first asset from the queue and add it to the in-progress set - let mut queue = self.asset_queue.lock(); - let asset = queue.pop_first().unwrap(); // Safe to unwrap because we checked if the queue is empty - let mut in_progress = self.in_progress_assets.lock(); - in_progress.insert(asset.clone()); - drop(in_progress); - drop(queue); + let mut upload_queue = self.upload_queue.lock().await; + let asset = upload_queue.asset_queue.pop_first().unwrap(); // Safe to unwrap because we checked if the queue is empty + upload_queue.in_progress_assets.insert(asset.clone()); + drop(upload_queue); // Upload the asset in a separate task // If successful, remove the asset from the in-progress set and continue to next asset - // If unsuccessful, add the asset back to the queue + // If rate limited, sleep for 5 minutes then notify + // If unsuccessful due to conflict, attempt to look up the asset in Cloudflare + // If unsuccessful for other reason, add the asset back to the queue let self_clone = self_arc.clone(); tokio::spawn(async move { - if let Ok(res) = self_clone.upload_asset(&asset).await 
{ - if res.is_success() { - let mut in_progress = self_clone.in_progress_assets.lock(); - in_progress.remove(&asset); - } else { - // If rate limited, sleep for 5 minutes then notify - if res == ReqwestStatusCode::TOO_MANY_REQUESTS { - self_clone.is_rate_limited.store(true, Ordering::Relaxed); - tokio::time::sleep(FIVE_MINUTES).await; - self_clone.rate_limit_over_notify.notify_one(); - } + // Handle upload depending on previous attempt status. + // If previous attempt resulted in a 409, the asset likely already exists, so we call a different endpoint on the worker to perform the lookup. + let upload_res = match ReqwestStatusCode::from_u16(asset.status_code as u16)? { + ReqwestStatusCode::CONFLICT => { + self_clone.get_from_cloudflare(asset.clone()).await + }, + _ => self_clone.upload_asset(asset.clone()).await, + }; + + let mut upload_queue = self_clone.upload_queue.lock().await; + match upload_res { + Ok(asset) => { + let mut asset = asset; + match ReqwestStatusCode::from_u16(asset.status_code as u16)? { + ReqwestStatusCode::OK => { + // If success, remove asset from in-progress set and end early + upload_queue.in_progress_assets.remove(&asset); + anyhow::Ok(()) + }, + ReqwestStatusCode::TOO_MANY_REQUESTS => { + // If rate limited, sleep for 5 minutes then notify + self_clone.is_rate_limited.store(true, Ordering::Relaxed); + tokio::time::sleep(FIVE_MINUTES).await; + self_clone.rate_limit_over_notify.notify_one(); + Ok(()) + }, + ReqwestStatusCode::CONFLICT => { + // If conflict, attempt to get cdn_image_uri from parsed_asset_uris table + if let Some(parsed_asset_uri) = + ParsedAssetUrisQuery::get_by_asset_uri( + &mut self_clone.pool.get()?, + &asset.asset_uri, + ) + { + // If cdn_image_uri found, update asset and request status + if let Some(cdn_image_uri) = parsed_asset_uri.cdn_image_uri { + asset.cdn_image_uri = Some(cdn_image_uri); + self_clone.update_request_status(&asset)?; + return Ok(()); + } + } - let mut queue = self_clone.asset_queue.lock(); - queue.insert(asset); - }; - } else { - error!(asset_uri = ?asset.asset_uri, "[Asset Uploader Throttler] Error uploading asset"); - let mut queue = self_clone.asset_queue.lock(); - queue.insert(asset); + // If cdn_image_uri still not found and num_failures < 3, add asset back to queue. + if asset.cdn_image_uri.is_none() && asset.num_failures < 3 { + self_clone.update_request_status(&asset)?; + upload_queue.asset_queue.insert(asset); + self_clone.inserted_notify.notify_one(); + return Ok(()); + } + + // Remove asset from in-progress set and end early. + // No point in retrying more than 3 times because the asset already exists and could not be found in Postgres or Cloudflare.
+ upload_queue.in_progress_assets.remove(&asset); + Ok(()) + }, + _ => Ok(()), + } + }, + Err(e) => { + error!(error = ?e, asset_uri = asset.asset_uri, "[Asset Uploader Throttler] Error uploading asset"); + upload_queue.asset_queue.insert(asset); + Ok(()) + }, } }); } } - async fn update_queue(&self) -> anyhow::Result<()> { + async fn update_queue(&self) -> anyhow::Result { use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; let query = asset_uploader_request_statuses @@ -197,26 +308,35 @@ impl AssetUploaderThrottlerContext { let debug_query = diesel::debug_query::(&query).to_string(); debug!("Executing Query: {}", debug_query); - let rows = query.load(&mut self.pool.get()?)?; + let rows: Vec = query.load(&mut self.pool.get()?)?; - let mut queue = self.asset_queue.lock(); - let in_progress = self.in_progress_assets.lock(); + let mut num_queued = 0; for row in rows { - if !queue.contains(&row) && !in_progress.contains(&row) { - queue.insert(row); + let row: AssetUploaderRequestStatuses = (&row).into(); + let upload_queue = &mut self.upload_queue.lock().await; + if !upload_queue.in_progress_assets.contains(&row) { + upload_queue.asset_queue.insert(row); + num_queued += 1; } } - Ok(()) + Ok(num_queued) } async fn start_update_loop(&self) { let poll_interval_seconds = Duration::from_secs(self.config.poll_interval_seconds); loop { - if let Err(e) = self.update_queue().await { - error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + match self.update_queue().await { + Ok(num_queued) => { + if num_queued > 0 { + self.inserted_notify.notify_one(); + } + }, + Err(e) => { + error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + }, } - self.inserted_notify.notify_one(); + tokio::time::sleep(poll_interval_seconds).await; } } @@ -230,6 +350,27 @@ impl AssetUploaderThrottlerContext { }, } } + + fn update_request_status(&self, asset: &AssetUploaderRequestStatuses) -> anyhow::Result<()> { + use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; + + let query = diesel::insert_into(asset_uploader_request_statuses) + .values(asset) + .on_conflict((request_id, asset_uri)) + .do_update() + .set(( + status_code.eq(excluded(status_code)), + error_messages.eq(excluded(error_messages)), + cdn_image_uri.eq(excluded(cdn_image_uri)), + num_failures.eq(excluded(num_failures)), + inserted_at.eq(excluded(inserted_at)), + )); + + let debug_query = diesel::debug_query::(&query).to_string(); + debug!("Executing Query: {}", debug_query); + query.execute(&mut self.pool.get()?)?; + Ok(()) + } } impl Server for AssetUploaderThrottlerContext { diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs index 4414bfe337e36..f859f23e84473 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs @@ -5,10 +5,17 @@ use crate::{ asset_uploader::worker::config::AssetUploaderWorkerConfig, config::Server, utils::constants::MAX_ASSET_UPLOAD_RETRY_SECONDS, }; +use ahash::AHashMap; use anyhow::Context; use axum::{ - body::Body, http::StatusCode, response::IntoResponse, routing::post, Extension, Json, Router, + body::Body, + extract::Query, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Extension, Json, Router, }; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use reqwest::{multipart::Form, Client}; use serde::{Deserialize, Serialize}; use 
std::{sync::Arc, time::Duration}; @@ -17,6 +24,30 @@ use url::Url; pub mod config; +const MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING: &str = "10000"; + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponseResultImage { + filename: String, + id: String, + meta: Option<AHashMap<String, String>>, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponseResult { + images: Vec<CloudflareImageListResponseResultImage>, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponse { + result: Option<CloudflareImageListResponseResult>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct GetExistingResponse { + pub id: String, +} + #[derive(Clone)] pub struct AssetUploaderWorkerContext { config: Arc<AssetUploaderWorkerConfig>, } @@ -42,7 +73,12 @@ impl AssetUploaderWorkerContext { .build() .context("Error building reqwest client")?; let form = Form::new() - .text("id", hashed_url.clone()) // Replace with actual metadata + .text("id", hashed_url.clone()) + .text( + // Save the asset_uri in the upload metadata to enable retrieval by asset_uri later + "metadata", + format!("{{\"asset_uri\": \"{}\"}}", url.to_string()), + ) + .text("url", url.to_string()); info!( @@ -87,12 +123,108 @@ impl AssetUploaderWorkerContext { }, } } + + /// Looks up an existing asset in Cloudflare by its asset URI and returns the Cloudflare image ID, if found + async fn get_by_asset_uri(&self, url: &Url) -> anyhow::Result<Option<String>> { + let mut page = 1; + let hashed_url = sha256::digest(url.to_string()); + let client = Client::builder() + .timeout(Duration::from_secs(MAX_ASSET_UPLOAD_RETRY_SECONDS)) + .build() + .context("Error building reqwest client")?; + let mut params = AHashMap::new(); + params.insert( + "per_page", + MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING.to_string(), + ); + + loop { + info!( + asset_uri = ?url, + "[Asset Uploader] Finding asset from Cloudflare" + ); + + params.insert("page", page.to_string()); + let res = client + .get(format!( + "https://api.cloudflare.com/client/v4/accounts/{}/images/v1", + self.config.cloudflare_account_id + )) + .header( + "Authorization", + format!("Bearer {}", self.config.cloudflare_auth_key), + ) + .query(&params) + .send() + .await + .context("Error sending request to Cloudflare")?; + + let body = res.text().await.context("Error reading response body")?; + let body = serde_json::from_str::<CloudflareImageListResponse>(&body) + .context("Error parsing response body")?; + let images = body + .result + .context("Error getting result from response body")? + .images; + + let res = images.par_iter().find_any(|image| { + // Metadata not guaranteed to exist + let meta_url = if let Some(meta) = &image.meta { + meta.get("asset_uri") + } else { + None + }; + + image.filename == hashed_url || meta_url == Some(&url.to_string()) + }); + + if let Some(image) = res { + return Ok(Some(image.id.clone())); + } + + if images.len() + < MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING + .parse::<usize>() + .context("Error parsing MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING")?
+ { + return Ok(None); + } + + page += 1; + } + } + + async fn handle_get_by_asset_uri( + Extension(context): Extension>, + Query(request): Query, + ) -> impl IntoResponse { + info!(asset_uri = ?request.url, "[Asset Uploader] Retrieving asset by asset_uri"); + match context.get_by_asset_uri(&request.url).await { + Ok(Some(id)) => { + info!(asset_uri = ?request.url, id = ?id, "[Asset Uploader] Asset found by asset_uri"); + (StatusCode::OK, Json(GetExistingResponse { id })).into_response() + }, + Ok(None) => { + info!(asset_uri = ?request.url, "[Asset Uploader] Asset not found by asset_uri"); + (StatusCode::NOT_FOUND, "Asset not found by asset_uri").into_response() + }, + Err(e) => { + error!(asset_uri = ?request.url, error = ?e, "[Asset Uploader] Error retrieving asset by asset_uri"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error retrieving asset by asset_uri {}: {}", request.url, e), + ) + .into_response() + }, + } + } } impl Server for AssetUploaderWorkerContext { fn build_router(&self) -> Router { Router::new() .route("/", post(Self::handle_upload)) + .route("/get_existing", get(Self::handle_get_by_asset_uri)) .layer(Extension(Arc::new(self.clone()))) } } diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs index 23bcef6d5d35e..3433541e0f978 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs @@ -9,9 +9,22 @@ use axum::http::StatusCode; use diesel::prelude::*; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use std::hash::Hash; use uuid::Uuid; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive( + Clone, + Debug, + Deserialize, + FieldCount, + Identifiable, + Insertable, + Serialize, + PartialEq, + Eq, + PartialOrd, + Ord, // TODO: Custom Ord implementation for fairness +)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatuses { @@ -19,11 +32,18 @@ pub struct AssetUploaderRequestStatuses { pub asset_uri: String, pub application_id: Uuid, pub status_code: i64, - pub error_message: Option, + pub error_messages: Option>>, pub cdn_image_uri: Option, pub num_failures: i64, } +impl Hash for AssetUploaderRequestStatuses { + fn hash(&self, state: &mut H) { + self.request_id.hash(state); + self.asset_uri.hash(state); + } +} + impl AssetUploaderRequestStatuses { pub fn new(request_id: Uuid, asset_uri: &str, application_id: Uuid) -> Self { Self { @@ -31,7 +51,7 @@ impl AssetUploaderRequestStatuses { asset_uri: asset_uri.to_string(), application_id, status_code: StatusCode::ACCEPTED.as_u16() as i64, - error_message: None, + error_messages: None, cdn_image_uri: None, num_failures: 0, } @@ -48,7 +68,7 @@ impl AssetUploaderRequestStatuses { asset_uri: asset_uri.to_string(), application_id, status_code: StatusCode::OK.as_u16() as i64, - error_message: None, + error_messages: None, cdn_image_uri: Some(cdn_image_uri.to_string()), num_failures: 0, } @@ -62,7 +82,7 @@ impl From<&AssetUploaderRequestStatusesQuery> for AssetUploaderRequestStatuses { asset_uri: query.asset_uri.clone(), application_id: query.application_id, status_code: query.status_code, - error_message: query.error_message.clone(), + error_messages: query.error_messages.clone(), cdn_image_uri: query.cdn_image_uri.clone(), num_failures: 
query.num_failures, } diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs index bf1959bd5e141..13dc6184a7263 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs @@ -7,20 +7,7 @@ use field_count::FieldCount; use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive( - Clone, - Debug, - Deserialize, - FieldCount, - Identifiable, - Queryable, - Serialize, - Hash, - PartialEq, - Eq, - PartialOrd, - Ord, // TODO: Custom Ord implementation for fairness -)] +#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Queryable, Serialize)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatusesQuery { @@ -28,7 +15,7 @@ pub struct AssetUploaderRequestStatusesQuery { pub asset_uri: String, pub application_id: Uuid, pub status_code: i64, - pub error_message: Option, + pub error_messages: Option>>, pub cdn_image_uri: Option, pub num_failures: i64, pub request_received_at: chrono::NaiveDateTime, diff --git a/ecosystem/nft-metadata-crawler/src/schema.rs b/ecosystem/nft-metadata-crawler/src/schema.rs index 86d6510dfa1b8..b0fb2a7e00a16 100644 --- a/ecosystem/nft-metadata-crawler/src/schema.rs +++ b/ecosystem/nft-metadata-crawler/src/schema.rs @@ -1,6 +1,3 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - // @generated automatically by Diesel CLI. pub mod nft_metadata_crawler { @@ -10,7 +7,7 @@ pub mod nft_metadata_crawler { asset_uri -> Varchar, application_id -> Uuid, status_code -> Int8, - error_message -> Nullable, + error_messages -> Nullable>>, cdn_image_uri -> Nullable, num_failures -> Int8, request_received_at -> Timestamp, From 06a4c03bd488825e8ee0460f9bcc9f2932ee0f97 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Fri, 18 Oct 2024 02:43:52 -0700 Subject: [PATCH 5/8] boom --- ecosystem/nft-metadata-crawler/src/schema.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ecosystem/nft-metadata-crawler/src/schema.rs b/ecosystem/nft-metadata-crawler/src/schema.rs index b0fb2a7e00a16..d2d532ba23887 100644 --- a/ecosystem/nft-metadata-crawler/src/schema.rs +++ b/ecosystem/nft-metadata-crawler/src/schema.rs @@ -1,3 +1,6 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + // @generated automatically by Diesel CLI. 
pub mod nft_metadata_crawler { From c27a61030c565b7dcf05cd8111e8968a62348199 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Fri, 18 Oct 2024 02:48:05 -0700 Subject: [PATCH 6/8] boom --- .../src/asset_uploader/throttler/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index 1d576e40cc8cc..65326cce713bc 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -156,8 +156,8 @@ impl AssetUploaderThrottlerContext { &self, asset: AssetUploaderRequestStatuses, ) -> anyhow::Result { - // Make a request to the worker to look up the asset - info!(asset_uri = ?asset.asset_uri, "Requesting worker to look up asset"); + // Make a request to the worker to lookup the asset + info!(asset_uri = ?asset.asset_uri, "Requesting worker to lookup asset"); let mut asset_uploader_worker_uri = Url::parse(&self.config.asset_uploader_worker_uri)?; asset_uploader_worker_uri.set_path("get_existing"); let res = self @@ -226,7 +226,7 @@ impl AssetUploaderThrottlerContext { // Upload the asset in a separate task // If successful, remove the asset from the in-progress set and continue to next asset // If rate limited, sleep for 5 minutes then notify - // If unsuccessful due to conflict, attempt to look up the asset in Cloudflare + // If unsuccessful due to conflict, attempt to lookup the asset in Cloudflare // If unsuccessful for other reason, add the asset back to the queue let self_clone = self_arc.clone(); tokio::spawn(async move { From 46d4d4d56c2b6a87a0fde6b1a98d5912c89bb944 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Mon, 21 Oct 2024 18:01:34 -0700 Subject: [PATCH 7/8] lint --- .../nft-metadata-crawler/src/asset_uploader/throttler/mod.rs | 2 +- ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index 65326cce713bc..47429003282cc 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -187,7 +187,7 @@ impl AssetUploaderThrottlerContext { self.config.cloudflare_default_variant, )); - asset.cdn_image_uri = cdn_image_uri.clone(); + asset.cdn_image_uri.clone_from(&cdn_image_uri); // Update the asset URI in the parsed_asset_uris table let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri); diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs index f859f23e84473..708da221fcda0 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs @@ -77,7 +77,7 @@ impl AssetUploaderWorkerContext { .text( // Save the asset_uri in the upload metadata to enable retrieval by asset_uri later "metadata", - format!("{{\"asset_uri\": \"{}\"}}", url.to_string()), + format!("{{\"asset_uri\": \"{}\"}}", url), ) .text("url", url.to_string()); From d0cdb9ae88c7b61542d76ba7f72ad272c00a3877 Mon Sep 17 00:00:00 2001 From: Justin Chang Date: Mon, 21 Oct 2024 18:05:07 -0700 Subject: [PATCH 8/8] lint --- .../nft-metadata-crawler/src/asset_uploader/throttler/mod.rs | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs index 47429003282cc..77267226cb1e8 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -132,7 +132,7 @@ impl AssetUploaderThrottlerContext { self.config.cloudflare_default_variant, )); - asset.cdn_image_uri = cdn_image_uri.clone(); + asset.cdn_image_uri.clone_from(&cdn_image_uri); // Update the asset URI in the parsed_asset_uris table let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri);
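
The series above adds two small HTTP surfaces: the worker gains GET /get_existing (look up a previously uploaded asset by its asset_uri and return the Cloudflare image ID) and the throttler exposes POST /update_queue (re-poll Postgres for pending upload requests immediately instead of waiting for poll_interval_seconds). The sketch below shows how a client might call them; it is not part of the patches, and the listen addresses, ports, and example asset URL are hypothetical.

use serde::Deserialize;

// Mirrors the worker's GetExistingResponse from asset_uploader/worker/mod.rs.
#[derive(Debug, Deserialize)]
struct GetExistingResponse {
    id: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = reqwest::Client::new();

    // Ask the worker whether this asset_uri was already uploaded to Cloudflare.
    // The worker pages through the Cloudflare Images list API and matches on the
    // hashed filename or the asset_uri stored in the upload metadata.
    let res = client
        .get("http://localhost:4000/get_existing") // hypothetical worker address
        .query(&[("url", "https://example.com/image.png")]) // hypothetical asset
        .send()
        .await?;
    if res.status().is_success() {
        let existing: GetExistingResponse = res.json().await?;
        println!("already uploaded, Cloudflare image id = {}", existing.id);
    } else {
        println!("no existing upload found (HTTP {})", res.status());
    }

    // Tell the throttler to re-poll Postgres for pending requests right away.
    client
        .post("http://localhost:4001/update_queue") // hypothetical throttler address
        .send()
        .await?;

    Ok(())
}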