From 686a2b2e6b4b660aa0c8a3dcaee3b2b2b0e50738 Mon Sep 17 00:00:00 2001
From: Kyle Espinola
Date: Mon, 15 Jan 2024 13:55:15 +0100
Subject: [PATCH] feat(metadata_json): add new crate to continuously process
 messages and do backfill.

---
 Cargo.lock                           |  28 +---
 metadata_json/Cargo.toml             |  35 +----
 metadata_json/README.md              | 102 ++++++++++++++
 metadata_json/src/cmds/backfill.rs   |  12 +-
 metadata_json/src/cmds/ingest.rs     |  11 +-
 metadata_json/src/stream/mod.rs      |   2 +
 metadata_json/src/stream/receiver.rs |  19 ++-
 metadata_json/src/stream/sender.rs   | 112 +++++++++++++++
 metadata_json/src/worker.rs          |  78 ++++++-----
 nft_ingester/Cargo.toml              |   1 +
 nft_ingester/src/config.rs           |   2 +
 nft_ingester/src/main.rs             |  25 +++-
 nft_ingester/src/stream.rs           |   6 +-
 nft_ingester/src/tasks/mod.rs        |  51 ++++++-
 tools/bgtask_creator/src/main.rs     | 201 +--------------------------
 tree_backfiller/src/backfiller.rs    |  32 ++---
 tree_backfiller/src/metrics.rs       |  39 ++----
 17 files changed, 382 insertions(+), 374 deletions(-)
 create mode 100644 metadata_json/README.md

diff --git a/Cargo.lock b/Cargo.lock
index e4141288e..c5e3eb2a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1585,14 +1585,8 @@ dependencies = [
 name = "das-metadata-json"
 version = "0.1.0"
 dependencies = [
- "anchor-client",
- "anchor-lang",
  "anyhow",
- "async-trait",
  "backon",
- "base64 0.21.4",
- "blockbuster",
- "borsh 0.10.3",
  "bs58 0.4.0",
  "cadence",
  "cadence-macros",
@@ -1603,44 +1597,23 @@ dependencies = [
  "digital_asset_types",
  "env_logger 0.10.0",
  "figment",
- "flatbuffers",
  "futures",
- "futures-util",
- "hyper",
  "indicatif",
- "lazy_static",
  "log",
- "mpl-bubblegum",
- "num-traits",
  "plerkle_messenger",
- "plerkle_serialization",
  "rand 0.8.5",
  "redis",
- "regex",
  "reqwest",
- "rust-crypto",
  "sea-orm",
  "sea-query 0.28.5",
  "serde",
  "serde_json",
- "solana-account-decoder",
- "solana-client",
- "solana-geyser-plugin-interface",
- "solana-sdk",
- "solana-sdk-macro",
- "solana-transaction-status",
- "spl-account-compression",
- "spl-concurrent-merkle-tree",
- "spl-token 4.0.0",
  "sqlx",
- "stretto",
  "thiserror",
  "tokio",
  "tokio-postgres",
  "tokio-stream",
- "tracing-subscriber",
  "url",
- "uuid",
 ]
 
 [[package]]
@@ -3202,6 +3175,7 @@ dependencies = [
  "cadence-macros",
  "chrono",
  "clap 4.4.6",
+ "das-metadata-json",
  "digital_asset_types",
  "env_logger 0.10.0",
  "figment",
diff --git a/metadata_json/Cargo.toml b/metadata_json/Cargo.toml
index 7e5d80d28..325987dd0 100644
--- a/metadata_json/Cargo.toml
+++ b/metadata_json/Cargo.toml
@@ -23,15 +23,9 @@ redis = { version = "0.22.3", features = [
     "tokio-native-tls-comp",
 ] }
 futures = { version = "0.3.25" }
-futures-util = "0.3.27"
-base64 = "0.21.0"
 indicatif = "0.17.5"
 thiserror = "1.0.31"
 serde_json = "1.0.81"
-cadence = "0.29.0"
-cadence-macros = "0.29.0"
-hyper = "0.14.23"
-anchor-client = "0.28.0"
 das-tree-backfiller = { path = "../tree_backfiller" }
 tokio = { version = "1.26.0", features = ["full", "tracing"] }
 sqlx = { version = "0.6.2", features = [
@@ -51,46 +45,21 @@ sea-orm = { version = "0.10.6", features = [
 ] }
 sea-query = { version = "0.28.1", features = ["postgres-array"] }
 chrono = "0.4.19"
+cadence = "0.29.0"
+cadence-macros = "0.29.0"
 tokio-postgres = "0.7.7"
 serde = "1.0.136"
 bs58 = "0.4.0"
 reqwest = "0.11.11"
 plerkle_messenger = { version = "1.6.0", features = ['redis'] }
-plerkle_serialization = { version = "1.6.0" }
-flatbuffers = "23.1.21"
-lazy_static = "1.4.0"
-regex = "1.5.5"
 digital_asset_types = { path = "../digital_asset_types", features = [
     "json_types",
     "sql_types",
 ] }
-mpl-bubblegum = "1.0.1-beta.3"
-spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] }
-spl-concurrent-merkle-tree = "0.2.0"
-uuid = "1.0.0"
-async-trait = "0.1.53"
-num-traits = "0.2.15"
-blockbuster = "0.9.0-beta.1"
 figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
-solana-sdk = "~1.16.16"
-solana-client = "~1.16.16"
-spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
-solana-transaction-status = "~1.16.16"
-solana-account-decoder = "~1.16.16"
-solana-geyser-plugin-interface = "~1.16.16"
-solana-sdk-macro = "~1.16.16"
 rand = "0.8.5"
-rust-crypto = "0.2.36"
 url = "2.3.1"
-anchor-lang = "0.28.0"
-borsh = "~0.10.3"
-stretto = { version = "0.7", features = ["async"] }
 tokio-stream = "0.1.12"
-tracing-subscriber = { version = "0.3.16", features = [
-    "json",
-    "env-filter",
-    "ansi",
-] }
 clap = { version = "4.2.2", features = ["derive", "cargo", "env"] }
 
 [lints]
diff --git a/metadata_json/README.md b/metadata_json/README.md
new file mode 100644
index 000000000..5e5754297
--- /dev/null
+++ b/metadata_json/README.md
@@ -0,0 +1,102 @@
+# DAS Metadata JSON Indexer CLI
+
+## Overview
+The DAS Metadata JSON Indexer CLI is a tool for indexing the metadata JSON associated with tokens. It supports ingesting new metadata and backfilling missing metadata, and it provides metrics and performance tuning options.
+
+## Features
+- **Ingest**: Continuously process and index new metadata JSON with various configuration options.
+- **Backfill**: Fill in missing metadata for previously indexed tokens with configurable parameters.
+- **Metrics**: Collect and send metrics to a specified host and port.
+
+## Installation
+Ensure you have Rust installed on your machine. If not, install it from [the official Rust website](https://www.rust-lang.org/). Then build and inspect the CLI from the repository root:
+
+```
+cargo run --bin das-metadata-json -- --help
+```
+
+## Usage
+
+### Ingest Command
+
+The ingest command monitors the METADATA_JSON Redis stream to process metadata JSON continuously. For each ID read from the stream, the ingest loop looks up the corresponding `asset_data` record in the DAS database, fetches the metadata JSON, and updates the record with the retrieved metadata. A sketch of this flow is included in the appendix at the end of this README.
+
+```
+das-metadata-json ingest [OPTIONS] --messenger-redis-url <MESSENGER_REDIS_URL> --database-url <DATABASE_URL>
+```
+
+#### Options
+- `--messenger-redis-url <MESSENGER_REDIS_URL>`: The Redis URL for the messenger service.
+- `--messenger-redis-batch-size <MESSENGER_REDIS_BATCH_SIZE>`: Batch size for Redis operations (default: 100).
+- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
+- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
+- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
+- `--database-url <DATABASE_URL>`: The database URL.
+- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
+- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
+- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
+- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
+- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
+- `-h, --help`: Print help information.
+
+### Backfill Command
+
+To backfill any `asset_data` marked for indexing with `reindex=true`:
+
+```
+das-metadata-json backfill [OPTIONS] --database-url <DATABASE_URL>
+```
+
+#### Options
+- `--database-url <DATABASE_URL>`: The database URL.
+- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
+- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
+- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
+- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
+- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
+- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
+- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
+- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
+- `--batch-size <BATCH_SIZE>`: Number of records to process in a single batch (default: 1000).
+- `-h, --help`: Print help information.
+
+## Lib
+
+The `das-metadata-json` crate provides a `sender` module that can be integrated into a third-party service (e.g. `nft_ingester`) to push asset data IDs for indexing. To configure it, follow the steps below:
+
+### Configuration
+
+1. **Set up the `SenderArgs`:** Ensure that the `nft_ingester` is configured with the necessary `SenderArgs`. These arguments include the Redis URL, batch size, and the number of queue connections. For example:
+
+```rust
+let sender_args = SenderArgs {
+    messenger_redis_url: "redis://localhost:6379".to_string(),
+    messenger_redis_batch_size: "100".to_string(),
+    messenger_queue_connections: 5,
+};
+```
+
+2. **Initialize the `SenderPool`:** Use the `try_from_config` async function to create a `SenderPool` instance from the `SenderArgs`. This sets up the necessary channels and messenger connections.
+
+```rust
+let sender_pool = SenderPool::try_from_config(sender_args).await?;
+```
+
+3. **Push Asset Data IDs for Indexing:** With the `SenderPool` instance, push asset data IDs to be indexed using the `push` method. The ID must be serialized into a byte slice before being sent, and the `asset_data` record should already be written to the database before its ID is pushed.
+
+```rust
+let message = asset_data.id;
+
+sender_pool.push(&message).await?;
+```
+
+Within the `nft_ingester`, the `sender_pool` is orchestrated by the `TaskManager`. When configured, the `task_manager` skips the usual process of creating a task record for a `DownloadMetadata` task and instead pushes the asset ID directly onto the `METADATA_JSON` Redis stream. This queues the ID for processing by the `das-metadata-json` indexer and streamlines the metadata JSON indexing workflow.
+
+## Configuration
+The CLI can be configured using command-line options or environment variables. For options that have an associated environment variable, you can set the variable instead of passing the option on the command line.
+
+## Logging
+Logging is managed by `env_logger`. Set the `RUST_LOG` environment variable to control the logging level, e.g., `RUST_LOG=info`.
+
+## Error Handling
+The CLI provides error messages for any issues encountered during execution.
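+
+## Appendix: Ingest Flow Sketch
+
+The snippet below is a minimal, illustrative sketch of one iteration of the flow described under "Ingest Command": look up the `asset_data` row for an ID taken off the METADATA_JSON stream, fetch its metadata JSON, and write it back. It is not part of the crate's API; the helper name `index_one` and the error handling are assumptions, and it presumes a `sea_orm::DatabaseConnection` and a `reqwest::Client` have already been constructed.
+
+```rust
+use digital_asset_types::dao::asset_data;
+use reqwest::Client;
+use sea_orm::{entity::*, prelude::*, EntityTrait};
+
+// Hypothetical helper: index a single asset ID read off the METADATA_JSON stream.
+async fn index_one(
+    conn: &DatabaseConnection,
+    client: &Client,
+    id: Vec<u8>,
+) -> anyhow::Result<()> {
+    // Look up the asset_data record the ID refers to.
+    let record = asset_data::Entity::find_by_id(id)
+        .one(conn)
+        .await?
+        .ok_or_else(|| anyhow::anyhow!("asset_data not found"))?;
+
+    // Fetch the off-chain metadata JSON.
+    let body = client
+        .get(record.metadata_url.clone())
+        .send()
+        .await?
+        .text()
+        .await?;
+    let metadata: serde_json::Value = serde_json::from_str(&body)?;
+
+    // Persist the fetched JSON and clear the reindex flag.
+    let mut active: asset_data::ActiveModel = record.into();
+    active.metadata = Set(metadata);
+    active.reindex = Set(Some(false));
+    active.update(conn).await?;
+
+    Ok(())
+}
+```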
\ No newline at end of file diff --git a/metadata_json/src/cmds/backfill.rs b/metadata_json/src/cmds/backfill.rs index 095607248..d265ec7c7 100644 --- a/metadata_json/src/cmds/backfill.rs +++ b/metadata_json/src/cmds/backfill.rs @@ -1,17 +1,13 @@ use { crate::worker::{Worker, WorkerArgs}, - backon::{ExponentialBuilder, Retryable}, clap::Parser, das_tree_backfiller::db, - das_tree_backfiller::metrics::{Metrics, MetricsArgs}, + das_tree_backfiller::metrics::{setup_metrics, MetricsArgs}, digital_asset_types::dao::asset_data, log::info, reqwest::ClientBuilder, sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector}, - tokio::{ - sync::mpsc, - time::{Duration, Instant}, - }, + tokio::time::Duration, }; #[derive(Parser, Clone, Debug)] @@ -37,7 +33,7 @@ pub async fn run(args: BackfillArgs) -> Result<(), anyhow::Error> { let pool = db::connect(args.database).await?; - let metrics = Metrics::try_from_config(args.metrics)?; + setup_metrics(args.metrics)?; let client = ClientBuilder::new() .timeout(Duration::from_millis(args.timeout)) @@ -45,7 +41,7 @@ pub async fn run(args: BackfillArgs) -> Result<(), anyhow::Error> { let worker = Worker::from(args.worker); - let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone()); + let (tx, handle) = worker.start(pool.clone(), client.clone()); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); diff --git a/metadata_json/src/cmds/ingest.rs b/metadata_json/src/cmds/ingest.rs index e2fe0ec15..728ab2706 100644 --- a/metadata_json/src/cmds/ingest.rs +++ b/metadata_json/src/cmds/ingest.rs @@ -3,12 +3,11 @@ use crate::worker::{Worker, WorkerArgs}; use clap::Parser; use das_tree_backfiller::{ db, - metrics::{Metrics, MetricsArgs}, + metrics::{setup_metrics, MetricsArgs}, }; -use digital_asset_types::dao::asset_data; use log::info; -use reqwest::{Client, ClientBuilder}; -use tokio::{sync::mpsc, time::Duration}; +use reqwest::ClientBuilder; +use tokio::time::Duration; #[derive(Parser, Clone, Debug)] pub struct IngestArgs { @@ -33,7 +32,7 @@ pub async fn run(args: IngestArgs) -> Result<(), anyhow::Error> { let pool = db::connect(args.database).await?; - let metrics = Metrics::try_from_config(args.metrics)?; + setup_metrics(args.metrics)?; let client = ClientBuilder::new() .timeout(Duration::from_millis(args.timeout)) @@ -41,7 +40,7 @@ pub async fn run(args: IngestArgs) -> Result<(), anyhow::Error> { let worker = Worker::from(args.worker); - let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone()); + let (tx, handle) = worker.start(pool.clone(), client.clone()); while let Ok(messages) = rx.recv().await { for message in messages.clone() { diff --git a/metadata_json/src/stream/mod.rs b/metadata_json/src/stream/mod.rs index 74ef564eb..cebe18ccc 100644 --- a/metadata_json/src/stream/mod.rs +++ b/metadata_json/src/stream/mod.rs @@ -3,3 +3,5 @@ pub mod sender; pub use receiver::*; pub use sender::*; + +pub const METADATA_JSON_STREAM: &'static str = "METADATA_JSON"; diff --git a/metadata_json/src/stream/receiver.rs b/metadata_json/src/stream/receiver.rs index 8a3bcf717..ff17556c5 100644 --- a/metadata_json/src/stream/receiver.rs +++ b/metadata_json/src/stream/receiver.rs @@ -1,12 +1,10 @@ +use super::METADATA_JSON_STREAM; use clap::Parser; use figment::value::{Dict, Value}; -use plerkle_messenger::{ - select_messenger, Messenger, MessengerConfig, MessengerError, MessengerType, RecvData, -}; +use plerkle_messenger::{select_messenger, Messenger, MessengerConfig, MessengerType, RecvData}; +use 
rand::{distributions::Alphanumeric, thread_rng, Rng}; use std::sync::{Arc, Mutex}; -const METADATA_JSON_STREAM: &'static str = "METADATA_JSON"; - #[derive(Clone, Debug, Parser)] pub struct ReceiverArgs { #[arg(long, env)] @@ -15,6 +13,14 @@ pub struct ReceiverArgs { pub messenger_redis_batch_size: String, } +fn rand_string() -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect() +} + impl From for MessengerConfig { fn from(args: ReceiverArgs) -> Self { let mut connection_config = Dict::new(); @@ -31,10 +37,11 @@ impl From for MessengerConfig { "pipeline_size_bytes".to_string(), Value::from(1u128.to_string()), ); + connection_config.insert("consumer_id".to_string(), Value::from(rand_string())); Self { messenger_type: MessengerType::Redis, - connection_config: connection_config, + connection_config, } } } diff --git a/metadata_json/src/stream/sender.rs b/metadata_json/src/stream/sender.rs index e69de29bb..cd2ca1315 100644 --- a/metadata_json/src/stream/sender.rs +++ b/metadata_json/src/stream/sender.rs @@ -0,0 +1,112 @@ +use super::METADATA_JSON_STREAM; +use anyhow::Result; +use clap::Parser; +use figment::value::{Dict, Value}; +use plerkle_messenger::{Messenger, MessengerConfig, MessengerType}; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; +use serde::Deserialize; +use std::num::TryFromIntError; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::{mpsc::error::TrySendError, Mutex}; + +#[derive(Clone, Debug, Parser, Deserialize, PartialEq)] +pub struct SenderArgs { + #[arg(long, env)] + pub messenger_redis_url: String, + #[arg(long, env, default_value = "100")] + pub messenger_redis_batch_size: String, + #[arg(long, env, default_value = "5")] + pub messenger_queue_connections: u64, +} + +fn rand_string() -> String { + thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect() +} + +impl From for MessengerConfig { + fn from(args: SenderArgs) -> Self { + let mut connection_config = Dict::new(); + + connection_config.insert( + "redis_connection_str".to_string(), + Value::from(args.messenger_redis_url), + ); + connection_config.insert( + "batch_size".to_string(), + Value::from(args.messenger_redis_batch_size), + ); + connection_config.insert( + "pipeline_size_bytes".to_string(), + Value::from(1u128.to_string()), + ); + connection_config.insert("consumer_id".to_string(), Value::from(rand_string())); + + Self { + messenger_type: MessengerType::Redis, + connection_config, + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum SenderPoolError { + #[error("messenger")] + Messenger(#[from] plerkle_messenger::MessengerError), + #[error("tokio try send to channel")] + TrySendMessengerChannel(#[from] TrySendError>), + #[error("revc messenger connection")] + RecvMessengerConnection, + #[error("try from int")] + TryFromInt(#[from] TryFromIntError), + #[error("tokio send to channel")] + SendMessengerChannel(#[from] mpsc::error::SendError>), +} + +#[derive(Debug, Clone)] +pub struct SenderPool { + tx: mpsc::Sender>, + rx: Arc>>>, +} + +impl SenderPool { + #[allow(dead_code)] + pub async fn try_from_config(config: SenderArgs) -> anyhow::Result { + let size = usize::try_from(config.messenger_queue_connections)?; + let (tx, rx) = mpsc::channel(size); + + for _ in 0..config.messenger_queue_connections { + let messenger_config: MessengerConfig = config.clone().into(); + let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?; + messenger.add_stream(METADATA_JSON_STREAM).await?; + 
messenger + .set_buffer_size(METADATA_JSON_STREAM, 10000000000000000) + .await; + + tx.try_send(messenger)?; + } + + Ok(Self { + tx, + rx: Arc::new(Mutex::new(rx)), + }) + } + #[allow(dead_code)] + pub async fn push(&self, message: &[u8]) -> Result<(), SenderPoolError> { + let mut rx = self.rx.lock().await; + let mut messenger = rx + .recv() + .await + .ok_or(SenderPoolError::RecvMessengerConnection)?; + + messenger.send(METADATA_JSON_STREAM, message).await?; + + self.tx.send(messenger).await?; + + Ok(()) + } +} diff --git a/metadata_json/src/worker.rs b/metadata_json/src/worker.rs index cffe02d6f..741a9af65 100644 --- a/metadata_json/src/worker.rs +++ b/metadata_json/src/worker.rs @@ -1,21 +1,14 @@ use { backon::{ExponentialBuilder, Retryable}, + cadence_macros::{statsd_count, statsd_time}, clap::Parser, - das_tree_backfiller::{ - db, - metrics::{Metrics, MetricsArgs}, - }, digital_asset_types::dao::asset_data, futures::{stream::FuturesUnordered, StreamExt}, indicatif::HumanDuration, - log::{debug, error, info}, - reqwest::{Client, ClientBuilder, Url}, - sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector}, - tokio::{ - sync::mpsc, - task::JoinHandle, - time::{Duration, Instant}, - }, + log::{debug, error}, + reqwest::{Client, Url}, + sea_orm::{entity::*, prelude::*, EntityTrait, SqlxPostgresConnector}, + tokio::{sync::mpsc, task::JoinHandle, time::Instant}, }; #[derive(Parser, Clone, Debug)] @@ -52,7 +45,6 @@ impl Worker { pub fn start( &self, pool: sqlx::PgPool, - metrics: Metrics, client: Client, ) -> (mpsc::Sender>, JoinHandle<()>) { let (tx, mut rx) = mpsc::channel::>(self.queue_size); @@ -67,10 +59,9 @@ impl Worker { } let pool = pool.clone(); - let metrics = metrics.clone(); let client = client.clone(); - handlers.push(spawn_task(client, pool, metrics, asset_data)); + handlers.push(spawn_task(client, pool, asset_data)); } while let Some(_) = handlers.next().await {} @@ -80,12 +71,7 @@ impl Worker { } } -fn spawn_task( - client: Client, - pool: sqlx::PgPool, - metrics: Metrics, - asset_data: Vec, -) -> JoinHandle<()> { +fn spawn_task(client: Client, pool: sqlx::PgPool, asset_data: Vec) -> JoinHandle<()> { tokio::spawn(async move { let timing = Instant::now(); @@ -95,9 +81,36 @@ fn spawn_task( if let Err(e) = perform_metadata_json_task(client, pool, asset_data).await { error!("Asset {} {}", asset_data_id, e); - metrics.increment("ingester.bgtask.error"); + match e { + MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Response { + status, + url, + .. + }) => { + let status = &status.to_string(); + let host = url.host_str().unwrap_or("unknown"); + + statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "status" => status, "host" => host); + } + MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Parse { url, .. 
}) => { + let host = url.host_str().unwrap_or("unknown"); + + statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "host" => host); + } + MetadataJsonTaskError::Fetch(FetchMetadataJsonError::GenericReqwest(e)) => { + let host = e + .url() + .map(|url| url.host_str().unwrap_or("unknown")) + .unwrap_or("unknown"); + + statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "host" => host); + } + _ => { + statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata"); + } + } } else { - metrics.increment("ingester.bgtask.success"); + statsd_count!("ingester.bgtask.success", 1, "type" => "DownloadMetadata"); } debug!( @@ -106,7 +119,7 @@ fn spawn_task( HumanDuration(timing.elapsed()) ); - metrics.time("ingester.bgtask.finished", timing.elapsed()); + statsd_time!("ingester.bgtask.finished", timing.elapsed(), "type" => "DownloadMetadata"); }) } @@ -135,17 +148,12 @@ async fn perform_metadata_json_task( let metadata = fetch_metadata_json(client, &asset_data.metadata_url).await?; - let asset_data_active_model = asset_data::ActiveModel { - id: Set(asset_data.id), - metadata: Set(metadata), - reindex: Set(Some(false)), - ..Default::default() - }; - - asset_data_active_model - .update(&conn) - .await - .map_err(Into::into) + let mut asset_data: asset_data::ActiveModel = asset_data.into(); + + asset_data.metadata = Set(metadata); + asset_data.reindex = Set(Some(false)); + + asset_data.update(&conn).await.map_err(Into::into) } #[derive(thiserror::Error, Debug)] diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index 1115020c7..44d593ab2 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -35,6 +35,7 @@ sea-orm = { version = "0.10.6", features = [ "with-chrono", "mock", ] } +das-metadata-json = { path = "../metadata_json" } sea-query = { version = "0.28.1", features = ["postgres-array"] } chrono = "0.4.19" tokio-postgres = "0.7.7" diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 6e11fb539..1f6b4387d 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -1,3 +1,4 @@ +use das_metadata_json::SenderArgs; use figment::{ providers::{Env, Format, Yaml}, value::Value, @@ -34,6 +35,7 @@ pub struct IngesterConfig { pub code_version: Option<&'static str>, pub background_task_runner_config: Option, pub cl_audits: Option, // save transaction logs for compressed nfts + pub metadata_json_sender: Option, } impl IngesterConfig { diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index f426f3ca4..9f2bde4c1 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -25,6 +25,8 @@ use crate::{ use cadence_macros::{is_global_default_set, statsd_count}; use chrono::Duration; use clap::{arg, command, value_parser}; +use das_metadata_json::{SenderPool, METADATA_JSON_STREAM}; +use futures_util::sink::Send; use log::{error, info}; use plerkle_messenger::{ redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM, @@ -87,9 +89,18 @@ pub async fn main() -> Result<(), IngesterError> { task_runner_config.timeout.unwrap_or(3), )), })]; - - let mut background_task_manager = - TaskManager::new(rand_string(), database_pool.clone(), bg_task_definitions); + let sender_pool = if let Some(args) = config.metadata_json_sender.clone() { + Some(SenderPool::try_from_config(args).await.unwrap()) + } else { + None + }; + + let mut background_task_manager = TaskManager::new( + rand_string(), + database_pool.clone(), + sender_pool, + bg_task_definitions, + ); // This is 
how we send new bg tasks let bg_task_listener = background_task_manager .start_listener(role == IngesterRole::BackgroundTaskRunner || role == IngesterRole::All); @@ -118,6 +129,11 @@ pub async fn main() -> Result<(), IngesterError> { config.messenger_config.clone(), TRANSACTION_BACKFILL_STREAM, )?; + let mut timer_metadata_json = StreamSizeTimer::new( + stream_metrics_timer, + config.messenger_config.clone(), + METADATA_JSON_STREAM, + )?; if let Some(t) = timer_acc.start::().await { tasks.spawn(t); @@ -131,6 +147,9 @@ pub async fn main() -> Result<(), IngesterError> { if let Some(t) = timer_backfiller_txn.start::().await { tasks.spawn(t); } + if let Some(t) = timer_metadata_json.start::().await { + tasks.spawn(t); + } // Stream Consumers Setup ------------------------------------- if role == IngesterRole::Ingester || role == IngesterRole::All { diff --git a/nft_ingester/src/stream.rs b/nft_ingester/src/stream.rs index 3b36397b3..e5db78d33 100644 --- a/nft_ingester/src/stream.rs +++ b/nft_ingester/src/stream.rs @@ -1,15 +1,13 @@ - use crate::{error::IngesterError, metric}; use cadence_macros::{is_global_default_set, statsd_count, statsd_gauge}; -use log::{error}; +use log::error; use plerkle_messenger::{Messenger, MessengerConfig}; use tokio::{ - task::{JoinHandle}, + task::JoinHandle, time::{self, Duration}, }; - pub struct StreamSizeTimer { interval: tokio::time::Duration, messenger_config: MessengerConfig, diff --git a/nft_ingester/src/tasks/mod.rs b/nft_ingester/src/tasks/mod.rs index eebcd1eb9..9e29658d6 100644 --- a/nft_ingester/src/tasks/mod.rs +++ b/nft_ingester/src/tasks/mod.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use cadence_macros::{is_global_default_set, statsd_count, statsd_histogram}; use chrono::{Duration, NaiveDateTime, Utc}; use crypto::{digest::Digest, sha2::Sha256}; +use das_metadata_json::SenderPool; use digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}; use log::{debug, error, info, warn}; use sea_orm::{ @@ -97,6 +98,7 @@ pub trait IntoTaskData: Sized { pub struct TaskManager { instance_name: String, pool: Pool, + metadata_json_sender: Option, producer: Option>, registered_task_types: Arc>>, } @@ -234,6 +236,7 @@ impl TaskManager { pub fn new( instance_name: String, pool: Pool, + metadata_json_sender: Option, task_defs: Vec>, ) -> Self { let mut tasks = HashMap::new(); @@ -244,6 +247,7 @@ impl TaskManager { instance_name, pool, producer: None, + metadata_json_sender, registered_task_types: Arc::new(tasks), } } @@ -251,7 +255,6 @@ impl TaskManager { pub fn new_task_handler( pool: Pool, instance_name: String, - _name: String, task: TaskData, tasks_def: Arc>>, process_now: bool, @@ -319,9 +322,14 @@ impl TaskManager { let task_map = self.registered_task_types.clone(); let pool = self.pool.clone(); let instance_name = self.instance_name.clone(); + let sender_pool = self.metadata_json_sender.clone(); tokio::task::spawn(async move { while let Some(task) = receiver.recv().await { + let instance_name = instance_name.clone(); + let task_name = task.name; + let sender_pool = sender_pool.clone(); + if let Some(task_created_time) = task.created_at { let bus_time = Utc::now().timestamp_millis() - task_created_time.timestamp_millis(); @@ -329,30 +337,59 @@ impl TaskManager { statsd_histogram!("ingester.bgtask.bus_time", bus_time as u64, "type" => task.name); } } - let name = instance_name.clone(); + + if task_name == "DownloadMetadata" { + if let Some(sender_pool) = sender_pool { + let download_metadata_task = DownloadMetadata::from_task_data(task); + + if 
let Ok(download_metadata_task) = download_metadata_task { + if let Err(_) = sender_pool + .push(&download_metadata_task.asset_data_id) + .await + { + metric! { + statsd_count!("ingester.metadata_json.send.failed", 1); + } + } else { + metric! { + statsd_count!("ingester.bgtask.new", 1, "type" => task_name); + } + } + } else { + metric! { + statsd_count!("ingester.metadata_json.send.failed", 1); + } + } + + continue; + } + } + if let Ok(hash) = task.hash() { let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); let task_entry = tasks::Entity::find_by_id(hash.clone()) .filter(tasks::Column::Status.ne(TaskStatus::Pending)) .one(&conn) .await; + if let Ok(Some(e)) = task_entry { metric! { statsd_count!("ingester.bgtask.identical", 1, "type" => &e.task_type); } continue; } - metric! { - statsd_count!("ingester.bgtask.new", 1, "type" => &task.name); - } + TaskManager::new_task_handler( pool.clone(), - instance_name.clone(), - name, + instance_name, task, task_map.clone(), process_on_receive, ); + + metric! { + statsd_count!("ingester.bgtask.new", 1, "type" => task_name); + } } } }) diff --git a/tools/bgtask_creator/src/main.rs b/tools/bgtask_creator/src/main.rs index 7b4916559..3cc7161b6 100644 --- a/tools/bgtask_creator/src/main.rs +++ b/tools/bgtask_creator/src/main.rs @@ -2,34 +2,26 @@ use reqwest::Client; use { clap::{value_parser, Arg, ArgAction, Command}, - das_tree_backfiller::{ - backfiller::Counter, - metrics::{Metrics, MetricsArgs}, - }, digital_asset_types::dao::{ asset, asset_authority, asset_creators, asset_data, asset_grouping, sea_orm_active_enums::TaskStatus, tasks, tokens, }, futures::TryStreamExt, - indicatif::HumanDuration, log::{debug, error, info}, nft_ingester::{ config::{init_logger, rand_string, setup_config}, database::setup_database, error::IngesterError, - metrics::setup_metrics, tasks::{BgTask, DownloadMetadata, DownloadMetadataTask, IntoTaskData, TaskManager}, }, prometheus::{IntGaugeVec, Opts, Registry}, - reqwest::ClientBuilder, sea_orm::{ entity::*, query::*, DbBackend, DeleteResult, EntityTrait, JsonValue, SqlxPostgresConnector, }, solana_sdk::pubkey::Pubkey, sqlx::types::chrono::Utc, std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc, time}, - time::{Duration, Instant}, - tokio::sync::{mpsc, Semaphore}, + time::Duration, txn_forwarder::save_metrics, }; @@ -140,64 +132,6 @@ async fn main() -> anyhow::Result<()> { Command::new("create") .about("Create new background tasks for missing assets (reindex=true)"), ) - .subcommand( - Command::new("run") - .about("Reindex flagged asset_data. 
Query the database for all assets with reindex=true and try to fetch the metadata in a worker thread.") - .arg( - Arg::new("worker_count") - .long("worker-count") - .help("Number of worker threads to use") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(usize)) - .default_value("200") - ) - .arg( - Arg::new("timeout") - .long("timeout") - .help("Request time out in milliseconds for a metadata json request") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(u64)) - .default_value("30000") - ) - .arg( - Arg::new("queue_size") - .long("queue-size") - .help("The channel size for the worker threads") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(usize)) - .default_value("10000") - ) - .arg( - Arg::new("metrics_host") - .long("metrics-host") - .help("Metrics host") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(String)) - .default_value("127.0.0.1") - ) - .arg( - Arg::new("metrics_port") - .long("metrics-port") - .help("Metrics port") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(u16)) - .default_value("8125") - ) - .arg( - Arg::new("metrics_prefix") - .long("metrics-prefix") - .help("Metrics naming prefix") - .required(false) - .action(ArgAction::Set) - .value_parser(value_parser!(String)) - .default_value("das_ingester") - ) - ) .subcommand(Command::new("delete").about("Delete ALL pending background tasks")) .get_matches(); @@ -223,9 +157,6 @@ async fn main() -> anyhow::Result<()> { // Pull Env variables into config struct let config = setup_config(config_path); - // Optionally setup metrics if config demands it - setup_metrics(&config); - // One pool many clones, this thing is thread safe and send sync let database_pool = setup_database(config.clone()).await; @@ -369,135 +300,6 @@ WHERE } } } - Some(("run", sub_matches)) => { - let worker_count = sub_matches - .get_one::("worker_count") - .copied() - .ok_or(anyhow::anyhow!("Missing worker count"))?; - let timeout = sub_matches - .get_one::("timeout") - .copied() - .ok_or(anyhow::anyhow!("Missing requeest timeout"))?; - let batch_size = matches - .get_one::("batch_size") - .copied() - .ok_or(anyhow::anyhow!("Missing batch size"))?; - let queue_size = sub_matches - .get_one::("queue_size") - .copied() - .ok_or(anyhow::anyhow!("Missing queue size"))?; - let metrics_host = sub_matches - .get_one::("metrics_host") - .ok_or(anyhow::anyhow!("Missing metrics host"))? - .to_string(); - let metrics_port = sub_matches - .get_one::("metrics_port") - .copied() - .ok_or(anyhow::anyhow!("Missing metrics port"))?; - let metrics_prefix = sub_matches - .get_one::("metrics_prefix") - .ok_or(anyhow::anyhow!("Missing metrics prefix"))? 
- .to_string(); - - let config_path = matches.get_one::("config"); - if let Some(config_path) = config_path { - info!("Loading config from: {}", config_path.display()); - } - - let config = setup_config(config_path); - - setup_metrics(&config); - - let metrics_config = MetricsArgs { - metrics_host, - metrics_port, - metrics_prefix, - }; - let metrics = Metrics::try_from_config(metrics_config)?; - - let pool = setup_database(config.clone()).await; - - let client = ClientBuilder::new() - .timeout(Duration::from_millis(timeout)) - .build()?; - let client = Arc::new(client); - - let condition = asset_data::Column::Reindex.eq(true); - let asset_data = find_by_type(authority, collection, creator, mint, condition); - - let mut asset_data_missing = asset_data - .0 - .order_by(asset_data::Column::Id, Order::Asc) - .paginate(&conn, batch_size) - .into_stream(); - - let (tx, mut rx) = mpsc::channel::(queue_size); - - let count = Counter::new(); - let task_count = count.clone(); - - tokio::spawn(async move { - let semaphore = Arc::new(Semaphore::new(worker_count)); - - while let Some(asset_data) = rx.recv().await { - let semaphore = semaphore.clone(); - let count = task_count.clone(); - let client = Arc::clone(&client); - let pool = pool.clone(); - let metrics = metrics.clone(); - - tokio::spawn(async move { - count.increment(); - - let _permit = semaphore.acquire().await?; - - let timing = Instant::now(); - - let asset_data_id = asset_data.id.clone(); - let asset_data_id = bs58::encode(asset_data_id).into_string(); - - if let Err(e) = perform_metadata_json_task(&client, pool, asset_data).await - { - error!("Asset {} metadata json task: {}", asset_data_id, e); - - metrics.increment("ingester.bgtask.error"); - } else { - metrics.increment("ingester.bgtask.success"); - } - - debug!( - "Asset {} finished in {}", - asset_data_id, - HumanDuration(timing.elapsed()) - ); - - count.decrement(); - - Ok::<(), anyhow::Error>(()) - }); - } - - Ok::<(), anyhow::Error>(()) - }); - - while let Some(assets) = asset_data_missing.try_next().await? { - let assets_count = assets.len(); - - for asset in assets { - tx.send(asset).await?; - } - - if assets_count < usize::try_from(batch_size)? 
{ - break; - } - } - - info!("Waiting for tasks to finish"); - count.zero().await; - - info!("Tasks finished"); - () - } Some(("create", _)) => { // @TODO : add a delete option that first deletes all matching tasks to the criteria or condition @@ -554,7 +356,6 @@ WHERE let res = TaskManager::new_task_handler( database_pool.clone(), name.clone(), - name, task_data, task_map.clone(), false, diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index b70cf0637..4ec028e6f 100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -1,11 +1,12 @@ use crate::{ db, - metrics::{Metrics, MetricsArgs}, + metrics::{setup_metrics, MetricsArgs}, queue, rpc::{Rpc, SolanaRpcArgs}, tree::{self, TreeGapFill, TreeGapModel}, }; use anyhow::Result; +use cadence_macros::{statsd_count, statsd_time}; use clap::{Parser, ValueEnum}; use digital_asset_types::dao::cl_audits_v2; use indicatif::HumanDuration; @@ -136,10 +137,7 @@ pub async fn run(config: Args) -> Result<()> { let transaction_solana_rpc = solana_rpc.clone(); let gap_solana_rpc = solana_rpc.clone(); - let metrics = Metrics::try_from_config(config.metrics)?; - let tree_metrics = metrics.clone(); - let transaction_metrics = metrics.clone(); - let gap_metrics = metrics.clone(); + setup_metrics(config.metrics)?; let (sig_sender, mut sig_receiver) = mpsc::channel::(config.signature_channel_size); let (gap_sender, mut gap_receiver) = mpsc::channel::(config.gap_channel_size); @@ -157,7 +155,6 @@ pub async fn run(config: Args) -> Result<()> { while let Some(signature) = sig_receiver.recv().await { let solana_rpc = transaction_solana_rpc.clone(); - let metrics = transaction_metrics.clone(); let queue = queue.clone(); let semaphore = semaphore.clone(); let count = transaction_worker_transaction_count.clone(); @@ -171,12 +168,12 @@ pub async fn run(config: Args) -> Result<()> { if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await { error!("tree transaction: {:?}", e); - metrics.increment("transaction.failed"); + statsd_count!("transaction.failed", 1); } else { - metrics.increment("transaction.succeeded"); + statsd_count!("transaction.succeeded", 1); } - metrics.time("transaction.queued", timing.elapsed()); + statsd_time!("transaction.queued", timing.elapsed()); count.decrement(); @@ -192,7 +189,6 @@ pub async fn run(config: Args) -> Result<()> { while let Some(gap) = gap_receiver.recv().await { let solana_rpc = gap_solana_rpc.clone(); - let metrics = gap_metrics.clone(); let sig_sender = sig_sender.clone(); let semaphore = semaphore.clone(); let count = gap_worker_gap_count.clone(); @@ -206,12 +202,13 @@ pub async fn run(config: Args) -> Result<()> { if let Err(e) = gap.crawl(&solana_rpc, sig_sender).await { error!("tree transaction: {:?}", e); - metrics.increment("gap.failed"); + + statsd_count!("gap.failed", 1); } else { - metrics.increment("gap.succeeded"); + statsd_count!("gap.succeeded", 1); } - metrics.time("gap.queued", timing.elapsed()); + statsd_time!("gap.queued", timing.elapsed()); count.decrement(); @@ -243,7 +240,6 @@ pub async fn run(config: Args) -> Result<()> { for tree in trees { let semaphore = semaphore.clone(); let gap_sender = gap_sender.clone(); - let metrics = tree_metrics.clone(); let pool = pool.clone(); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); @@ -299,15 +295,15 @@ pub async fn run(config: Args) -> Result<()> { for gap in gaps { if let Err(e) = gap_sender.send(gap).await { - metrics.increment("gap.failed"); + statsd_count!("gap.failed", 
1); error!("send gap: {:?}", e); } } info!("crawling tree {} with {} gaps", tree.pubkey, gap_count); - metrics.increment("tree.succeeded"); - metrics.time("tree.crawled", timing.elapsed()); + statsd_count!("tree.succeeded", 1); + statsd_time!("tree.crawled", timing.elapsed()); Ok::<(), anyhow::Error>(()) }); @@ -324,7 +320,7 @@ pub async fn run(config: Args) -> Result<()> { transaction_count.zero().await; info!("all transactions queued"); - metrics.time("job.completed", started.elapsed()); + statsd_time!("job.completed", started.elapsed()); info!( "crawled {} trees in {}", diff --git a/tree_backfiller/src/metrics.rs b/tree_backfiller/src/metrics.rs index 91d60d570..a5ac4f4f6 100644 --- a/tree_backfiller/src/metrics.rs +++ b/tree_backfiller/src/metrics.rs @@ -1,9 +1,8 @@ use anyhow::Result; -use cadence::{BufferedUdpMetricSink, Counted, Gauged, QueuingMetricSink, StatsdClient, Timed}; +use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient, Timed}; +use cadence_macros::set_global_default; use clap::Parser; -use log::error; -use std::time::Duration; -use std::{net::UdpSocket, sync::Arc}; +use std::net::UdpSocket; #[derive(Clone, Parser, Debug)] pub struct MetricsArgs { @@ -15,32 +14,18 @@ pub struct MetricsArgs { pub metrics_prefix: String, } -#[derive(Clone, Debug)] -pub struct Metrics(Arc); +pub fn setup_metrics(config: MetricsArgs) -> Result<()> { + let host = (config.metrics_host, config.metrics_port); -impl Metrics { - pub fn try_from_config(config: MetricsArgs) -> Result { - let host = (config.metrics_host, config.metrics_port); + let socket = UdpSocket::bind("0.0.0.0:0")?; + socket.set_nonblocking(true)?; - let socket = UdpSocket::bind("0.0.0.0:0")?; - socket.set_nonblocking(true)?; + let udp_sink = BufferedUdpMetricSink::from(host, socket)?; + let queuing_sink = QueuingMetricSink::from(udp_sink); - let udp_sink = BufferedUdpMetricSink::from(host, socket)?; - let queuing_sink = QueuingMetricSink::from(udp_sink); - let client = StatsdClient::from_sink(&config.metrics_prefix, queuing_sink); + let client = StatsdClient::from_sink(&config.metrics_prefix, queuing_sink); - Ok(Metrics(Arc::new(client))) - } + set_global_default(client); - pub fn time(&self, key: &str, duration: Duration) { - if let Err(e) = self.0.time(key, duration) { - error!("submitting time: {:?}", e) - } - } - - pub fn increment(&self, key: &str) { - if let Err(e) = self.0.count(key, 1) { - error!("submitting increment: {:?}", e) - } - } + Ok(()) }
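
A usage note on the metrics refactor above: `Metrics::try_from_config` is replaced by `setup_metrics`, which installs a global statsd client, so call sites emit metrics through the `cadence_macros` macros instead of passing a `Metrics` handle around. The sketch below shows how a consumer of the refactored module might wire this up; it assumes the `MetricsArgs` fields shown in `metrics.rs`, and the host, port, and prefix values are placeholders.

```rust
use cadence_macros::{statsd_count, statsd_time};
use das_tree_backfiller::metrics::{setup_metrics, MetricsArgs};
use std::time::Instant;

fn main() -> anyhow::Result<()> {
    // Install the global statsd client once at process start-up.
    setup_metrics(MetricsArgs {
        metrics_host: "127.0.0.1".to_string(),
        metrics_port: 8125,
        metrics_prefix: "das.backfiller".to_string(),
    })?;

    let started = Instant::now();

    // ... do some work ...

    // Any component can now emit counters and timings without holding a client handle.
    statsd_count!("transaction.succeeded", 1);
    statsd_time!("job.completed", started.elapsed());

    Ok(())
}
```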