Skip to content

Commit

Permalink
[NFT Metadata Crawler] Asset Uploader API, Worker, Throttler (#14837)
Browse files Browse the repository at this point in the history
* squash

* rename

* address comments

* [NFT Metadata Crawler] Asset Uploader API (#14843)

* s

* fix

* edits

* update model

* upd oop

* [NFT Metadata Crawler] Asset Uploader Throttler (#14904)

* yay

* upsert

* awesome

* AHHHHHHHHHHHHHHHHHHHHHHH

* boom

* boom

* lint

* lint

* lint
  • Loading branch information
just-in-chang authored Oct 22, 2024
1 parent 7097148 commit bb6b5c7
Show file tree
Hide file tree
Showing 18 changed files with 1,160 additions and 206 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ aptos-cargo-cli = { path = "devtools/aptos-cargo-cli" }
# External crate dependencies.
# Please do not add any test features here: they should be declared by the individual crate.
aes-gcm = "0.10.3"
ahash = "0.8.11"
ahash = { version = "0.8.11", features = ["serde"] }
atty = "0.2.14"
nalgebra = "0.32"
float-cmp = "0.9.0"
Expand Down Expand Up @@ -552,7 +552,7 @@ derivation-path = "0.2.0"
derive_builder = "0.20.0"
determinator = "0.12.0"
derive_more = "0.99.11"
diesel = "2.1"
diesel = { version = "2.1", features = ["uuid"] }
# Use the crate version once this feature gets released on crates.io:
# https://github.com/weiznich/diesel_async/commit/e165e8c96a6c540ebde2d6d7c52df5c5620a4bf1
diesel-async = { git = "https://github.com/weiznich/diesel_async.git", rev = "d02798c67065d763154d7272dd0c09b39757d0f2", features = [
Expand Down
3 changes: 3 additions & 0 deletions ecosystem/nft-metadata-crawler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-metrics-core = { workspace = true }
Expand All @@ -36,6 +37,7 @@ futures = { workspace = true }
google-cloud-storage = { workspace = true }
image = { workspace = true }
once_cell = { workspace = true }
rayon = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
Expand All @@ -44,3 +46,4 @@ sha256 = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Revert the asset_uploader_request_statuses migration.
-- Schema-qualify the index: it was created on a table in the
-- nft_metadata_crawler schema, so it lives there too; an unqualified
-- DROP INDEX only finds it if that schema happens to be on search_path.
-- (Dropping the table would drop the index anyway; the explicit drop is
-- kept for symmetry with the up migration.)
DROP INDEX IF EXISTS nft_metadata_crawler.asset_uploader_status_code_inserted_at;
DROP TABLE IF EXISTS nft_metadata_crawler.asset_uploader_request_statuses;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- One row per (request_id, asset_uri) pair: a batch upload request fans
-- out into one row per asset URI.
CREATE TABLE nft_metadata_crawler.asset_uploader_request_statuses (
    request_id UUID NOT NULL,
    asset_uri VARCHAR NOT NULL,
    application_id UUID NOT NULL,
    -- HTTP-style status code; defaults to 202 (Accepted, i.e. pending).
    status_code BIGINT NOT NULL DEFAULT 202,
    error_messages TEXT[],
    -- Populated once the asset has been uploaded and has a CDN location.
    cdn_image_uri VARCHAR,
    num_failures BIGINT NOT NULL DEFAULT 0,
    request_received_at TIMESTAMP NOT NULL DEFAULT NOW(),
    inserted_at TIMESTAMP NOT NULL DEFAULT NOW(),
    PRIMARY KEY (request_id, asset_uri)
);
-- Presumably supports scanning for rows in a given status ordered by
-- insertion time (e.g. by the throttler) — TODO confirm against callers.
CREATE INDEX IF NOT EXISTS asset_uploader_status_code_inserted_at ON nft_metadata_crawler.asset_uploader_request_statuses (status_code, inserted_at);
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::api::GetStatusResponseSuccess,
models::asset_uploader_request_statuses_query::AssetUploaderRequestStatusesQuery, schema,
};
use ahash::AHashMap;
use axum::http::StatusCode;
use diesel::{
r2d2::{ConnectionManager, Pool, PooledConnection},
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl,
};
use tracing::debug;
use uuid::Uuid;

pub fn get_status(
pool: Pool<ConnectionManager<PgConnection>>,
request_id: &str,
) -> anyhow::Result<AHashMap<String, GetStatusResponseSuccess>> {
let mut conn = pool.get()?;
let request_id = Uuid::parse_str(request_id)?;

let mut status_response = AHashMap::new();
let rows = query_status(&mut conn, &request_id)?;
for row in rows {
if row.status_code == StatusCode::OK.as_u16() as i64 {
status_response.insert(row.asset_uri, GetStatusResponseSuccess::Success {
status_code: StatusCode::OK.as_u16(),
cdn_image_uri: row.cdn_image_uri.unwrap_or_default(),
});
} else {
status_response.insert(row.asset_uri, GetStatusResponseSuccess::Error {
status_code: row.status_code as u16,
error_message: row.error_messages,
});
};
}

Ok(status_response)
}

/// Fetches every status row recorded under the given request UUID.
fn query_status(
    conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
    uuid: &Uuid,
) -> anyhow::Result<Vec<AssetUploaderRequestStatusesQuery>> {
    use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*;

    let query = asset_uploader_request_statuses.filter(request_id.eq(uuid));

    debug!(
        "Executing Query: {}",
        diesel::debug_query::<diesel::pg::Pg, _>(&query)
    );
    Ok(query.load(conn)?)
}
128 changes: 128 additions & 0 deletions ecosystem/nft-metadata-crawler/src/asset_uploader/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{asset_uploader::api::get_status::get_status, config::Server};
use ahash::AHashMap;
use axum::{
extract::Path,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Extension, Json,
};
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::error;
use upload_batch::upload_batch;
use url::Url;

mod get_status;
mod upload_batch;

/// Shared state for the asset uploader API: the connection pool handed to
/// every request handler via an axum `Extension` layer.
#[derive(Clone)]
pub struct AssetUploaderApiContext {
    pool: Pool<ConnectionManager<PgConnection>>,
}

/// Request body for `POST /upload`: the calling application's ID plus the
/// list of asset URLs to upload.
#[derive(Debug, Deserialize)]
struct BatchUploadRequest {
    // Parsed as a UUID on the upload path, so callers must send a valid
    // UUID string here.
    application_id: String,
    urls: Vec<Url>,
}

/// Response body for `POST /upload`. `untagged` means the serialized JSON
/// carries only the variant's fields (`request_id` on success, `error` on
/// failure) with no discriminator.
#[derive(Serialize)]
#[serde(untagged)]
enum BatchUploadResponse {
    Success { request_id: String },
    Error { error: String },
}

/// Per-asset outcome reported by `GET /status/:request_id`.
/// NOTE(review): despite the `Success` suffix, this type carries both
/// success and error outcomes for a single asset URI.
#[derive(Serialize)]
#[serde(untagged)]
pub enum GetStatusResponseSuccess {
    Success {
        status_code: u16,
        // CDN location of the uploaded asset.
        cdn_image_uri: String,
    },
    Error {
        status_code: u16,
        // Error messages recorded for this asset, if any.
        error_message: Option<Vec<Option<String>>>,
    },
}

/// Top-level response body for `GET /status/:request_id` — either the
/// per-asset status map for the request, or a single error string.
#[derive(Serialize)]
#[serde(untagged)]
enum GetStatusResponse {
    Success {
        request_id: String,
        // Maps each asset URI in the request to its individual outcome.
        urls: AHashMap<String, GetStatusResponseSuccess>,
    },
    Error {
        error: String,
    },
}

impl AssetUploaderApiContext {
pub fn new(pool: Pool<ConnectionManager<PgConnection>>) -> Self {
Self { pool }
}

async fn handle_upload_batch(
Extension(context): Extension<Arc<AssetUploaderApiContext>>,
Json(request): Json<BatchUploadRequest>,
) -> impl IntoResponse {
match upload_batch(context.pool.clone(), &request) {
Ok(request_id) => (
StatusCode::OK,
Json(BatchUploadResponse::Success { request_id }),
),
Err(e) => {
error!(error = ?e, "Error uploading asset");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(BatchUploadResponse::Error {
error: format!("Error uploading asset: {}", e),
}),
)
},
}
}

async fn handle_get_status(
Extension(context): Extension<Arc<AssetUploaderApiContext>>,
Path(request_id): Path<String>, // Extracts request_id from the URL
) -> impl IntoResponse {
match get_status(context.pool.clone(), &request_id) {
Ok(statuses) => (
StatusCode::OK,
Json(GetStatusResponse::Success {
request_id,
urls: statuses,
}),
),
Err(e) => {
error!(error = ?e, "Error getting status");
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(GetStatusResponse::Error {
error: format!("Error getting status: {}", e),
}),
)
},
}
}
}

impl Server for AssetUploaderApiContext {
    /// Builds the axum router for the asset uploader API, injecting a
    /// shared handle to this context into every handler via an
    /// `Extension` layer.
    fn build_router(&self) -> axum::Router {
        let context = Arc::new(self.clone());
        axum::Router::new()
            .route("/upload", post(Self::handle_upload_batch))
            .route("/status/:request_id", get(Self::handle_get_status))
            // The Arc is not used again, so move it into the layer
            // directly instead of taking a redundant clone.
            .layer(Extension(context))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
asset_uploader::api::BatchUploadRequest,
models::asset_uploader_request_statuses::AssetUploaderRequestStatuses, schema,
};
use ahash::AHashMap;
use anyhow::Context;
use diesel::{
r2d2::{ConnectionManager, Pool, PooledConnection},
BoolExpressionMethods, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl,
};
use tracing::debug;
use url::Url;
use uuid::Uuid;

/// Uploads a batch of assets to the asset uploader worker.
///
/// Generates a fresh request ID and records one status row per URL:
/// URLs whose CDN URI is already known (from `parsed_asset_uris`) are
/// recorded as completed immediately; the rest are recorded as pending.
/// Rows are inserted with ON CONFLICT DO NOTHING semantics downstream.
/// Returns the request ID as a string for the caller to poll.
pub fn upload_batch(
    pool: Pool<ConnectionManager<PgConnection>>,
    request: &BatchUploadRequest,
) -> anyhow::Result<String> {
    let mut conn = pool.get()?;
    let existing_rows = get_existing_rows(&mut conn, &request.urls)?;

    let request_id = Uuid::new_v4();
    let application_id = Uuid::parse_str(&request.application_id)?;
    let request_statuses = request
        .urls
        .iter()
        .map(|url| match existing_rows.get(url.as_str()) {
            Some(cdn_image_uri) => AssetUploaderRequestStatuses::new_completed(
                request_id,
                url.as_str(),
                application_id,
                // Invariant: get_existing_rows filters on
                // cdn_image_uri IS NOT NULL, so every stored value is Some.
                cdn_image_uri
                    .as_deref()
                    .expect("cdn_image_uri is non-null: query filters IS NOT NULL"),
            ),
            None => {
                AssetUploaderRequestStatuses::new(request_id, url.as_str(), application_id)
            },
        })
        .collect::<Vec<_>>();

    insert_request_statuses(&mut conn, &request_statuses)?;
    Ok(request_id.to_string())
}

/// Returns the subset of `urls` that already have a non-null CDN image URI
/// recorded in `parsed_asset_uris`, mapped to that CDN URI.
fn get_existing_rows(
    conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
    urls: &[Url],
) -> anyhow::Result<AHashMap<String, Option<String>>> {
    use schema::nft_metadata_crawler::parsed_asset_uris::dsl::*;

    let url_strs = urls.iter().map(Url::as_str);
    let query = parsed_asset_uris
        .filter(asset_uri.eq_any(url_strs).and(cdn_image_uri.is_not_null()))
        .select((asset_uri, cdn_image_uri));

    debug!(
        "Executing Query: {}",
        diesel::debug_query::<diesel::pg::Pg, _>(&query)
    );
    let rows: Vec<(String, Option<String>)> = query.load(conn)?;
    Ok(rows.into_iter().collect())
}

fn insert_request_statuses(
conn: &mut PooledConnection<ConnectionManager<PgConnection>>,
request_statuses: &[AssetUploaderRequestStatuses],
) -> anyhow::Result<usize> {
use schema::nft_metadata_crawler::asset_uploader_request_statuses::dsl::*;

let query =
diesel::insert_into(schema::nft_metadata_crawler::asset_uploader_request_statuses::table)
.values(request_statuses)
.on_conflict((request_id, asset_uri))
.do_nothing();

let debug_query = diesel::debug_query::<diesel::pg::Pg, _>(&query).to_string();
debug!("Executing Query: {}", debug_query);
query.execute(conn).context(debug_query)
}
Loading

0 comments on commit bb6b5c7

Please sign in to comment.