Commit

feat(metadata_json): add new crate to continuously process messages and do backfill.
kespinola committed Jan 17, 2024
1 parent f4dbe2c commit 686a2b2
Showing 17 changed files with 382 additions and 374 deletions.
28 changes: 1 addition & 27 deletions Cargo.lock

Some generated files are not rendered by default.

35 changes: 2 additions & 33 deletions metadata_json/Cargo.toml
@@ -23,15 +23,9 @@ redis = { version = "0.22.3", features = [
"tokio-native-tls-comp",
] }
futures = { version = "0.3.25" }
futures-util = "0.3.27"
base64 = "0.21.0"
indicatif = "0.17.5"
thiserror = "1.0.31"
serde_json = "1.0.81"
cadence = "0.29.0"
cadence-macros = "0.29.0"
hyper = "0.14.23"
anchor-client = "0.28.0"
das-tree-backfiller = { path = "../tree_backfiller" }
tokio = { version = "1.26.0", features = ["full", "tracing"] }
sqlx = { version = "0.6.2", features = [
@@ -51,46 +45,21 @@ sea-orm = { version = "0.10.6", features = [
] }
sea-query = { version = "0.28.1", features = ["postgres-array"] }
chrono = "0.4.19"
cadence = "0.29.0"
cadence-macros = "0.29.0"
tokio-postgres = "0.7.7"
serde = "1.0.136"
bs58 = "0.4.0"
reqwest = "0.11.11"
plerkle_messenger = { version = "1.6.0", features = ['redis'] }
plerkle_serialization = { version = "1.6.0" }
flatbuffers = "23.1.21"
lazy_static = "1.4.0"
regex = "1.5.5"
digital_asset_types = { path = "../digital_asset_types", features = [
"json_types",
"sql_types",
] }
mpl-bubblegum = "1.0.1-beta.3"
spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] }
spl-concurrent-merkle-tree = "0.2.0"
uuid = "1.0.0"
async-trait = "0.1.53"
num-traits = "0.2.15"
blockbuster = "0.9.0-beta.1"
figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
solana-sdk = "~1.16.16"
solana-client = "~1.16.16"
spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
solana-transaction-status = "~1.16.16"
solana-account-decoder = "~1.16.16"
solana-geyser-plugin-interface = "~1.16.16"
solana-sdk-macro = "~1.16.16"
rand = "0.8.5"
rust-crypto = "0.2.36"
url = "2.3.1"
anchor-lang = "0.28.0"
borsh = "~0.10.3"
stretto = { version = "0.7", features = ["async"] }
tokio-stream = "0.1.12"
tracing-subscriber = { version = "0.3.16", features = [
"json",
"env-filter",
"ansi",
] }
clap = { version = "4.2.2", features = ["derive", "cargo", "env"] }

[lints]
102 changes: 102 additions & 0 deletions metadata_json/README.md
@@ -0,0 +1,102 @@
# DAS Metadata JSON Indexer CLI

## Overview
The DAS Metadata JSON Indexer CLI is a tool for indexing metadata JSON associated with tokens. It supports operations such as ingesting new metadata and backfilling missing metadata, as well as providing metrics and performance tuning options.

## Features
- **Ingest**: Process and index new metadata JSON files with various configuration options.
- **Backfill**: Fill in missing metadata for previously indexed tokens with configurable parameters.
- **Metrics**: Collect and send metrics to a specified host and port.

## Installation
Ensure you have Rust installed on your machine. If not, install it from [the official Rust website](https://www.rust-lang.org/). Then, from the repository root, build and run the CLI with Cargo:

```
cargo run --bin das-metadata-json -- --help
```

## Usage

### Ingest Command

To continuously process metadata JSON, the ingest command monitors the METADATA_JSON Redis stream. Upon reading an ID from the stream, the ingest loop looks up the corresponding `asset_data` record by that ID in the DAS database, fetches the metadata JSON, and then updates the `asset_data` record with the retrieved metadata. A sketch of one iteration of this loop is shown after the options below.
```
das-metadata-json ingest [OPTIONS] --messenger-redis-url <MESSENGER_REDIS_URL> --database-url <DATABASE_URL>
```

#### Options
- `--messenger-redis-url <MESSENGER_REDIS_URL>`: The Redis URL for the messenger service.
- `--messenger-redis-batch-size <MESSENGER_REDIS_BATCH_SIZE>`: Batch size for Redis operations (default: 100).
- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
- `--database-url <DATABASE_URL>`: The database URL.
- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
- `-h, --help`: Print help information.
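
Under the hood, each ID amounts to a read-fetch-write cycle. The sketch below is a minimal illustration of one such iteration, not the crate's actual worker code; it assumes the standard DAS `asset_data` columns (`metadata_url`, `metadata`, `reindex`), a `sqlx` Postgres pool built with JSON support, and `reqwest` with its `json` feature:

```rust
use sqlx::PgPool;

// Simplified sketch of one ingest iteration: look up the asset_data row for an ID
// read from the METADATA_JSON stream, fetch its metadata JSON, and write it back.
// Column names are assumed from the standard DAS schema.
async fn process_one(
    pool: &PgPool,
    client: &reqwest::Client,
    id: Vec<u8>,
) -> Result<(), anyhow::Error> {
    // Find the URI to fetch for this asset.
    let (uri,): (String,) =
        sqlx::query_as("SELECT metadata_url FROM asset_data WHERE id = $1")
            .bind(&id)
            .fetch_one(pool)
            .await?;

    // Download the metadata JSON document.
    let metadata: serde_json::Value = client.get(uri.as_str()).send().await?.json().await?;

    // Store the document and clear the reindex flag.
    sqlx::query("UPDATE asset_data SET metadata = $1, reindex = false WHERE id = $2")
        .bind(&metadata)
        .bind(&id)
        .execute(pool)
        .await?;

    Ok(())
}
```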

### Backfill Command

To backfill any `asset_data` records marked for indexing with `reindex=true` (an example invocation follows the options below):

```
das-metadata-json backfill [OPTIONS] --database-url <DATABASE_URL>
```

#### Options
- `--database-url <DATABASE_URL>`: The database URL.
- `--database-max-connections <DATABASE_MAX_CONNECTIONS>`: Maximum database connections (default: 125).
- `--database-min-connections <DATABASE_MIN_CONNECTIONS>`: Minimum database connections (default: 5).
- `--metrics-host <METRICS_HOST>`: Host for sending metrics (default: 127.0.0.1).
- `--metrics-port <METRICS_PORT>`: Port for sending metrics (default: 8125).
- `--metrics-prefix <METRICS_PREFIX>`: Prefix for metrics (default: das.backfiller).
- `--queue-size <QUEUE_SIZE>`: Size of the job queue (default: 1000).
- `--worker-count <WORKER_COUNT>`: Number of worker threads (default: 100).
- `--timeout <TIMEOUT>`: Timeout for operations in milliseconds (default: 1000).
- `--batch-size <BATCH_SIZE>`: Number of records to process in a single batch (default: 1000).
- `-h, --help`: Print help information.
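
For example, a local backfill run with a smaller batch size might be invoked as follows (the connection string is a placeholder):

```
das-metadata-json backfill \
  --database-url postgres://user:pass@localhost:5432/das \
  --batch-size 500 \
  --worker-count 50
```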

## Lib

The `das-metadata-json` crate provides a `sender` module that can be integrated into a third-party service (e.g. `nft_ingester`) to push asset data IDs for indexing. To configure it, follow the steps below:

### Configuration

1. **Set up the `SenderArgs`:** Ensure that the `nft_ingester` is configured with the necessary `SenderArgs`. These arguments include the Redis URL, batch size, and the number of queue connections. For example:

```rust
let sender_args = SenderArgs {
    messenger_redis_url: "redis://localhost:6379".to_string(),
    messenger_redis_batch_size: "100".to_string(),
    messenger_queue_connections: 5,
};
```

2. **Initialize the `SenderPool`:** Use the `try_from_config` async function to create a `SenderPool` instance from the `SenderArgs`. This will set up the necessary channels and messengers for communication.

```rust
let sender_pool = SenderPool::try_from_config(sender_args).await?;
```

3. **Push Asset Data IDs for Indexing:** With the `SenderPool` instance, you can now push asset data IDs to be indexed using the `push` method. The IDs should be serialized into a byte array before being sent. The `asset_data` record should be written to the database before pushing its ID.

```rust
let message = asset_data.id;

sender_pool.push(&message).await?;
```

Within the `nft_ingester`, the `sender_pool` is orchestrated by the `TaskManager`. When configured appropriately, the `TaskManager` forgoes the usual creation of a task record upon receiving a `DownloadMetadata` task and instead pushes the asset ID directly to the `METADATA_JSON` Redis stream, queuing it for processing by the `das-metadata-json` indexer and streamlining the metadata JSON indexing workflow.

## Configuration
The CLI can be configured using command-line options or environment variables. For options that have an associated environment variable, you can set the variable instead of passing the option on the command line.
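
For example, because the options are declared with clap's `env` support, the ingest command can be driven entirely by environment variables (names assumed to be the upper-snake-case form of the corresponding long options):

```
export MESSENGER_REDIS_URL=redis://localhost:6379
export DATABASE_URL=postgres://user:pass@localhost:5432/das
das-metadata-json ingest
```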

## Logging
Logging is managed by `env_logger`. Set the `RUST_LOG` environment variable to control the logging level, e.g., `RUST_LOG=info`.
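
For example:

```
RUST_LOG=info das-metadata-json ingest --messenger-redis-url redis://localhost:6379 --database-url postgres://user:pass@localhost:5432/das
```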

## Error Handling
Errors encountered during execution are reported with descriptive messages, and commands exit with a non-zero status on failure.
12 changes: 4 additions & 8 deletions metadata_json/src/cmds/backfill.rs
@@ -1,17 +1,13 @@
use {
crate::worker::{Worker, WorkerArgs},
backon::{ExponentialBuilder, Retryable},
clap::Parser,
das_tree_backfiller::db,
das_tree_backfiller::metrics::{Metrics, MetricsArgs},
das_tree_backfiller::metrics::{setup_metrics, MetricsArgs},
digital_asset_types::dao::asset_data,
log::info,
reqwest::ClientBuilder,
sea_orm::{entity::*, prelude::*, query::*, EntityTrait, SqlxPostgresConnector},
tokio::{
sync::mpsc,
time::{Duration, Instant},
},
tokio::time::Duration,
};

#[derive(Parser, Clone, Debug)]
@@ -37,15 +33,15 @@ pub async fn run(args: BackfillArgs) -> Result<(), anyhow::Error> {

let pool = db::connect(args.database).await?;

let metrics = Metrics::try_from_config(args.metrics)?;
setup_metrics(args.metrics)?;

let client = ClientBuilder::new()
.timeout(Duration::from_millis(args.timeout))
.build()?;

let worker = Worker::from(args.worker);

let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone());
let (tx, handle) = worker.start(pool.clone(), client.clone());

let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

11 changes: 5 additions & 6 deletions metadata_json/src/cmds/ingest.rs
@@ -3,12 +3,11 @@ use crate::worker::{Worker, WorkerArgs};
use clap::Parser;
use das_tree_backfiller::{
db,
metrics::{Metrics, MetricsArgs},
metrics::{setup_metrics, MetricsArgs},
};
use digital_asset_types::dao::asset_data;
use log::info;
use reqwest::{Client, ClientBuilder};
use tokio::{sync::mpsc, time::Duration};
use reqwest::ClientBuilder;
use tokio::time::Duration;

#[derive(Parser, Clone, Debug)]
pub struct IngestArgs {
@@ -33,15 +32,15 @@ pub async fn run(args: IngestArgs) -> Result<(), anyhow::Error> {

let pool = db::connect(args.database).await?;

let metrics = Metrics::try_from_config(args.metrics)?;
setup_metrics(args.metrics)?;

let client = ClientBuilder::new()
.timeout(Duration::from_millis(args.timeout))
.build()?;

let worker = Worker::from(args.worker);

let (tx, handle) = worker.start(pool.clone(), metrics.clone(), client.clone());
let (tx, handle) = worker.start(pool.clone(), client.clone());

while let Ok(messages) = rx.recv().await {
for message in messages.clone() {
2 changes: 2 additions & 0 deletions metadata_json/src/stream/mod.rs
@@ -3,3 +3,5 @@ pub mod sender;

pub use receiver::*;
pub use sender::*;

pub const METADATA_JSON_STREAM: &'static str = "METADATA_JSON";
19 changes: 13 additions & 6 deletions metadata_json/src/stream/receiver.rs
@@ -1,12 +1,10 @@
use super::METADATA_JSON_STREAM;
use clap::Parser;
use figment::value::{Dict, Value};
use plerkle_messenger::{
select_messenger, Messenger, MessengerConfig, MessengerError, MessengerType, RecvData,
};
use plerkle_messenger::{select_messenger, Messenger, MessengerConfig, MessengerType, RecvData};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::sync::{Arc, Mutex};

const METADATA_JSON_STREAM: &'static str = "METADATA_JSON";

#[derive(Clone, Debug, Parser)]
pub struct ReceiverArgs {
#[arg(long, env)]
@@ -15,6 +13,14 @@ pub struct ReceiverArgs {
pub messenger_redis_batch_size: String,
}

fn rand_string() -> String {
thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect()
}

impl From<ReceiverArgs> for MessengerConfig {
fn from(args: ReceiverArgs) -> Self {
let mut connection_config = Dict::new();
@@ -31,10 +37,11 @@
"pipeline_size_bytes".to_string(),
Value::from(1u128.to_string()),
);
connection_config.insert("consumer_id".to_string(), Value::from(rand_string()));

Self {
messenger_type: MessengerType::Redis,
connection_config: connection_config,
connection_config,
}
}
}