diff --git a/Cargo.lock b/Cargo.lock index e7cd870fd0a89..03f1223887714 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3103,6 +3103,8 @@ dependencies = [ "google-cloud-storage", "image", "once_cell", + "parking_lot 0.12.1", + "rayon", "regex", "reqwest 0.11.23", "serde", diff --git a/ecosystem/nft-metadata-crawler/Cargo.toml b/ecosystem/nft-metadata-crawler/Cargo.toml index 35f20f5f83c6f..79bc223999905 100644 --- a/ecosystem/nft-metadata-crawler/Cargo.toml +++ b/ecosystem/nft-metadata-crawler/Cargo.toml @@ -37,6 +37,8 @@ futures = { workspace = true } google-cloud-storage = { workspace = true } image = { workspace = true } once_cell = { workspace = true } +parking_lot = { workspace = true } +rayon = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } diff --git a/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql b/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql index 384f15fc6c7c4..6c014e2bab40d 100644 --- a/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql +++ b/ecosystem/nft-metadata-crawler/migrations/2024-10-02-062003_add_asset_uploader_requests_table/up.sql @@ -3,7 +3,7 @@ CREATE TABLE nft_metadata_crawler.asset_uploader_request_statuses ( asset_uri VARCHAR NOT NULL, application_id UUID NOT NULL, status_code BIGINT NOT NULL DEFAULT 202, - error_message VARCHAR, + error_messages TEXT[], cdn_image_uri VARCHAR, num_failures BIGINT NOT NULL DEFAULT 0, request_received_at TIMESTAMP NOT NULL DEFAULT NOW(), diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs index badb30c22d66c..608cf6d7a63d6 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/get_status.rs @@ -32,7 +32,7 @@ 
pub fn get_status( } else { status_response.insert(row.asset_uri, GetStatusResponseSuccess::Error { status_code: row.status_code as u16, - error_message: row.error_message, + error_message: row.error_messages, }); }; } diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs index df6b16d5808b7..3b817778dd3c6 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs @@ -50,7 +50,7 @@ pub enum GetStatusResponseSuccess { }, Error { status_code: u16, - error_message: Option, + error_message: Option>>, }, } diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs index e528f38d1ad41..182efb465f8b4 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs @@ -2,4 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 pub mod api; +pub mod throttler; pub mod worker; diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs new file mode 100644 index 0000000000000..3481bf4ebf32d --- /dev/null +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct AssetUploaderThrottlerConfig { + /// URI for the Asset Uploader Worker + pub asset_uploader_worker_uri: String, + /// Interval in seconds to poll Postgres to update upload queue + #[serde(default = "AssetUploaderThrottlerConfig::default_poll_interval_seconds")] + pub poll_interval_seconds: u64, + /// Maximum number of rows to poll from Postgres + #[serde(default = 
"AssetUploaderThrottlerConfig::default_poll_rows_limit")] + pub poll_rows_limit: u64, + /// Cloudflare Account Hash provided at the images home page used for generating the CDN image URLs + pub cloudflare_account_hash: String, + /// Cloudflare Image Delivery URL prefix provided at the images home page used for generating the CDN image URLs + pub cloudflare_image_delivery_prefix: String, + /// In addition to on the fly transformations, Cloudflare images can be returned in preset variants. This is the default variant used with the saved CDN image URLs. + pub cloudflare_default_variant: String, +} + +impl AssetUploaderThrottlerConfig { + pub const fn default_poll_interval_seconds() -> u64 { + 10 + } + + pub const fn default_poll_rows_limit() -> u64 { + 600 + } +} diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs new file mode 100644 index 0000000000000..77267226cb1e8 --- /dev/null +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs @@ -0,0 +1,394 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + asset_uploader::worker::{GetExistingResponse, UploadRequest}, + config::Server, + models::{ + asset_uploader_request_statuses::AssetUploaderRequestStatuses, + asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, + parsed_asset_uris::ParsedAssetUris, parsed_asset_uris_query::ParsedAssetUrisQuery, + }, + schema::{self}, + utils::database::upsert_uris, +}; +use ahash::{AHashMap, AHashSet}; +use anyhow::Context; +use axum::{http::StatusCode as AxumStatusCode, response::IntoResponse, routing::post, Extension}; +use config::AssetUploaderThrottlerConfig; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + upsert::excluded, + ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, +}; +use reqwest::{Client, StatusCode as ReqwestStatusCode}; +use serde::Deserialize; +use std::{ + collections::BTreeSet, + 
fmt::Display, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::sync::{Mutex, Notify}; +use tracing::{debug, error, info}; +use url::Url; + +pub mod config; + +const FIVE_MINUTES: Duration = Duration::from_secs(60 * 5); + +// Structs below are for accessing relevant data in a typed way for Cloudflare API calls +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponseResult { + id: String, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponseError { + code: i64, + message: String, +} + +impl Display for CloudflareImageUploadResponseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: {}", self.code, self.message) + } +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageUploadResponse { + errors: Vec, + result: Option, +} + +#[derive(Clone)] +pub struct UploadQueue { + asset_queue: BTreeSet, + in_progress_assets: AHashSet, +} + +#[derive(Clone)] +pub struct AssetUploaderThrottlerContext { + config: AssetUploaderThrottlerConfig, + pool: Pool>, + upload_queue: Arc>, + inserted_notify: Arc, + is_rate_limited: Arc, + rate_limit_over_notify: Arc, + client: Arc, +} + +impl AssetUploaderThrottlerContext { + pub fn new( + config: AssetUploaderThrottlerConfig, + pool: Pool>, + ) -> Self { + Self { + config, + pool, + upload_queue: Arc::new(Mutex::new(UploadQueue { + asset_queue: BTreeSet::new(), + in_progress_assets: AHashSet::new(), + })), + inserted_notify: Arc::new(Notify::new()), + is_rate_limited: Arc::new(AtomicBool::new(false)), + rate_limit_over_notify: Arc::new(Notify::new()), + client: Arc::new(Client::new()), + } + } + + async fn upload_asset( + &self, + asset: AssetUploaderRequestStatuses, + ) -> anyhow::Result { + // Make a request to the worker to upload the asset + info!(asset_uri = ?asset.asset_uri, "Requesting worker to upload asset"); + let res = self + .client + .post(self.config.asset_uploader_worker_uri.clone()) + .json(&UploadRequest { 
+ url: Url::parse(&asset.asset_uri)?, + }) + .send() + .await + .context("Error sending upload request to worker")?; + + let status = res.status(); + let body = res.text().await?; + let body = serde_json::from_str::(&body)?; + + // Update the request in Postgres with the response + let mut asset = asset; + asset.status_code = status.as_u16() as i64; + if status == ReqwestStatusCode::OK { + let cdn_image_uri = Some(format!( + "{}/{}/{}/{}", + self.config.cloudflare_image_delivery_prefix, + self.config.cloudflare_account_hash, + body.result.context("Result not found")?.id, + self.config.cloudflare_default_variant, + )); + + asset.cdn_image_uri.clone_from(&cdn_image_uri); + + // Update the asset URI in the parsed_asset_uris table + let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri); + parsed_asset_uri.set_cdn_image_uri(cdn_image_uri); + upsert_uris(&mut self.pool.get()?, &parsed_asset_uri, 0)?; + } else { + asset.num_failures += 1; + asset.error_messages = Some( + body.errors + .iter() + .map(|err| Some(err.to_string())) + .collect::>(), + ); + } + + self.update_request_status(&asset)?; + Ok(asset) + } + + async fn get_from_cloudflare( + &self, + asset: AssetUploaderRequestStatuses, + ) -> anyhow::Result { + // Make a request to the worker to lookup the asset + info!(asset_uri = ?asset.asset_uri, "Requesting worker to lookup asset"); + let mut asset_uploader_worker_uri = Url::parse(&self.config.asset_uploader_worker_uri)?; + asset_uploader_worker_uri.set_path("get_existing"); + let res = self + .client + .get(asset_uploader_worker_uri) + .query(&AHashMap::from_iter(vec![( + "url".to_string(), + asset.asset_uri.clone(), + )])) + .send() + .await + .context("Error sending upload request to worker")?; + + let status = res.status(); + let body = res.text().await?; + let body = serde_json::from_str::(&body)?; + + // Update the request in Postgres with the response + let mut asset = asset; + asset.status_code = status.as_u16() as i64; + if status == 
ReqwestStatusCode::OK { + let cdn_image_uri = Some(format!( + "{}/{}/{}/{}", + self.config.cloudflare_image_delivery_prefix, + self.config.cloudflare_account_hash, + body.id, + self.config.cloudflare_default_variant, + )); + + asset.cdn_image_uri.clone_from(&cdn_image_uri); + + // Update the asset URI in the parsed_asset_uris table + let mut parsed_asset_uri = ParsedAssetUris::new(&asset.asset_uri); + parsed_asset_uri.set_cdn_image_uri(cdn_image_uri); + upsert_uris(&mut self.pool.get()?, &parsed_asset_uri, 0)?; + } else { + asset.num_failures += 1; + asset.error_messages = Some(vec![Some("Asset not found in Cloudflare".to_string())]); + } + + self.update_request_status(&asset)?; + Ok(asset) + } + + async fn handle_upload_assets(&self) { + let self_arc = Arc::new(self.clone()); + loop { + // Wait until notified if rate limited + if self.is_rate_limited.load(Ordering::Relaxed) { + self.rate_limit_over_notify.notified().await; + self.is_rate_limited.store(false, Ordering::Relaxed); + } + + // Wait until notified if queue is empty + let is_empty = self.upload_queue.lock().await.asset_queue.is_empty(); + if is_empty { + self.inserted_notify.notified().await; + } + + // Pop the first asset from the queue and add it to the in-progress set + let mut upload_queue = self.upload_queue.lock().await; + let asset = upload_queue.asset_queue.pop_first().unwrap(); // Relies on the emptiness check above. NOTE(review): notify_one() is called whenever rows are queued, even when this loop is not waiting; if that leaves a stored permit, notified() can return while the queue is still empty and this unwrap would panic - confirm + upload_queue.in_progress_assets.insert(asset.clone()); + drop(upload_queue); + + // Upload the asset in a separate task + // If successful, remove the asset from the in-progress set and continue to next asset + // If rate limited, sleep for 5 minutes then notify + // If unsuccessful due to conflict, attempt to lookup the asset in Cloudflare + // If unsuccessful for other reason, add the asset back to the queue + let self_clone = self_arc.clone(); + tokio::spawn(async move { + // Handle upload depending on previous attempt status.
+ // If previous attempt resulted in a 409, the asset likely already exists, so we call a different endpoint on the worker to perform the lookup. + let upload_res = match ReqwestStatusCode::from_u16(asset.status_code as u16)? { + ReqwestStatusCode::CONFLICT => { + self_clone.get_from_cloudflare(asset.clone()).await + }, + _ => self_clone.upload_asset(asset.clone()).await, + }; + + let mut upload_queue = self_clone.upload_queue.lock().await; + match upload_res { + Ok(asset) => { + let mut asset = asset; + match ReqwestStatusCode::from_u16(asset.status_code as u16)? { + ReqwestStatusCode::OK => { + // If success, remove asset from in-progress set and end early + upload_queue.in_progress_assets.remove(&asset); + anyhow::Ok(()) + }, + ReqwestStatusCode::TOO_MANY_REQUESTS => { + // If rate limited, sleep for 5 minutes then notify. NOTE(review): the upload_queue guard acquired above is still held during this sleep, and the asset is neither re-queued nor removed from the in-progress set on this path - confirm this is intended + self_clone.is_rate_limited.store(true, Ordering::Relaxed); + tokio::time::sleep(FIVE_MINUTES).await; + self_clone.rate_limit_over_notify.notify_one(); + Ok(()) + }, + ReqwestStatusCode::CONFLICT => { + // If conflict, attempt to get cdn_image_uri from parsed_asset_uris table + if let Some(parsed_asset_uri) = + ParsedAssetUrisQuery::get_by_asset_uri( + &mut self_clone.pool.get()?, + &asset.asset_uri, + ) + { + // If cdn_image_uri found, update asset and request status + if let Some(cdn_image_uri) = parsed_asset_uri.cdn_image_uri { + asset.cdn_image_uri = Some(cdn_image_uri); + self_clone.update_request_status(&asset)?; + return Ok(()); + } + } + + // If cdn_image_uri still not found and num_failures < 3, add asset back to queue. + if asset.cdn_image_uri.is_none() && asset.num_failures < 3 { + self_clone.update_request_status(&asset)?; + upload_queue.asset_queue.insert(asset); + self_clone.inserted_notify.notify_one(); + return Ok(()); + } + + // Remove asset from in-progress set and end early. + // No point in retrying more than 3 times because the asset already exists and could not be found in Postgres or Cloudflare.
+ upload_queue.in_progress_assets.remove(&asset); + Ok(()) + }, + _ => Ok(()), + } + }, + Err(e) => { + error!(error = ?e, asset_uri = asset.asset_uri, "[Asset Uploader Throttler] Error uploading asset"); + upload_queue.asset_queue.insert(asset); + Ok(()) + }, + } + }); + } + } + + async fn update_queue(&self) -> anyhow::Result { + use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; + + let query = asset_uploader_request_statuses + .filter(status_code.ne(ReqwestStatusCode::OK.as_u16() as i64)) + .order_by(inserted_at.asc()) + .limit(self.config.poll_rows_limit as i64); + + let debug_query = diesel::debug_query::(&query).to_string(); + debug!("Executing Query: {}", debug_query); + let rows: Vec = query.load(&mut self.pool.get()?)?; + + let mut num_queued = 0; + for row in rows { + let row: AssetUploaderRequestStatuses = (&row).into(); + let upload_queue = &mut self.upload_queue.lock().await; + if !upload_queue.in_progress_assets.contains(&row) { + upload_queue.asset_queue.insert(row); + num_queued += 1; + } + } + + Ok(num_queued) + } + + async fn start_update_loop(&self) { + let poll_interval_seconds = Duration::from_secs(self.config.poll_interval_seconds); + loop { + match self.update_queue().await { + Ok(num_queued) => { + if num_queued > 0 { + self.inserted_notify.notify_one(); + } + }, + Err(e) => { + error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + }, + } + + tokio::time::sleep(poll_interval_seconds).await; + } + } + + async fn handle_update_queue(Extension(context): Extension>) -> impl IntoResponse { + match context.update_queue().await { + Ok(_) => AxumStatusCode::OK, + Err(e) => { + error!(error = ?e, "[Asset Uploader Throttler] Error updating queue"); + AxumStatusCode::INTERNAL_SERVER_ERROR + }, + } + } + + fn update_request_status(&self, asset: &AssetUploaderRequestStatuses) -> anyhow::Result<()> { + use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*; + + let query = 
diesel::insert_into(asset_uploader_request_statuses) + .values(asset) + .on_conflict((request_id, asset_uri)) + .do_update() + .set(( + status_code.eq(excluded(status_code)), + error_messages.eq(excluded(error_messages)), + cdn_image_uri.eq(excluded(cdn_image_uri)), + num_failures.eq(excluded(num_failures)), + inserted_at.eq(excluded(inserted_at)), + )); + + let debug_query = diesel::debug_query::(&query).to_string(); + debug!("Executing Query: {}", debug_query); + query.execute(&mut self.pool.get()?)?; + Ok(()) + } +} + +impl Server for AssetUploaderThrottlerContext { + fn build_router(&self) -> axum::Router { + let self_arc = Arc::new(self.clone()); + + let self_arc_clone = self_arc.clone(); + tokio::spawn(async move { + self_arc_clone.handle_upload_assets().await; + }); + + let self_arc_clone = self_arc.clone(); + tokio::spawn(async move { + self_arc_clone.start_update_loop().await; + }); + + axum::Router::new() + .route("/update_queue", post(Self::handle_update_queue)) + .layer(Extension(self_arc.clone())) + } +} diff --git a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs index 3e8bb27a089ab..708da221fcda0 100644 --- a/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs +++ b/ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs @@ -5,26 +5,57 @@ use crate::{ asset_uploader::worker::config::AssetUploaderWorkerConfig, config::Server, utils::constants::MAX_ASSET_UPLOAD_RETRY_SECONDS, }; +use ahash::AHashMap; use anyhow::Context; use axum::{ - body::Body, http::StatusCode, response::IntoResponse, routing::post, Extension, Json, Router, + body::Body, + extract::Query, + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Extension, Json, Router, }; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use reqwest::{multipart::Form, Client}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{sync::Arc, 
time::Duration}; use tracing::{error, info}; use url::Url; pub mod config; +const MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING: &str = "10000"; + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponseResultImage { + filename: String, + id: String, + meta: Option>, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponseResult { + images: Vec, +} + +#[derive(Debug, Deserialize)] +struct CloudflareImageListResponse { + result: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct GetExistingResponse { + pub id: String, +} + #[derive(Clone)] pub struct AssetUploaderWorkerContext { config: Arc, } -#[derive(Debug, Deserialize)] -struct UploadRequest { - url: Url, +#[derive(Debug, Deserialize, Serialize)] +pub struct UploadRequest { + pub url: Url, } impl AssetUploaderWorkerContext { @@ -42,7 +73,12 @@ impl AssetUploaderWorkerContext { .build() .context("Error building reqwest client")?; let form = Form::new() - .text("id", hashed_url.clone()) // Replace with actual metadata + .text("id", hashed_url.clone()) + .text( + // Save the asset_uri in the upload metadata to enable retrieval by asset_uri later + "metadata", + format!("{{\"asset_uri\": \"{}\"}}", url), + ) .text("url", url.to_string()); info!( @@ -87,12 +123,108 @@ impl AssetUploaderWorkerContext { }, } } + + /// Looks up an existing asset in Cloudflare by its asset URI, paging through the image list, and returns the image ID if one matches + async fn get_by_asset_uri(&self, url: &Url) -> anyhow::Result> { + let mut page = 1; + let hashed_url = sha256::digest(url.to_string()); + let client = Client::builder() + .timeout(Duration::from_secs(MAX_ASSET_UPLOAD_RETRY_SECONDS)) + .build() + .context("Error building reqwest client")?; + let mut params = AHashMap::new(); + params.insert( + "per_page", + MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING.to_string(), + ); + + loop { + info!( + asset_uri = ?url, + "[Asset Uploader] Finding asset from Cloudflare" + ); + + params.insert("page", page.to_string()); + let res = client + .get(format!( +
"https://api.cloudflare.com/client/v4/accounts/{}/images/v1", + self.config.cloudflare_account_id + )) + .header( + "Authorization", + format!("Bearer {}", self.config.cloudflare_auth_key), + ) + .query(¶ms) + .send() + .await + .context("Error sending request to Cloudflare")?; + + let body = res.text().await.context("Error reading response body")?; + let body = serde_json::from_str::(&body) + .context("Error parsing response body")?; + let images = body + .result + .context("Error getting result from response body")? + .images; + + let res = images.par_iter().find_any(|image| { + // Metadata not guaranteed to exist + let meta_url = if let Some(meta) = &image.meta { + meta.get("asset_uri") + } else { + None + }; + + image.filename == hashed_url || meta_url == Some(&url.to_string()) + }); + + if let Some(image) = res { + return Ok(Some(image.id.clone())); + } + + if images.len() + < MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING + .parse::() + .context("Error parsing MAX_IMAGES_PER_PAGE_CLOUDFLARE_STRING")? 
+ { + return Ok(None); + } + + page += 1; + } + } + + async fn handle_get_by_asset_uri( + Extension(context): Extension>, + Query(request): Query, + ) -> impl IntoResponse { + info!(asset_uri = ?request.url, "[Asset Uploader] Retrieving asset by asset_uri"); + match context.get_by_asset_uri(&request.url).await { + Ok(Some(id)) => { + info!(asset_uri = ?request.url, id = ?id, "[Asset Uploader] Asset found by asset_uri"); + (StatusCode::OK, Json(GetExistingResponse { id })).into_response() + }, + Ok(None) => { + info!(asset_uri = ?request.url, "[Asset Uploader] Asset not found by asset_uri"); + (StatusCode::NOT_FOUND, "Asset not found by asset_uri").into_response() + }, + Err(e) => { + error!(asset_uri = ?request.url, error = ?e, "[Asset Uploader] Error retrieving asset by asset_uri"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error retrieving asset by asset_uri {}: {}", request.url, e), + ) + .into_response() + }, + } + } } impl Server for AssetUploaderWorkerContext { fn build_router(&self) -> Router { Router::new() .route("/", post(Self::handle_upload)) + .route("/get_existing", get(Self::handle_get_by_asset_uri)) .layer(Extension(Arc::new(self.clone()))) } } diff --git a/ecosystem/nft-metadata-crawler/src/config.rs b/ecosystem/nft-metadata-crawler/src/config.rs index 3de5d96d83568..55ff3f38b6734 100644 --- a/ecosystem/nft-metadata-crawler/src/config.rs +++ b/ecosystem/nft-metadata-crawler/src/config.rs @@ -4,6 +4,7 @@ use crate::{ asset_uploader::{ api::AssetUploaderApiContext, + throttler::{config::AssetUploaderThrottlerConfig, AssetUploaderThrottlerContext}, worker::{config::AssetUploaderWorkerConfig, AssetUploaderWorkerContext}, }, parser::{config::ParserConfig, ParserContext}, @@ -32,6 +33,7 @@ pub enum ServerConfig { Parser(ParserConfig), AssetUploaderWorker(AssetUploaderWorkerConfig), AssetUploaderApi, + AssetUploaderThrottler(AssetUploaderThrottlerConfig), } /// Structs to hold config from YAML @@ -49,6 +51,7 @@ pub enum ServerContext { 
Parser(ParserContext), AssetUploaderWorker(AssetUploaderWorkerContext), AssetUploaderApi(AssetUploaderApiContext), + AssetUploaderThrottler(AssetUploaderThrottlerContext), } impl ServerConfig { @@ -68,6 +71,12 @@ impl ServerConfig { ServerConfig::AssetUploaderApi => { ServerContext::AssetUploaderApi(AssetUploaderApiContext::new(pool)) }, + ServerConfig::AssetUploaderThrottler(asset_uploader_throttler_config) => { + ServerContext::AssetUploaderThrottler(AssetUploaderThrottlerContext::new( + asset_uploader_throttler_config.clone(), + pool, + )) + }, } } } @@ -99,6 +108,7 @@ impl RunnableConfig for NFTMetadataCrawlerConfig { ServerConfig::Parser(_) => "parser", ServerConfig::AssetUploaderWorker(_) => "asset_uploader_worker", ServerConfig::AssetUploaderApi => "asset_uploader_api", + ServerConfig::AssetUploaderThrottler(_) => "asset_uploader_throttler", } .to_string() } diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs index fd190ac41f25c..3433541e0f978 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs +++ b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses.rs @@ -1,24 +1,47 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::schema::nft_metadata_crawler::asset_uploader_request_statuses; +use crate::{ + models::asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, + schema::nft_metadata_crawler::asset_uploader_request_statuses, +}; use axum::http::StatusCode; use diesel::prelude::*; use field_count::FieldCount; use serde::{Deserialize, Serialize}; +use std::hash::Hash; use uuid::Uuid; -#[derive(Clone, Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)] +#[derive( + Clone, + Debug, + Deserialize, + FieldCount, + Identifiable, + Insertable, + Serialize, + PartialEq, + Eq, + PartialOrd, + Ord, // TODO: Custom Ord 
implementation for fairness +)] #[diesel(primary_key(request_id, asset_uri))] #[diesel(table_name = asset_uploader_request_statuses)] pub struct AssetUploaderRequestStatuses { - request_id: Uuid, - asset_uri: String, - application_id: Uuid, - status_code: i64, - error_message: Option, - cdn_image_uri: Option, - num_failures: i64, + pub request_id: Uuid, + pub asset_uri: String, + pub application_id: Uuid, + pub status_code: i64, + pub error_messages: Option>>, + pub cdn_image_uri: Option, + pub num_failures: i64, +} + +impl Hash for AssetUploaderRequestStatuses { + fn hash(&self, state: &mut H) { + self.request_id.hash(state); + self.asset_uri.hash(state); + } } impl AssetUploaderRequestStatuses { @@ -28,7 +51,7 @@ impl AssetUploaderRequestStatuses { asset_uri: asset_uri.to_string(), application_id, status_code: StatusCode::ACCEPTED.as_u16() as i64, - error_message: None, + error_messages: None, cdn_image_uri: None, num_failures: 0, } @@ -45,9 +68,23 @@ impl AssetUploaderRequestStatuses { asset_uri: asset_uri.to_string(), application_id, status_code: StatusCode::OK.as_u16() as i64, - error_message: None, + error_messages: None, cdn_image_uri: Some(cdn_image_uri.to_string()), num_failures: 0, } } } + +impl From<&AssetUploaderRequestStatusesQuery> for AssetUploaderRequestStatuses { + fn from(query: &AssetUploaderRequestStatusesQuery) -> Self { + Self { + request_id: query.request_id, + asset_uri: query.asset_uri.clone(), + application_id: query.application_id, + status_code: query.status_code, + error_messages: query.error_messages.clone(), + cdn_image_uri: query.cdn_image_uri.clone(), + num_failures: query.num_failures, + } + } +} diff --git a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs index 0b7ed72e9f3cb..13dc6184a7263 100644 --- a/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs +++ 
b/ecosystem/nft-metadata-crawler/src/models/asset_uploader_request_statuses_query.rs @@ -15,7 +15,7 @@ pub struct AssetUploaderRequestStatusesQuery { pub asset_uri: String, pub application_id: Uuid, pub status_code: i64, - pub error_message: Option, + pub error_messages: Option>>, pub cdn_image_uri: Option, pub num_failures: i64, pub request_received_at: chrono::NaiveDateTime, diff --git a/ecosystem/nft-metadata-crawler/src/schema.rs b/ecosystem/nft-metadata-crawler/src/schema.rs index 86d6510dfa1b8..d2d532ba23887 100644 --- a/ecosystem/nft-metadata-crawler/src/schema.rs +++ b/ecosystem/nft-metadata-crawler/src/schema.rs @@ -10,7 +10,7 @@ pub mod nft_metadata_crawler { asset_uri -> Varchar, application_id -> Uuid, status_code -> Int8, - error_message -> Nullable, + error_messages -> Nullable>>, cdn_image_uri -> Nullable, num_failures -> Int8, request_received_at -> Timestamp,