Merge pull request #163 from rpcpool/espi/das-bubblegum-tree
[Feat] Ops cli with tree backfiller
kespinola authored Feb 6, 2024
2 parents 39cbb71 + 0846b2b commit d4c912d
Showing 17 changed files with 1,280 additions and 13 deletions.
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

29 changes: 16 additions & 13 deletions Cargo.toml
@@ -1,10 +1,12 @@
[workspace]
members = [
"core",
"das_api",
"digital_asset_types",
"metaplex-rpc-proxy",
"migration",
"nft_ingester",
"ops",
"tools/acc_forwarder",
"tools/bgtask_creator",
"tools/fetch_trees",
@@ -36,8 +38,9 @@ cadence = "0.29.0"
cadence-macros = "0.29.0"
chrono = "0.4.19"
clap = "4.2.2"
das_api = {path = "das_api"}
digital_asset_types = {path = "digital_asset_types"}
das_api = { path = "das_api" }
das-core = { path = "core" }
digital_asset_types = { path = "digital_asset_types" }
enum-iterator = "1.2.0"
enum-iterator-derive = "1.1.0"
env_logger = "0.10.0"
@@ -50,21 +53,21 @@ futures-util = "0.3.27"
hex = "0.4.3"
hyper = "0.14.23"
indexmap = "1.9.3"
insta = {version = "1.34.0", features = ["json"]}
insta = { version = "1.34.0", features = ["json"] }
itertools = "0.10.1"
jsonpath_lib = "0.3.0"
jsonrpsee = "0.16.2"
jsonrpsee-core = "0.16.2"
lazy_static = "1.4.0"
log = "0.4.17"
metrics = "0.20.1"
migration = {path = "migration"}
migration = { path = "migration" }
mime_guess = "2.0.4"
mpl-bubblegum = "1.2.0"
mpl-token-metadata = "4.1.1"
nft_ingester = {path = "nft_ingester"}
nft_ingester = { path = "nft_ingester" }
num-derive = "0.3.3"
num-integer = {version = "0.1.44", default_features = false}
num-integer = { version = "0.1.44", default_features = false }
num-traits = "0.2.15"
once_cell = "1.19.0"
open-rpc-derive = "0.0.4"
@@ -80,13 +83,13 @@ reqwest = "0.11.13"
rust-crypto = "0.2.36"
schemars = "0.8.6"
schemars_derive = "0.8.6"
sea-orm = {version = "0.10.6", features = [
sea-orm = { version = "0.10.6", features = [
"macros",
"runtime-tokio-rustls",
"sqlx-postgres",
"with-chrono",
"mock",
]}
] }
sea-orm-migration = "0.10.6"
sea-query = "0.28.1"
serde = "1.0.137"
@@ -103,25 +106,25 @@ spl-account-compression = "0.2.0"
spl-associated-token-account = ">= 1.1.3, < 3.0"
spl-concurrent-merkle-tree = "0.2.0"
spl-noop = "0.2.0"
spl-token = {version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"]}
sqlx = {version = "0.6.2", features = [
spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
sqlx = { version = "0.6.2", features = [
"macros",
"runtime-tokio-rustls",
"postgres",
"uuid",
"offline",
"json",
]}
] }
stretto = "0.7.2"
thiserror = "1.0.31"
tokio = {version = "1.30.0", features = ["macros", "rt-multi-thread"]}
tokio = { version = "1.30.0", features = ["macros", "rt-multi-thread"] }
tokio-postgres = "0.7.7"
tokio-stream = "0.1.14"
tower = "0.4.13"
tower-http = "0.3.5"
tracing = "0.1.35"
tracing-subscriber = "0.3.16"
txn_forwarder = {path = "tools/txn_forwarder"}
txn_forwarder = { path = "tools/txn_forwarder" }
url = "2.3.1"
uuid = "1.0.0"
wasi = "0.7.0"
18 changes: 18 additions & 0 deletions core/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "das-core"
version.workspace = true
edition.workspace = true
repository.workspace = true
publish.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { workspace = true }
anyhow = { workspace = true }
sqlx = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }

[lints]
workspace = true
38 changes: 38 additions & 0 deletions core/src/db.rs
@@ -0,0 +1,38 @@
use anyhow::Result;
use clap::Parser;
use sqlx::{
    postgres::{PgConnectOptions, PgPoolOptions},
    PgPool,
};

#[derive(Debug, Parser, Clone)]
pub struct PoolArgs {
    /// The database URL.
    #[arg(long, env)]
    pub database_url: String,
    /// The maximum number of connections to the database.
    #[arg(long, env, default_value = "125")]
    pub database_max_connections: u32,
    /// The minimum number of connections to the database.
    #[arg(long, env, default_value = "5")]
    pub database_min_connections: u32,
}

/// Establishes a connection to the database using the provided configuration.
///
/// # Arguments
///
/// * `config` - A `PoolArgs` struct containing the database URL and the minimum and maximum number of connections.
///
/// # Returns
///
/// * `Result<PgPool, sqlx::Error>` - On success, returns a connection pool. On failure, returns a `sqlx::Error`.
pub async fn connect_db(config: PoolArgs) -> Result<PgPool, sqlx::Error> {
    let options: PgConnectOptions = config.database_url.parse()?;

    PgPoolOptions::new()
        .min_connections(config.database_min_connections)
        .max_connections(config.database_max_connections)
        .connect_with(options)
        .await
}
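
For illustration, a minimal sketch (not part of this diff) of how an ops-style binary might wire `PoolArgs` into `connect_db`; the `Args` wrapper and the connectivity check below are hypothetical:

use anyhow::Result;
use clap::Parser;
use das_core::{connect_db, PoolArgs};

#[derive(Debug, Parser)]
struct Args {
    /// Database settings are flattened in, so --database-url / DATABASE_URL etc.
    /// are parsed straight into PoolArgs.
    #[command(flatten)]
    database: PoolArgs,
}

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    // Builds a bounded sqlx Postgres pool from the parsed arguments.
    let pool = connect_db(args.database).await?;

    // Simple connectivity check.
    sqlx::query("SELECT 1").execute(&pool).await?;

    Ok(())
}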
5 changes: 5 additions & 0 deletions core/src/lib.rs
@@ -0,0 +1,5 @@
mod db;
mod metrics;

pub use db::*;
pub use metrics::*;
31 changes: 31 additions & 0 deletions core/src/metrics.rs
@@ -0,0 +1,31 @@
use anyhow::Result;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
use cadence_macros::set_global_default;
use clap::Parser;
use std::net::UdpSocket;

#[derive(Clone, Parser, Debug)]
pub struct MetricsArgs {
#[arg(long, env, default_value = "127.0.0.1")]
pub metrics_host: String,
#[arg(long, env, default_value = "8125")]
pub metrics_port: u16,
#[arg(long, env, default_value = "das.backfiller")]
pub metrics_prefix: String,
}

pub fn setup_metrics(config: MetricsArgs) -> Result<()> {
    let host = (config.metrics_host, config.metrics_port);

    let socket = UdpSocket::bind("0.0.0.0:0")?;
    socket.set_nonblocking(true)?;

    let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
    let queuing_sink = QueuingMetricSink::from(udp_sink);

    let client = StatsdClient::from_sink(&config.metrics_prefix, queuing_sink);

    set_global_default(client);

    Ok(())
}
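
As a usage sketch (assumed, not in this commit): once `setup_metrics` has installed the global client, the `cadence_macros` helpers can emit metrics from anywhere in the process; the metric name below is illustrative only:

use anyhow::Result;
use cadence_macros::statsd_count;
use clap::Parser;
use das_core::{setup_metrics, MetricsArgs};

fn main() -> Result<()> {
    // Reads --metrics-host / --metrics-port / --metrics-prefix (or their env vars).
    let args = MetricsArgs::parse();

    // Installs the StatsdClient as the cadence_macros global default.
    setup_metrics(args)?;

    // Counters, gauges, timers, etc. now go to the configured StatsD endpoint.
    statsd_count!("backfill.trees_processed", 1);

    Ok(())
}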
2 changes: 2 additions & 0 deletions migration/src/lib.rs
@@ -36,6 +36,7 @@ mod m20240104_203133_add_cl_audits_v2;
mod m20240104_203328_remove_cl_audits;
mod m20240116_130744_add_update_metadata_ix;
mod m20240117_120101_alter_creator_indices;
mod m20240124_173104_add_tree_seq_index_to_cl_audits_v2;

pub mod model;

@@ -81,6 +82,7 @@ impl MigratorTrait for Migrator {
            Box::new(m20240104_203328_remove_cl_audits::Migration),
            Box::new(m20240116_130744_add_update_metadata_ix::Migration),
            Box::new(m20240117_120101_alter_creator_indices::Migration),
            Box::new(m20240124_173104_add_tree_seq_index_to_cl_audits_v2::Migration),
        ]
    }
}
35 changes: 35 additions & 0 deletions migration/src/m20240124_173104_add_tree_seq_index_to_cl_audits_v2.rs
@@ -0,0 +1,35 @@
use super::model::table::ClAuditsV2;
use sea_orm::{ConnectionTrait, DatabaseBackend, Statement};
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();

        conn.execute(Statement::from_string(
            DatabaseBackend::Postgres,
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS tree_seq_idx ON cl_audits_v2 (tree, seq);"
                .to_string(),
        ))
        .await?;

        Ok(())
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_index(
                Index::drop()
                    .name("tree_seq_idx")
                    .table(ClAuditsV2::Table)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }
}
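
For context, a hypothetical query (not part of this commit) of the kind the new composite index is meant to serve: scanning the recorded sequence numbers for a single tree so a backfiller can look for gaps. Column names come from the DDL above; the helper itself is an assumption:

use sqlx::PgPool;

// The (tree, seq) index lets Postgres satisfy the equality filter plus the
// ORDER BY without a separate sort; `tree` and `seq` are the cl_audits_v2
// columns named in the CREATE INDEX statement above.
async fn recorded_seqs(pool: &PgPool, tree: &[u8]) -> Result<Vec<i64>, sqlx::Error> {
    sqlx::query_scalar("SELECT seq FROM cl_audits_v2 WHERE tree = $1 ORDER BY seq")
        .bind(tree)
        .fetch_all(pool)
        .await
}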
