[NFT Metadata Crawler] Asset Uploader Throttler #14904

Merged
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions ecosystem/nft-metadata-crawler/Cargo.toml
@@ -37,6 +37,7 @@ futures = { workspace = true }
google-cloud-storage = { workspace = true }
image = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
1 change: 1 addition & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/mod.rs
@@ -2,4 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

pub mod api;
pub mod throttler;
pub mod worker;
33 changes: 33 additions & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/config.rs
@@ -0,0 +1,33 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct AssetUploaderThrottlerConfig {
/// URI for the Asset Uploader Worker
pub asset_uploader_worker_uri: String,
/// Interval in seconds to poll Postgres to update upload queue
#[serde(default = "AssetUploaderThrottlerConfig::default_poll_interval_seconds")]
pub poll_interval_seconds: u64,
/// Maximum number of rows to poll from Postgres
#[serde(default = "AssetUploaderThrottlerConfig::default_poll_rows_limit")]
pub poll_rows_limit: u64,
/// Cloudflare account hash, provided on the Images home page, used for generating the CDN image URLs
pub cloudflare_account_hash: String,
/// Cloudflare image delivery URL prefix, provided on the Images home page, used for generating the CDN image URLs
pub cloudflare_image_delivery_prefix: String,
/// In addition to on the fly transformations, Cloudflare images can be returned in preset variants. This is the default variant used with the saved CDN image URLs.
pub cloudflare_default_variant: String,
}

impl AssetUploaderThrottlerConfig {
pub const fn default_poll_interval_seconds() -> u64 {
10
}

pub const fn default_poll_rows_limit() -> u64 {
600
}
}
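
For orientation, here is a hypothetical YAML fragment that could deserialize into this struct. All values are illustrative, the top-level key depends on how the ServerConfig enum is tagged, and the two poll fields can be omitted to fall back to the defaults above:

```yaml
# Hypothetical config fragment; values are illustrative only
asset_uploader_throttler:
  asset_uploader_worker_uri: "http://asset-uploader-worker:8080"
  poll_interval_seconds: 10       # optional, defaults to 10
  poll_rows_limit: 600            # optional, defaults to 600
  cloudflare_account_hash: "<account-hash>"
  cloudflare_image_delivery_prefix: "https://imagedelivery.net"
  cloudflare_default_variant: "public"
```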
253 changes: 253 additions & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/throttler/mod.rs
@@ -0,0 +1,253 @@
// Copyright © Aptos Foundation
Contributor:

I assume we'll only ever have a single throttler instance at a time with the in-memory queue? And say in the future we replace Cloudflare with another provider without rate limit concerns, I guess we could even safely remove the throttler?

Contributor Author:

yes and yes 👍

// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::worker::UploadRequest,
config::Server,
models::{
asset_uploader_request_statuses::AssetUploaderRequestStatuses,
asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery,
},
schema,
};
use ahash::AHashSet;
use anyhow::Context;
use axum::{http::StatusCode as AxumStatusCode, response::IntoResponse, routing::post, Extension};
use config::AssetUploaderThrottlerConfig;
use diesel::{
r2d2::{ConnectionManager, Pool},
upsert::excluded,
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl,
};
use parking_lot::Mutex;
use reqwest::{Client, StatusCode as ReqwestStatusCode};
use serde::Deserialize;
use std::{
collections::BTreeSet,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::Notify;
use tracing::{debug, error};
use url::Url;

pub mod config;

const FIVE_MINUTES: Duration = Duration::from_secs(60 * 5);

// Structs below are for accessing relevant data in a typed way for Cloudflare API calls
#[derive(Debug, Deserialize)]
struct CloudflareImageUploadResponseResult {
id: String,
}

#[derive(Debug, Deserialize)]
struct CloudflareImageUploadResponse {
errors: Vec<String>,
result: CloudflareImageUploadResponseResult,
}

#[derive(Clone)]
pub struct AssetUploaderThrottlerContext {
config: AssetUploaderThrottlerConfig,
pool: Pool<ConnectionManager<PgConnection>>,
asset_queue: Arc<Mutex<BTreeSet<AssetUploaderRequestStatusesQuery>>>,
in_progress_assets: Arc<Mutex<AHashSet<AssetUploaderRequestStatusesQuery>>>,
Contributor:

Should we use a single lock for both queues to avoid possible race conditions between queue updates and uploads? An asset could be added to the queue by update_queue just as it's being removed and processed by handle_upload_assets. ChatGPT also suggested https://docs.rs/crossbeam-queue/latest/crossbeam_queue/.

Contributor Author:

oki doki
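
A minimal sketch of the single-lock shape being suggested, assuming both collections move behind one Mutex so popping an asset and marking it in-progress can never interleave with an enqueue (all names here are hypothetical, not from this PR):

```rust
use ahash::AHashSet;
use parking_lot::Mutex;
use std::{collections::BTreeSet, hash::Hash, sync::Arc};

// Hypothetical shape: both collections live behind one lock, so checking the
// queue and the in-progress set is a single critical section.
struct QueueState<T: Ord + Hash + Clone> {
    asset_queue: BTreeSet<T>,
    in_progress: AHashSet<T>,
}

impl<T: Ord + Hash + Clone> QueueState<T> {
    // Pop the next queued asset and mark it in-progress atomically.
    fn pop_for_upload(&mut self) -> Option<T> {
        let asset = self.asset_queue.pop_first()?;
        self.in_progress.insert(asset.clone());
        Some(asset)
    }

    // Only enqueue rows that aren't already being uploaded; BTreeSet::insert
    // already dedupes rows that are still queued.
    fn enqueue_if_new(&mut self, asset: T) {
        if !self.in_progress.contains(&asset) {
            self.asset_queue.insert(asset);
        }
    }
}

// One shared handle instead of two independent Arc<Mutex<_>>s.
type SharedQueueState<T> = Arc<Mutex<QueueState<T>>>;
```

The BTreeSet still gives ordered dequeueing; the only change is that the two sets can no longer be observed in an inconsistent state.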

inserted_notify: Arc<Notify>,
is_rate_limited: Arc<AtomicBool>,
rate_limit_over_notify: Arc<Notify>,
client: Arc<Client>,
}

impl AssetUploaderThrottlerContext {
pub fn new(
config: AssetUploaderThrottlerConfig,
pool: Pool<ConnectionManager<PgConnection>>,
) -> Self {
Self {
config,
pool,
asset_queue: Arc::new(Mutex::new(BTreeSet::new())),
in_progress_assets: Arc::new(Mutex::new(AHashSet::new())),
inserted_notify: Arc::new(Notify::new()),
is_rate_limited: Arc::new(AtomicBool::new(false)),
rate_limit_over_notify: Arc::new(Notify::new()),
client: Arc::new(Client::new()),
}
}

async fn upload_asset(
&self,
asset: &AssetUploaderRequestStatusesQuery,
) -> anyhow::Result<ReqwestStatusCode> {
use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*;

// Make a request to the worker to upload the asset
let res = self
.client
.post(self.config.asset_uploader_worker_uri.clone())
.json(&UploadRequest {
url: Url::parse(&asset.asset_uri)?,
})
.send()
.await
.context("Error sending upload request to worker")?;

let status = res.status();
let body = res.text().await?;
let body = serde_json::from_str::<CloudflareImageUploadResponse>(&body)?;

// Update the request in Postgres with the response
let mut asset: AssetUploaderRequestStatuses = asset.into();
asset.status_code = status.as_u16() as i64;
if status == ReqwestStatusCode::OK {
asset.cdn_image_uri = Some(format!(
"{}/{}/{}/{}",
self.config.cloudflare_image_delivery_prefix,
self.config.cloudflare_account_hash,
body.result.id,
self.config.cloudflare_default_variant,
));
} else {
asset.num_failures += 1;
asset.error_message = Some(body.errors.join(", "));
}

let query = diesel::insert_into(asset_uploader_request_statuses)
.values(asset)
.on_conflict((request_id, asset_uri))
.do_update()
.set((
status_code.eq(excluded(status_code)),
error_message.eq(excluded(error_message)),
cdn_image_uri.eq(excluded(cdn_image_uri)),
num_failures.eq(excluded(num_failures)),
inserted_at.eq(excluded(inserted_at)),
));

let debug_query = diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string();
debug!("Executing Query: {}", debug_query);
query.execute(&mut self.pool.get()?)?;

Ok(status)
}

async fn handle_upload_assets(&self) {
let self_arc = Arc::new(self.clone());
loop {
// Wait until notified if rate limited
if self.is_rate_limited.load(Ordering::Relaxed) {
self.rate_limit_over_notify.notified().await;
self.is_rate_limited.store(false, Ordering::Relaxed);
}

// Wait until notified if queue is empty
let is_empty = self.asset_queue.lock().is_empty();
if is_empty {
self.inserted_notify.notified().await;
}

// Pop the first asset from the queue and add it to the in-progress set
let mut queue = self.asset_queue.lock();
// The update loop may notify without inserting anything, so re-check instead of unwrapping
let Some(asset) = queue.pop_first() else {
continue;
};
let mut in_progress = self.in_progress_assets.lock();
in_progress.insert(asset.clone());
drop(in_progress);
drop(queue);

// Upload the asset in a separate task
// If successful, remove the asset from the in-progress set and continue to next asset
// If unsuccessful, add the asset back to the queue
let self_clone = self_arc.clone();
tokio::spawn(async move {
if let Ok(res) = self_clone.upload_asset(&asset).await {
if res.is_success() {
let mut in_progress = self_clone.in_progress_assets.lock();
in_progress.remove(&asset);
} else {
// If rate limited, sleep for 5 minutes then notify
if res == ReqwestStatusCode::TOO_MANY_REQUESTS {
self_clone.is_rate_limited.store(true, Ordering::Relaxed);
tokio::time::sleep(FIVE_MINUTES).await;
self_clone.rate_limit_over_notify.notify_one();
}

let mut queue = self_clone.asset_queue.lock();
queue.insert(asset);
};
} else {
Contributor:

It's possible that the server restarts after an asset from the in-progress queue was already sent for uploading. You might then run into errors where it's already uploaded, so it fails to upload again, but the DB was never updated because of the restart. We might need to handle that error specifically and write to the DB even if it errors (also make a GET to Cloudflare to fetch the already-uploaded URL?).

Contributor Author:

Added a lot of error handling around "asset already exists" 409 (5409) errors:

  • If the case you described happens, the result is the asset existing in Cloudflare but not in the DB. I introduced another endpoint on the worker that performs a best-effort "lookup" operation to see if the asset_uri exists in Cloudflare. It has to scan the list of ALL images in Cloudflare to check whether the asset we want is already there (there isn't a lookup option 😡). We help the lookup by additionally writing the original asset_uri into the metadata of each uploaded asset. This is an expensive operation, but it should basically never be needed.
  • For other 409s, we first try to grab the data from the parsed_asset_uris table to see if there's already an existing CDN URI there.
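
A hedged sketch of that best-effort lookup, assuming Cloudflare's paginated Images list endpoint and that the original URI was written under a `meta` key named `asset_uri` at upload time; the endpoint shape, field names, and the `account_id`/`api_token` parameters are assumptions, not taken from this PR:

```rust
use anyhow::Result;
use reqwest::Client;
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Deserialize)]
struct ListedImage {
    id: String,
    #[serde(default)]
    meta: HashMap<String, serde_json::Value>,
}

#[derive(Deserialize)]
struct ListResult {
    images: Vec<ListedImage>,
}

#[derive(Deserialize)]
struct ListResponse {
    result: ListResult,
}

// Scan every page of the account's image list for one whose metadata carries
// the original asset_uri. Expensive, but only hit on the 409 recovery path.
async fn lookup_image_id_by_asset_uri(
    client: &Client,
    account_id: &str, // hypothetical parameter
    api_token: &str,  // hypothetical parameter
    asset_uri: &str,
) -> Result<Option<String>> {
    for page in 1u32.. {
        let url = format!(
            "https://api.cloudflare.com/client/v4/accounts/{account_id}/images/v1?page={page}&per_page=100"
        );
        let res: ListResponse = client
            .get(&url)
            .bearer_auth(api_token)
            .send()
            .await?
            .json()
            .await?;

        // An empty page means the list is exhausted without a match
        if res.result.images.is_empty() {
            return Ok(None);
        }
        if let Some(img) = res.result.images.iter().find(|img| {
            img.meta.get("asset_uri").and_then(|v| v.as_str()) == Some(asset_uri)
        }) {
            return Ok(Some(img.id.clone()));
        }
    }
    unreachable!("the pagination loop always returns")
}
```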

error!(asset_uri = ?asset.asset_uri, "[Asset Uploader Throttler] Error uploading asset");
let mut queue = self_clone.asset_queue.lock();
queue.insert(asset);
}
});
}
}

async fn update_queue(&self) -> anyhow::Result<()> {
use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*;

let query = asset_uploader_request_statuses
.filter(status_code.ne(ReqwestStatusCode::OK.as_u16() as i64))
.order_by(inserted_at.asc())
.limit(self.config.poll_rows_limit as i64);

let debug_query = diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string();
debug!("Executing Query: {}", debug_query);
let rows = query.load(&mut self.pool.get()?)?;

let mut queue = self.asset_queue.lock();
let in_progress = self.in_progress_assets.lock();
for row in rows {
if !queue.contains(&row) && !in_progress.contains(&row) {
queue.insert(row);
}
}

Ok(())
}

async fn start_update_loop(&self) {
let poll_interval_seconds = Duration::from_secs(self.config.poll_interval_seconds);
loop {
if let Err(e) = self.update_queue().await {
error!(error = ?e, "[Asset Uploader Throttler] Error updating queue");
}
self.inserted_notify.notify_one();
tokio::time::sleep(poll_interval_seconds).await;
}
}

async fn handle_update_queue(Extension(context): Extension<Arc<Self>>) -> impl IntoResponse {
match context.update_queue().await {
Ok(_) => AxumStatusCode::OK,
Err(e) => {
error!(error = ?e, "[Asset Uploader Throttler] Error updating queue");
AxumStatusCode::INTERNAL_SERVER_ERROR
},
}
}
}

impl Server for AssetUploaderThrottlerContext {
fn build_router(&self) -> axum::Router {
Contributor:

Hmm, I wonder whether we need a fixed number of threads, like 4, to upload assets? Right now we're single-threaded in asset uploading.

Contributor Author:

The uploading is actually all spun off onto separate tasks.

The only sequential part is getting the next asset to upload, which is needed because we want all the uploading to happen in order.

Contributor:

Remind me why we want all the uploading to be in order? For fairness?
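
If a fixed upload parallelism were ever wanted, one way to bound the spawned tasks while keeping the in-order dequeue is a tokio Semaphore; this is a sketch only, and `MAX_CONCURRENT_UPLOADS`, `next_asset`, and `do_upload` are illustrative stand-ins, not names from this PR:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hypothetical cap on concurrent uploads.
const MAX_CONCURRENT_UPLOADS: usize = 4;

async fn upload_loop() {
    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_UPLOADS));
    loop {
        // Acquire a slot *before* popping, so assets still leave the queue in
        // order and at most MAX_CONCURRENT_UPLOADS uploads run at once.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let asset = next_asset().await;
        tokio::spawn(async move {
            do_upload(asset).await;
            drop(permit); // Frees the slot when this upload finishes
        });
    }
}

// Stand-ins for the sequential queue pop and the upload call above.
async fn next_asset() -> String {
    String::new()
}
async fn do_upload(_asset: String) {}
```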

let self_arc = Arc::new(self.clone());

let self_arc_clone = self_arc.clone();
tokio::spawn(async move {
self_arc_clone.handle_upload_assets().await;
});

let self_arc_clone = self_arc.clone();
tokio::spawn(async move {
self_arc_clone.start_update_loop().await;
});

axum::Router::new()
.route("/update_queue", post(Self::handle_update_queue))
.layer(Extension(self_arc.clone()))
}
}
8 changes: 4 additions & 4 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/worker/mod.rs
@@ -10,7 +10,7 @@ use axum::{
body::Body, http::StatusCode, response::IntoResponse, routing::post, Extension, Json, Router,
};
use reqwest::{multipart::Form, Client};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use tracing::{error, info};
use url::Url;
@@ -22,9 +22,9 @@ pub struct AssetUploaderWorkerContext {
config: Arc<AssetUploaderWorkerConfig>,
}

#[derive(Debug, Deserialize)]
struct UploadRequest {
url: Url,
#[derive(Debug, Deserialize, Serialize)]
pub struct UploadRequest {
pub url: Url,
}

impl AssetUploaderWorkerContext {
10 changes: 10 additions & 0 deletions ecosystem/nft-metadata-crawler/src/config.rs
@@ -4,6 +4,7 @@
use crate::{
asset_uploader::{
api::AssetUploaderApiContext,
throttler::{config::AssetUploaderThrottlerConfig, AssetUploaderThrottlerContext},
worker::{config::AssetUploaderWorkerConfig, AssetUploaderWorkerContext},
},
parser::{config::ParserConfig, ParserContext},
@@ -32,6 +33,7 @@ pub enum ServerConfig {
Parser(ParserConfig),
AssetUploaderWorker(AssetUploaderWorkerConfig),
AssetUploaderApi,
AssetUploaderThrottler(AssetUploaderThrottlerConfig),
}

/// Structs to hold config from YAML
@@ -49,6 +51,7 @@ pub enum ServerContext {
Parser(ParserContext),
AssetUploaderWorker(AssetUploaderWorkerContext),
AssetUploaderApi(AssetUploaderApiContext),
AssetUploaderThrottler(AssetUploaderThrottlerContext),
}

impl ServerConfig {
@@ -68,6 +71,12 @@
ServerConfig::AssetUploaderApi => {
ServerContext::AssetUploaderApi(AssetUploaderApiContext::new(pool))
},
ServerConfig::AssetUploaderThrottler(asset_uploader_throttler_config) => {
ServerContext::AssetUploaderThrottler(AssetUploaderThrottlerContext::new(
asset_uploader_throttler_config.clone(),
pool,
))
},
}
}
}
@@ -99,6 +108,7 @@ impl RunnableConfig for NFTMetadataCrawlerConfig {
ServerConfig::Parser(_) => "parser",
ServerConfig::AssetUploaderWorker(_) => "asset_uploader_worker",
ServerConfig::AssetUploaderApi => "asset_uploader_api",
ServerConfig::AssetUploaderThrottler(_) => "asset_uploader_throttler",
}
.to_string()
}