diff --git a/Cargo.lock b/Cargo.lock
index 817f860a4..e4141288e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1354,6 +1354,12 @@ version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
 
+[[package]]
+name = "convert_case"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
+
 [[package]]
 name = "core-foundation"
 version = "0.9.3"
@@ -1575,9 +1581,71 @@ dependencies = [
  "syn 2.0.38",
 ]
 
+[[package]]
+name = "das-metadata-json"
+version = "0.1.0"
+dependencies = [
+ "anchor-client",
+ "anchor-lang",
+ "anyhow",
+ "async-trait",
+ "backon",
+ "base64 0.21.4",
+ "blockbuster",
+ "borsh 0.10.3",
+ "bs58 0.4.0",
+ "cadence",
+ "cadence-macros",
+ "chrono",
+ "clap 4.4.6",
+ "das-tree-backfiller",
+ "derive_more",
+ "digital_asset_types",
+ "env_logger 0.10.0",
+ "figment",
+ "flatbuffers",
+ "futures",
+ "futures-util",
+ "hyper",
+ "indicatif",
+ "lazy_static",
+ "log",
+ "mpl-bubblegum",
+ "num-traits",
+ "plerkle_messenger",
+ "plerkle_serialization",
+ "rand 0.8.5",
+ "redis",
+ "regex",
+ "reqwest",
+ "rust-crypto",
+ "sea-orm",
+ "sea-query 0.28.5",
+ "serde",
+ "serde_json",
+ "solana-account-decoder",
+ "solana-client",
+ "solana-geyser-plugin-interface",
+ "solana-sdk",
+ "solana-sdk-macro",
+ "solana-transaction-status",
+ "spl-account-compression",
+ "spl-concurrent-merkle-tree",
+ "spl-token 4.0.0",
+ "sqlx",
+ "stretto",
+ "thiserror",
+ "tokio",
+ "tokio-postgres",
+ "tokio-stream",
+ "tracing-subscriber",
+ "url",
+ "uuid",
+]
+
 [[package]]
 name = "das-tree-backfiller"
-version = "0.8.0"
+version = "0.1.0"
 dependencies = [
  "anchor-client",
  "anchor-lang",
@@ -1690,6 +1758,19 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "derive_more"
+version = "0.99.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
+dependencies = [
+ "convert_case",
+ "proc-macro2 1.0.69",
+ "quote 1.0.33",
+ "rustc_version",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "dialoguer"
 version = "0.10.4"
diff --git a/Cargo.toml b/Cargo.toml
index a96d5d566..d3677d7ee 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,7 @@
 [workspace]
 members = [
     "digital_asset_types",
+    "metadata_json",
     "metaplex-rpc-proxy",
     "nft_ingester",
     "tree_backfiller",
diff --git a/metadata_json/Cargo.toml b/metadata_json/Cargo.toml
new file mode 100644
index 000000000..7e5d80d28
--- /dev/null
+++ b/metadata_json/Cargo.toml
@@ -0,0 +1,97 @@
+[package]
+name = "das-metadata-json"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[lib]
+
+[[bin]]
+name = "das-metadata-json"
+
+[dependencies]
+
+backon = "0.4.1"
+log = "0.4.17"
+env_logger = "0.10.0"
+anyhow = "1.0.75"
+derive_more = "0.99.17"
+redis = { version = "0.22.3", features = [
+    "aio",
+    "tokio-comp",
+    "streams",
+    "tokio-native-tls-comp",
+] }
+futures = { version = "0.3.25" }
+futures-util = "0.3.27"
+base64 = "0.21.0"
+indicatif = "0.17.5"
+thiserror = "1.0.31"
+serde_json = "1.0.81"
+cadence = "0.29.0"
+cadence-macros = "0.29.0"
+hyper = "0.14.23"
+anchor-client = "0.28.0"
+das-tree-backfiller = { path = "../tree_backfiller" }
+tokio = { version = "1.26.0", features = ["full", "tracing"] }
+sqlx = { version = "0.6.2", features = [
+    "macros",
+    "runtime-tokio-rustls",
+    "postgres",
+    "uuid",
+    "offline",
+    "json",
+] }
+sea-orm = { version = "0.10.6", features = [
+    "macros",
+    "runtime-tokio-rustls",
+    "sqlx-postgres",
+    "with-chrono",
+    "mock",
+] }
+sea-query = { version = "0.28.1", features = ["postgres-array"] }
+chrono = "0.4.19"
+tokio-postgres = "0.7.7"
+serde = "1.0.136"
+bs58 = "0.4.0"
+reqwest = "0.11.11"
+plerkle_messenger = { version = "1.6.0", features = ['redis'] }
+plerkle_serialization = { version = "1.6.0" }
+flatbuffers = "23.1.21"
+lazy_static = "1.4.0"
+regex = "1.5.5"
+digital_asset_types = { path = "../digital_asset_types", features = [
+    "json_types",
+    "sql_types",
+] }
+mpl-bubblegum = "1.0.1-beta.3"
+spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] }
+spl-concurrent-merkle-tree = "0.2.0"
+uuid = "1.0.0"
+async-trait = "0.1.53"
+num-traits = "0.2.15"
+blockbuster = "0.9.0-beta.1"
+figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
+solana-sdk = "~1.16.16"
+solana-client = "~1.16.16"
+spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
+solana-transaction-status = "~1.16.16"
+solana-account-decoder = "~1.16.16"
+solana-geyser-plugin-interface = "~1.16.16"
+solana-sdk-macro = "~1.16.16"
+rand = "0.8.5"
+rust-crypto = "0.2.36"
+url = "2.3.1"
+anchor-lang = "0.28.0"
+borsh = "~0.10.3"
+stretto = { version = "0.7", features = ["async"] }
+tokio-stream = "0.1.12"
+tracing-subscriber = { version = "0.3.16", features = [
+    "json",
+    "env-filter",
+    "ansi",
+] }
+clap = { version = "4.2.2", features = ["derive", "cargo", "env"] }
+
+[lints]
+workspace = true
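Note the new path dependency das-tree-backfiller = { path = "../tree_backfiller" }: it only resolves because this patch also adds a [lib] target to tree_backfiller/Cargo.toml (at the end of the diff), so the formerly binary-only crate now exports its db and metrics modules. A minimal sketch of the shared plumbing this makes available; the function itself is illustrative, not part of the patch:

    // Illustrative only: these imports, used throughout metadata_json below,
    // resolve against the [lib] target this patch adds to tree_backfiller.
    use das_tree_backfiller::db;
    use das_tree_backfiller::metrics::{Metrics, MetricsArgs};

    async fn shared_plumbing(
        db_args: db::PoolArgs,
        metrics_args: MetricsArgs,
    ) -> anyhow::Result<()> {
        let pool = db::connect(db_args).await?; // sqlx::PgPool, as in backfill.rs below
        let metrics = Metrics::try_from_config(metrics_args)?;
        metrics.increment("example.started");
        drop(pool);
        Ok(())
    }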
"postgres", + "uuid", + "offline", + "json", +] } +sea-orm = { version = "0.10.6", features = [ + "macros", + "runtime-tokio-rustls", + "sqlx-postgres", + "with-chrono", + "mock", +] } +sea-query = { version = "0.28.1", features = ["postgres-array"] } +chrono = "0.4.19" +tokio-postgres = "0.7.7" +serde = "1.0.136" +bs58 = "0.4.0" +reqwest = "0.11.11" +plerkle_messenger = { version = "1.6.0", features = ['redis'] } +plerkle_serialization = { version = "1.6.0" } +flatbuffers = "23.1.21" +lazy_static = "1.4.0" +regex = "1.5.5" +digital_asset_types = { path = "../digital_asset_types", features = [ + "json_types", + "sql_types", +] } +mpl-bubblegum = "1.0.1-beta.3" +spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] } +spl-concurrent-merkle-tree = "0.2.0" +uuid = "1.0.0" +async-trait = "0.1.53" +num-traits = "0.2.15" +blockbuster = "0.9.0-beta.1" +figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } +solana-sdk = "~1.16.16" +solana-client = "~1.16.16" +spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } +solana-transaction-status = "~1.16.16" +solana-account-decoder = "~1.16.16" +solana-geyser-plugin-interface = "~1.16.16" +solana-sdk-macro = "~1.16.16" +rand = "0.8.5" +rust-crypto = "0.2.36" +url = "2.3.1" +anchor-lang = "0.28.0" +borsh = "~0.10.3" +stretto = { version = "0.7", features = ["async"] } +tokio-stream = "0.1.12" +tracing-subscriber = { version = "0.3.16", features = [ + "json", + "env-filter", + "ansi", +] } +clap = { version = "4.2.2", features = ["derive", "cargo", "env"] } + +[lints] +workspace = true diff --git a/metadata_json/src/cmds/backfill.rs b/metadata_json/src/cmds/backfill.rs new file mode 100644 index 000000000..095607248 --- /dev/null +++ b/metadata_json/src/cmds/backfill.rs @@ -0,0 +1,89 @@ +use { + crate::worker::{Worker, WorkerArgs}, + backon::{ExponentialBuilder, Retryable}, + clap::Parser, + das_tree_backfiller::db, + das_tree_backfiller::metrics::{Metrics, MetricsArgs}, + digital_asset_types::dao::asset_data, + log::info, + reqwest::ClientBuilder, + sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector}, + tokio::{ + sync::mpsc, + time::{Duration, Instant}, + }, +}; + +#[derive(Parser, Clone, Debug)] +pub struct BackfillArgs { + #[clap(flatten)] + database: db::PoolArgs, + + #[command(flatten)] + metrics: MetricsArgs, + + #[command(flatten)] + worker: WorkerArgs, + + #[arg(long, default_value = "1000")] + timeout: u64, + + #[arg(long, default_value = "1000")] + batch_size: u64, +} + +pub async fn run(args: BackfillArgs) -> Result<(), anyhow::Error> { + let batch_size = args.batch_size; + + let pool = db::connect(args.database).await?; + + let metrics = Metrics::try_from_config(args.metrics)?; + + let client = ClientBuilder::new() + .timeout(Duration::from_millis(args.timeout)) + .build()?; + + let worker = Worker::from(args.worker); + + let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone()); + + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + + let mut condition = Condition::all(); + condition = condition.add(asset_data::Column::Reindex.eq(true)); + let query = asset_data::Entity::find() + .filter(condition) + .order_by(asset_data::Column::Id, Order::Asc); + + let mut after = None; + + loop { + let mut query = query.clone().cursor_by(asset_data::Column::Id); + let mut query = query.first(batch_size); + + if let Some(after) = after { + query = query.after(after); + } + + let assets = query.all(&conn).await?; + let assets_count = 
diff --git a/metadata_json/src/cmds/ingest.rs b/metadata_json/src/cmds/ingest.rs
new file mode 100644
index 000000000..e2fe0ec15
--- /dev/null
+++ b/metadata_json/src/cmds/ingest.rs
@@ -0,0 +1,62 @@
+use crate::stream::{Receiver, ReceiverArgs};
+use crate::worker::{Worker, WorkerArgs};
+use clap::Parser;
+use das_tree_backfiller::{
+    db,
+    metrics::{Metrics, MetricsArgs},
+};
+use digital_asset_types::dao::asset_data;
+use log::info;
+use reqwest::{Client, ClientBuilder};
+use tokio::{sync::mpsc, time::Duration};
+
+#[derive(Parser, Clone, Debug)]
+pub struct IngestArgs {
+    #[clap(flatten)]
+    receiver: ReceiverArgs,
+
+    #[clap(flatten)]
+    metrics: MetricsArgs,
+
+    #[clap(flatten)]
+    database: db::PoolArgs,
+
+    #[arg(long, default_value = "1000")]
+    timeout: u64,
+
+    #[clap(flatten)]
+    worker: WorkerArgs,
+}
+
+pub async fn run(args: IngestArgs) -> Result<(), anyhow::Error> {
+    let rx = Receiver::try_from_config(args.receiver.into()).await?;
+
+    let pool = db::connect(args.database).await?;
+
+    let metrics = Metrics::try_from_config(args.metrics)?;
+
+    let client = ClientBuilder::new()
+        .timeout(Duration::from_millis(args.timeout))
+        .build()?;
+
+    let worker = Worker::from(args.worker);
+
+    let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone());
+
+    while let Ok(messages) = rx.recv().await {
+        for message in messages.clone() {
+            tx.send(message.data).await?;
+        }
+
+        let ids: Vec<String> = messages.into_iter().map(|m| m.id).collect();
+        rx.ack(&ids).await?;
+    }
+
+    drop(tx);
+
+    handle.await?;
+
+    info!("Ingesting stopped");
+
+    Ok(())
+}
diff --git a/metadata_json/src/cmds/mod.rs b/metadata_json/src/cmds/mod.rs
new file mode 100644
index 000000000..9bf21f89f
--- /dev/null
+++ b/metadata_json/src/cmds/mod.rs
@@ -0,0 +1,2 @@
+pub mod backfill;
+pub mod ingest;
diff --git a/metadata_json/src/lib.rs b/metadata_json/src/lib.rs
new file mode 100644
index 000000000..7ef1b944c
--- /dev/null
+++ b/metadata_json/src/lib.rs
@@ -0,0 +1,3 @@
+mod stream;
+
+pub use stream::*;
diff --git a/metadata_json/src/main.rs b/metadata_json/src/main.rs
new file mode 100644
index 000000000..39d4abdb9
--- /dev/null
+++ b/metadata_json/src/main.rs
@@ -0,0 +1,37 @@
+use clap::{Parser, Subcommand};
+
+mod cmds;
+mod stream;
+mod worker;
+
+use cmds::{backfill, ingest};
+
+#[derive(Parser)]
+#[command(author, about, next_line_help = true)]
+struct Args {
+    #[command(subcommand)]
+    action: Action,
+}
+
+#[derive(Subcommand, Clone)]
+enum Action {
+    Ingest(ingest::IngestArgs),
+    Backfill(backfill::BackfillArgs),
+    Report,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+    env_logger::init();
+
+    let args = Args::parse();
+
+    match args.action {
+        Action::Ingest(args) => ingest::run(args).await,
+        Action::Backfill(args) => backfill::run(args).await,
+        Action::Report => {
+            println!("Report");
+            Ok(())
+        }
+    }
+}
diff --git a/metadata_json/src/stream/mod.rs b/metadata_json/src/stream/mod.rs
new file mode 100644
index 000000000..74ef564eb
--- /dev/null
+++ b/metadata_json/src/stream/mod.rs
@@ -0,0 +1,5 @@
+pub mod receiver;
+pub mod sender;
+
+pub use receiver::*;
+pub use sender::*;
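Both subcommands funnel work through the same bounded mpsc channel into the Worker pool defined in worker.rs below, so a fast Redis stream or table scan is paced against slow HTTP fetches by channel backpressure. A minimal, runnable sketch of that behavior, with illustrative sizes:

    // Bounded-channel backpressure sketch: when the queue is full, send().await
    // suspends the producer until a consumer frees a slot, which is what lets
    // the ingest loop above pace itself against slow metadata fetches.
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(2); // queue_size = 2

        let consumer = tokio::spawn(async move {
            while let Some(id) = rx.recv().await {
                println!("processing {} bytes", id.len());
            }
        });

        for i in 0u8..8 {
            // Suspends (asynchronously) once two ids are in flight.
            tx.send(vec![i]).await.expect("receiver alive");
        }

        drop(tx); // closing the channel lets the consumer loop end
        consumer.await.unwrap();
    }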
diff --git a/metadata_json/src/stream/receiver.rs b/metadata_json/src/stream/receiver.rs
new file mode 100644
index 000000000..8a3bcf717
--- /dev/null
+++ b/metadata_json/src/stream/receiver.rs
@@ -0,0 +1,85 @@
+use clap::Parser;
+use figment::value::{Dict, Value};
+use plerkle_messenger::{
+    select_messenger, Messenger, MessengerConfig, MessengerError, MessengerType, RecvData,
+};
+use std::sync::{Arc, Mutex};
+
+const METADATA_JSON_STREAM: &'static str = "METADATA_JSON";
+
+#[derive(Clone, Debug, Parser)]
+pub struct ReceiverArgs {
+    #[arg(long, env)]
+    pub messenger_redis_url: String,
+    #[arg(long, env, default_value = "100")]
+    pub messenger_redis_batch_size: String,
+}
+
+impl From<ReceiverArgs> for MessengerConfig {
+    fn from(args: ReceiverArgs) -> Self {
+        let mut connection_config = Dict::new();
+
+        connection_config.insert(
+            "redis_connection_str".to_string(),
+            Value::from(args.messenger_redis_url),
+        );
+        connection_config.insert(
+            "batch_size".to_string(),
+            Value::from(args.messenger_redis_batch_size),
+        );
+        connection_config.insert(
+            "pipeline_size_bytes".to_string(),
+            Value::from(1u128.to_string()),
+        );
+
+        Self {
+            messenger_type: MessengerType::Redis,
+            connection_config: connection_config,
+        }
+    }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum ReceiverError {
+    #[error("unable to acquire mutex lock")]
+    Lock,
+    #[error("messenger: {0}")]
+    Messenger(#[from] plerkle_messenger::MessengerError),
+}
+
+#[derive(Clone)]
+pub struct Receiver(Arc<Mutex<Box<dyn Messenger>>>);
+
+impl Receiver {
+    pub async fn try_from_config(config: MessengerConfig) -> Result<Self, MessengerError> {
+        let mut messenger = select_messenger(config).await?;
+
+        messenger.add_stream(METADATA_JSON_STREAM).await?;
+        messenger
+            .set_buffer_size(METADATA_JSON_STREAM, 10000000000000000)
+            .await;
+
+        Ok(Self(Arc::new(Mutex::new(messenger))))
+    }
+
+    pub async fn recv(&self) -> Result<Vec<RecvData>, ReceiverError> {
+        let mut messenger = self.0.lock().map_err(|_| ReceiverError::Lock)?;
+
+        messenger
+            .recv(
+                METADATA_JSON_STREAM,
+                plerkle_messenger::ConsumptionType::All,
+            )
+            .await
+            .map_err(Into::into)
+    }
+
+    pub async fn ack(&self, ids: &[String]) -> Result<(), ReceiverError> {
+        let mut messenger = self.0.lock().map_err(|_| ReceiverError::Lock)?;
+
+        messenger
+            .ack_msg(METADATA_JSON_STREAM, ids)
+            .await
+            .map_err(Into::into)
+    }
+}
diff --git a/metadata_json/src/stream/sender.rs b/metadata_json/src/stream/sender.rs
new file mode 100644
index 000000000..e69de29bb
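sender.rs lands empty here, a placeholder for the publishing side of the stream. On the consuming side the intended cycle, as ingest.rs above uses it, is: recv a batch, hand off each message's payload, then ack the batch by message id so unacked messages can be redelivered after a crash. A hedged sketch of that cycle (the import path follows the pub use stream::* re-export in lib.rs):

    // Sketch of the recv/ack cycle against the METADATA_JSON stream, assuming
    // a Receiver already built via Receiver::try_from_config as above.
    use das_metadata_json::Receiver;

    async fn consume(receiver: Receiver) -> anyhow::Result<()> {
        loop {
            let messages = receiver.recv().await?;

            for message in &messages {
                // message.data carries the raw payload (an asset id here).
                println!("got {} bytes", message.data.len());
            }

            // Ack only after hand-off, keyed by Redis stream message id.
            let ids: Vec<String> = messages.into_iter().map(|m| m.id).collect();
            receiver.ack(&ids).await?;
        }
    }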
diff --git a/metadata_json/src/worker.rs b/metadata_json/src/worker.rs
new file mode 100644
index 000000000..cffe02d6f
--- /dev/null
+++ b/metadata_json/src/worker.rs
@@ -0,0 +1,203 @@
+use {
+    backon::{ExponentialBuilder, Retryable},
+    clap::Parser,
+    das_tree_backfiller::{
+        db,
+        metrics::{Metrics, MetricsArgs},
+    },
+    digital_asset_types::dao::asset_data,
+    futures::{stream::FuturesUnordered, StreamExt},
+    indicatif::HumanDuration,
+    log::{debug, error, info},
+    reqwest::{Client, ClientBuilder, Url},
+    sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector},
+    tokio::{
+        sync::mpsc,
+        task::JoinHandle,
+        time::{Duration, Instant},
+    },
+};
+
+#[derive(Parser, Clone, Debug)]
+pub struct WorkerArgs {
+    #[arg(long, env, default_value = "1000")]
+    queue_size: usize,
+    #[arg(long, env, default_value = "100")]
+    worker_count: usize,
+}
+
+pub struct Worker {
+    queue_size: usize,
+    worker_count: usize,
+}
+
+impl From<WorkerArgs> for Worker {
+    fn from(args: WorkerArgs) -> Self {
+        Self {
+            queue_size: args.queue_size,
+            worker_count: args.worker_count,
+        }
+    }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum WorkerError {
+    #[error("send error: {0}")]
+    Send(#[from] mpsc::error::SendError<Vec<u8>>),
+    #[error("join error: {0}")]
+    Join(#[from] tokio::task::JoinError),
+}
+
+impl Worker {
+    pub fn start(
+        &self,
+        pool: sqlx::PgPool,
+        metrics: Metrics,
+        client: Client,
+    ) -> (mpsc::Sender<Vec<u8>>, JoinHandle<()>) {
+        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(self.queue_size);
+        let worker_count = self.worker_count;
+
+        let handle = tokio::spawn(async move {
+            let mut handlers = FuturesUnordered::new();
+
+            while let Some(asset_data) = rx.recv().await {
+                if handlers.len() >= worker_count {
+                    handlers.next().await;
+                }
+
+                let pool = pool.clone();
+                let metrics = metrics.clone();
+                let client = client.clone();
+
+                handlers.push(spawn_task(client, pool, metrics, asset_data));
+            }
+
+            while let Some(_) = handlers.next().await {}
+        });
+
+        (tx, handle)
+    }
+}
+
+fn spawn_task(
+    client: Client,
+    pool: sqlx::PgPool,
+    metrics: Metrics,
+    asset_data: Vec<u8>,
+) -> JoinHandle<()> {
+    tokio::spawn(async move {
+        let timing = Instant::now();
+
+        let asset_data_id = asset_data.clone();
+        let asset_data_id = bs58::encode(asset_data_id).into_string();
+
+        if let Err(e) = perform_metadata_json_task(client, pool, asset_data).await {
+            error!("Asset {} {}", asset_data_id, e);
+
+            metrics.increment("ingester.bgtask.error");
+        } else {
+            metrics.increment("ingester.bgtask.success");
+        }
+
+        debug!(
+            "Asset {} finished in {}",
+            asset_data_id,
+            HumanDuration(timing.elapsed())
+        );
+
+        metrics.time("ingester.bgtask.finished", timing.elapsed());
+    })
+}
+
+#[derive(thiserror::Error, Debug)]
+enum MetadataJsonTaskError {
+    #[error("sea orm: {0}")]
+    SeaOrm(#[from] sea_orm::DbErr),
+    #[error("metadata json: {0}")]
+    Fetch(#[from] FetchMetadataJsonError),
+    #[error("asset not found in the db")]
+    AssetNotFound,
+}
+
+async fn perform_metadata_json_task(
+    client: Client,
+    pool: sqlx::PgPool,
+    asset_data: Vec<u8>,
+) -> Result<asset_data::Model, MetadataJsonTaskError> {
+    let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
+
+    let asset_data = asset_data::Entity::find()
+        .filter(asset_data::Column::Id.eq(asset_data))
+        .one(&conn)
+        .await?
+        .ok_or(MetadataJsonTaskError::AssetNotFound)?;
+
+    let metadata = fetch_metadata_json(client, &asset_data.metadata_url).await?;
+
+    let asset_data_active_model = asset_data::ActiveModel {
+        id: Set(asset_data.id),
+        metadata: Set(metadata),
+        reindex: Set(Some(false)),
+        ..Default::default()
+    };
+
+    asset_data_active_model
+        .update(&conn)
+        .await
+        .map_err(Into::into)
+}
+
+#[derive(thiserror::Error, Debug)]
+enum FetchMetadataJsonError {
+    #[error("reqwest: {0}")]
+    GenericReqwest(#[from] reqwest::Error),
+    #[error("json parse for url({url}) with {source}")]
+    Parse { source: reqwest::Error, url: Url },
+    #[error("response {status} for url ({url}) with {source}")]
+    Response {
+        source: reqwest::Error,
+        url: Url,
+        status: StatusCode,
+    },
+    #[error("url parse: {0}")]
+    Url(#[from] url::ParseError),
+}
+
+#[derive(Debug, derive_more::Display)]
+pub enum StatusCode {
+    Unknown,
+    Code(reqwest::StatusCode),
+}
+
+async fn fetch_metadata_json(
+    client: Client,
+    uri: &str,
+) -> Result<serde_json::Value, FetchMetadataJsonError> {
+    (|| async {
+        let url = Url::parse(uri)?;
+
+        let response = client.get(url.clone()).send().await?;
+
+        match response.error_for_status() {
+            Ok(res) => res
+                .json::<serde_json::Value>()
+                .await
+                .map_err(|source| FetchMetadataJsonError::Parse { source, url }),
+            Err(source) => {
+                let status = source
+                    .status()
+                    .map(StatusCode::Code)
+                    .unwrap_or(StatusCode::Unknown);
+
+                return Err(FetchMetadataJsonError::Response {
+                    source,
+                    url,
+                    status,
+                });
+            }
+        }
+    })
+    .retry(&ExponentialBuilder::default())
+    .await
+}
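fetch_metadata_json wraps the whole request-and-parse closure in backon's Retryable, so transient failures (timeouts, 5xx responses) are retried with exponential backoff before a task is counted as failed. The same pattern in isolation; error handling is collapsed to reqwest::Error for brevity:

    // Minimal backon retry sketch (mirrors fetch_metadata_json above): any
    // closure returning a Result future can be retried, and ExponentialBuilder's
    // defaults back off with exponentially growing delays before giving up.
    use backon::{ExponentialBuilder, Retryable};

    async fn fetch_with_retry(
        client: reqwest::Client,
        url: reqwest::Url,
    ) -> Result<serde_json::Value, reqwest::Error> {
        (|| async {
            client
                .get(url.clone())
                .send()
                .await?
                .error_for_status()?
                .json::<serde_json::Value>()
                .await
        })
        .retry(&ExponentialBuilder::default())
        .await
    }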
diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs
index 2b3991a36..ca949ca56 100644
--- a/nft_ingester/src/account_updates.rs
+++ b/nft_ingester/src/account_updates.rs
@@ -25,6 +25,7 @@ pub fn account_worker(
 ) -> JoinHandle<()> {
     tokio::spawn(async move {
         let source = T::new(config).await;
+
         if let Ok(mut msg) = source {
             let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
             loop {
diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml
index b634e099f..8f5a6ba73 100644
--- a/tree_backfiller/Cargo.toml
+++ b/tree_backfiller/Cargo.toml
@@ -1,12 +1,14 @@
 [package]
 name = "das-tree-backfiller"
-version = "0.8.0"
+version = "0.1.0"
 edition = "2021"
 publish = false
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
+[lib]
+
 [[bin]]
 name = "das-tree-backfiller"
diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs
index eaa7f312e..01588edc3 100644
--- a/tree_backfiller/src/queue.rs
+++ b/tree_backfiller/src/queue.rs
@@ -17,6 +17,8 @@ pub struct QueueArgs {
     pub messenger_redis_batch_size: String,
     #[arg(long, env, default_value = "25")]
     pub messenger_queue_connections: u64,
+    #[arg(long, env, default_value = "TXNFILL")]
+    pub messenger_queue_stream: String,
 }
 
 impl From<QueueArgs> for MessengerConfig {
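The new flag defaults to TXNFILL, the stream the tree backfiller already publishes to, while letting deployments point the queue at another stream such as the METADATA_JSON stream consumed above. Since QueueArgs derives clap's Parser with env attributes (the crates enable clap's env feature), each value can come from a flag, an environment variable, or the default; a small sketch of that resolution order:

    // How the clap attributes on QueueArgs resolve: the --messenger-queue-stream
    // flag first, then the derived MESSENGER_QUEUE_STREAM environment variable,
    // then the default "TXNFILL".
    use clap::Parser;

    #[derive(Parser, Debug)]
    struct Demo {
        #[arg(long, env, default_value = "TXNFILL")]
        messenger_queue_stream: String,
    }

    fn main() {
        // e.g. `demo --messenger-queue-stream METADATA_JSON`
        // or    `MESSENGER_QUEUE_STREAM=METADATA_JSON demo`
        let demo = Demo::parse();
        println!("publishing to stream {}", demo.messenger_queue_stream);
    }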