Skip to content

Commit

Permalink
feat(metadata_json): add single command to index a metadata json in t…
Browse files Browse the repository at this point in the history
…he main thread for a mint (#129)
  • Loading branch information
kespinola authored Feb 19, 2024
1 parent 63008f1 commit 417f71b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 21 deletions.
5 changes: 4 additions & 1 deletion integration_tests/tests/integration_tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ pub async fn apply_migrations_and_delete_data(db: Arc<DatabaseConnection>) {

async fn load_ingest_program_transformer(pool: sqlx::Pool<sqlx::Postgres>) -> ProgramTransformer {
// HACK: We don't really use this background task handler but we need it to create the sender
let mut background_task_manager = TaskManager::new(rand_string(), pool.clone(), vec![]);
let mut background_task_manager =
TaskManager::try_new_async(rand_string(), pool.clone(), None, vec![])
.await
.unwrap();
background_task_manager.start_listener(true);
let bg_task_sender = background_task_manager.get_sender().unwrap();
ProgramTransformer::new(pool, bg_task_sender, false)
Expand Down
1 change: 1 addition & 0 deletions metadata_json/src/cmds/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod backfill;
pub mod ingest;
pub mod single;
63 changes: 63 additions & 0 deletions metadata_json/src/cmds/single.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::worker::{perform_metadata_json_task, FetchMetadataJsonError, MetadataJsonTaskError};
use cadence_macros::statsd_count;
use clap::Parser;
use das_tree_backfiller::{
db,
metrics::{setup_metrics, MetricsArgs},
};
use log::{error, info};
use reqwest::ClientBuilder;
use tokio::time::Duration;

#[derive(Parser, Clone, Debug)]
pub struct SingleArgs {
#[clap(flatten)]
metrics: MetricsArgs,

#[clap(flatten)]
database: db::PoolArgs,

#[arg(long, default_value = "1000")]
timeout: u64,

mint: String, // Accept mint as an argument
}

pub async fn run(args: SingleArgs) -> Result<(), anyhow::Error> {
let pool = db::connect(args.database).await?;

setup_metrics(args.metrics)?;

let asset_data = bs58::decode(args.mint.as_str()).into_vec()?;

let client = ClientBuilder::new()
.timeout(Duration::from_millis(args.timeout))
.build()?;

if let Err(e) = perform_metadata_json_task(client, pool, asset_data).await {
error!("Asset {} {}", args.mint, e);

match e {
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Response { status, .. }) => {
let status = &status.to_string();

statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "status" => status);
}
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Parse { .. }) => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
}
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::GenericReqwest(_e)) => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
}
_ => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
}
}
} else {
statsd_count!("ingester.bgtask.success", 1, "type" => "DownloadMetadata");
}

info!("Ingesting stopped");

Ok(())
}
4 changes: 3 additions & 1 deletion metadata_json/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod cmds;
mod stream;
mod worker;

use cmds::{backfill, ingest};
use cmds::{backfill, ingest, single};

#[derive(Parser)]
#[command(author, about, next_line_help = true)]
Expand All @@ -18,6 +18,7 @@ enum Action {
Ingest(ingest::IngestArgs),
Backfill(backfill::BackfillArgs),
Report,
Single(single::SingleArgs),
}

#[tokio::main]
Expand All @@ -29,6 +30,7 @@ async fn main() -> Result<(), anyhow::Error> {
match args.action {
Action::Ingest(args) => ingest::run(args).await,
Action::Backfill(args) => backfill::run(args).await,
Action::Single(args) => single::run(args).await,
Action::Report => {
println!("Report");
Ok(())
Expand Down
28 changes: 9 additions & 19 deletions metadata_json/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,17 @@ fn spawn_task(client: Client, pool: sqlx::PgPool, asset_data: Vec<u8>) -> JoinHa

match e {
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Response {
status,
url,
..
status, ..
}) => {
let status = &status.to_string();
let host = url.host_str().unwrap_or("unknown");

statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "status" => status, "host" => host);
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "status" => status);
}
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Parse { url, .. }) => {
let host = url.host_str().unwrap_or("unknown");

statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "host" => host);
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::Parse { .. }) => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
}
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::GenericReqwest(e)) => {
let host = e
.url()
.map(|url| url.host_str().unwrap_or("unknown"))
.unwrap_or("unknown");

statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata", "host" => host);
MetadataJsonTaskError::Fetch(FetchMetadataJsonError::GenericReqwest(_e)) => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
}
_ => {
statsd_count!("ingester.bgtask.error", 1, "type" => "DownloadMetadata");
Expand All @@ -124,7 +114,7 @@ fn spawn_task(client: Client, pool: sqlx::PgPool, asset_data: Vec<u8>) -> JoinHa
}

#[derive(thiserror::Error, Debug)]
enum MetadataJsonTaskError {
pub enum MetadataJsonTaskError {
#[error("sea orm: {0}")]
SeaOrm(#[from] sea_orm::DbErr),
#[error("metadata json: {0}")]
Expand All @@ -133,7 +123,7 @@ enum MetadataJsonTaskError {
AssetNotFound,
}

async fn perform_metadata_json_task(
pub async fn perform_metadata_json_task(
client: Client,
pool: sqlx::PgPool,
asset_data: Vec<u8>,
Expand All @@ -157,7 +147,7 @@ async fn perform_metadata_json_task(
}

#[derive(thiserror::Error, Debug)]
enum FetchMetadataJsonError {
pub enum FetchMetadataJsonError {
#[error("reqwest: {0}")]
GenericReqwest(#[from] reqwest::Error),
#[error("json parse for url({url}) with {source}")]
Expand Down

0 comments on commit 417f71b

Please sign in to comment.