diff --git a/common/models/src/ns_api.rs b/common/models/src/ns_api.rs new file mode 100644 index 0000000000..5d875420e2 --- /dev/null +++ b/common/models/src/ns_api.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct TestrunAssignment { + pub testrun_id: i64, + pub gateway_identity_key: String, +} diff --git a/nym-node-status-agent/Cargo.toml b/nym-node-status-agent/Cargo.toml new file mode 100644 index 0000000000..f89065b5b3 --- /dev/null +++ b/nym-node-status-agent/Cargo.toml @@ -0,0 +1,27 @@ +# Copyright 2024 - Nym Technologies SA +# SPDX-License-Identifier: Apache-2.0 + + +[package] +name = "nym-node-status-agent" +version = "0.1.4" +authors.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +readme.workspace = true + +[dependencies] +anyhow = { workspace = true} +clap = { workspace = true, features = ["derive", "env"] } +nym-bin-common = { path = "../common/bin-common", features = ["models"]} +nym-common-models = { path = "../common/models" } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process"] } +tokio-util = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +reqwest = { workspace = true, features = ["json"] } +serde_json = { workspace = true } diff --git a/nym-node-status-agent/run.sh b/nym-node-status-agent/run.sh new file mode 100755 index 0000000000..675e39c109 --- /dev/null +++ b/nym-node-status-agent/run.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -eu + +environment="qa" + +source ../envs/${environment}.env + +export RUST_LOG="debug" + +crate_root=$(dirname $(realpath "$0")) +gateway_probe_src=$(dirname $(dirname "$crate_root"))/nym-vpn-client/nym-vpn-core +echo "gateway_probe_src=$gateway_probe_src" +echo "crate_root=$crate_root" + +export NODE_STATUS_AGENT_PROBE_PATH="$crate_root/nym-gateway-probe" + +# build & copy over GW probe +function copy_gw_probe() { + pushd $gateway_probe_src + git switch main + git pull + cargo build --release --package nym-gateway-probe + cp target/release/nym-gateway-probe "$crate_root" + $crate_root/nym-gateway-probe --version + popd +} + +function build_agent() { + cargo build --package nym-node-status-agent --release +} + +function swarm() { + local workers=$1 + echo "Running $workers in parallel" + + build_agent + + for ((i = 1; i <= $workers; i++)); do + ../target/release/nym-node-status-agent run-probe & + done + + wait + + echo "All agents completed" +} + +export NODE_STATUS_AGENT_SERVER_ADDRESS="http://127.0.0.1" +export NODE_STATUS_AGENT_SERVER_PORT="8000" + +copy_gw_probe + +swarm 8 + +# cargo run -- run-probe diff --git a/nym-node-status-agent/src/cli.rs b/nym-node-status-agent/src/cli.rs new file mode 100644 index 0000000000..81e70e2da9 --- /dev/null +++ b/nym-node-status-agent/src/cli.rs @@ -0,0 +1,105 @@ +use clap::{Parser, Subcommand}; +use nym_bin_common::bin_info; +use nym_common_models::ns_api::TestrunAssignment; +use std::sync::OnceLock; +use tracing::instrument; + +use crate::probe::GwProbe; + +// Helper for passing LONG_VERSION to clap +fn pretty_build_info_static() -> &'static str { + static PRETTY_BUILD_INFORMATION: OnceLock<String> = OnceLock::new(); + PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print()) +} + +#[derive(Parser, Debug)] +#[clap(author = "Nymtech", version, long_version = pretty_build_info_static(), about)]
+pub(crate) struct Args { + #[command(subcommand)] + pub(crate) command: Command, + #[arg(short, long, env = "NODE_STATUS_AGENT_SERVER_ADDRESS")] + pub(crate) server_address: String, + + #[arg(short = 'p', long, env = "NODE_STATUS_AGENT_SERVER_PORT")] + pub(crate) server_port: u16, + // TODO dz accept keypair for identification / auth +} + +#[derive(Subcommand, Debug)] +pub(crate) enum Command { + RunProbe { + /// path of binary to run + #[arg(long, env = "NODE_STATUS_AGENT_PROBE_PATH")] + probe_path: String, + }, +} + +impl Args { + pub(crate) async fn execute(&self) -> anyhow::Result<()> { + match &self.command { + Command::RunProbe { probe_path } => self.run_probe(probe_path).await?, + } + + Ok(()) + } + + async fn run_probe(&self, probe_path: &str) -> anyhow::Result<()> { + let server_address = format!("{}:{}", &self.server_address, self.server_port); + + let probe = GwProbe::new(probe_path.to_string()); + + let version = probe.version().await; + tracing::info!("Probe version:\n{}", version); + + let testrun = request_testrun(&server_address).await?; + + let log = probe.run_and_get_log(&Some(testrun.gateway_identity_key)); + + submit_results(&server_address, testrun.testrun_id, log).await?; + + Ok(()) + } +} + +const URL_BASE: &str = "internal/testruns"; + +#[instrument(level = "debug", skip_all)] +async fn request_testrun(server_addr: &str) -> anyhow::Result<TestrunAssignment> { + let target_url = format!("{}/{}", server_addr, URL_BASE); + let client = reqwest::Client::new(); + let res = client + .get(target_url) + .send() + .await + .and_then(|response| response.error_for_status())?; + res.json() + .await + .map(|testrun| { + tracing::info!("Received testrun assignment: {:?}", testrun); + testrun + }) + .map_err(|err| { + tracing::error!("{err}"); + err.into() + }) +} + +#[instrument(level = "debug", skip(probe_outcome))] +async fn submit_results( + server_addr: &str, + testrun_id: i64, + probe_outcome: String, +) -> anyhow::Result<()> { + let target_url = format!("{}/{}/{}", server_addr, URL_BASE, testrun_id); + let client = reqwest::Client::new(); + + let res = client + .post(target_url) + .body(probe_outcome) + .send() + .await + .and_then(|response| response.error_for_status())?; + + tracing::debug!("Submitted results: {}", res.status()); + Ok(()) +} diff --git a/nym-node-status-agent/src/probe.rs b/nym-node-status-agent/src/probe.rs new file mode 100644 index 0000000000..f779f3af53 --- /dev/null +++ b/nym-node-status-agent/src/probe.rs @@ -0,0 +1,60 @@ +use tracing::error; + +pub(crate) struct GwProbe { + path: String, +} + +impl GwProbe { + pub(crate) fn new(probe_path: String) -> Self { + Self { path: probe_path } + } + + pub(crate) async fn version(&self) -> String { + let mut command = tokio::process::Command::new(&self.path); + command.stdout(std::process::Stdio::piped()); + command.arg("--version"); + + match command.spawn() { + Ok(child) => { + if let Ok(output) = child.wait_with_output().await { + return String::from_utf8(output.stdout) + .unwrap_or("Unable to get probe version".to_string()); + } + "Unable to get probe version".to_string() + } + Err(e) => { + error!("Failed to get probe version: {}", e); + "Failed to get probe version".to_string() + } + } + } + + pub(crate) fn run_and_get_log(&self, gateway_key: &Option<String>) -> String { + let mut command = std::process::Command::new(&self.path); + command.stdout(std::process::Stdio::piped()); + + if let Some(gateway_id) = gateway_key { + command.arg("--gateway").arg(gateway_id); + } + + match command.spawn() { + Ok(child) => { + if let 
Ok(output) = child.wait_with_output() { + if !output.status.success() { + let out = String::from_utf8_lossy(&output.stdout); + let err = String::from_utf8_lossy(&output.stderr); + tracing::error!("Probe exited with {:?}:\n{}\n{}", output.status, out, err); + } + + return String::from_utf8(output.stdout) + .unwrap_or("Unable to get log from test run".to_string()); + } + "Unable to get log from test run".to_string() + } + Err(e) => { + error!("Failed to spawn test: {}", e); + "Failed to spawn test run task".to_string() + } + } + } +} diff --git a/nym-node-status-api/.sqlx/query-06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984.json b/nym-node-status-api/.sqlx/query-06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984.json new file mode 100644 index 0000000000..ad57224826 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n gateway_identity_key\n FROM\n gateways\n WHERE\n id = ?", + "describe": { + "columns": [ + { + "name": "gateway_identity_key", + "ordinal": 0, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "06b17d1e5f61201a1b7542896ba55c69cd5c1a7e7d87073c94600c783a0a3984" +} diff --git a/nym-node-status-api/.sqlx/query-1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454.json b/nym-node-status-api/.sqlx/query-1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454.json new file mode 100644 index 0000000000..8b69daa6a3 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n key as \"key!\",\n value_json as \"value_json!\",\n last_updated_utc as \"last_updated_utc!\"\n FROM summary", + "describe": { + "columns": [ + { + "name": "key!", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "value_json!", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "last_updated_utc!", + "ordinal": 2, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + true, + true, + false + ] + }, + "hash": "1327b5118f9144dddbcf8edb11f7dc549cf503409fd6dfedcdc02dbcd61d5454" +} diff --git a/nym-node-status-api/.sqlx/query-18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea.json b/nym-node-status-api/.sqlx/query-18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea.json new file mode 100644 index 0000000000..9bc2966149 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE mixnodes\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "18abc8fde56cf86baed7b4afa38f2c63cdf90f2f3b6d81afb9000bb0968dcaea" +} diff --git a/nym-node-status-api/.sqlx/query-2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129.json b/nym-node-status-api/.sqlx/query-2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129.json new file mode 100644 index 0000000000..d9bf0dd6aa --- /dev/null +++ b/nym-node-status-api/.sqlx/query-2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n id,\n gateway_identity_key\n FROM gateways\n WHERE id = ?\n LIMIT 1", + "describe": { + "columns": [ + { + 
"name": "id", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "gateway_identity_key", + "ordinal": 1, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false + ] + }, + "hash": "2236299f9f691376db54cbd58ec5ceb89b9925cba46efcf4ed79ef0759a01129" +} diff --git a/nym-node-status-api/.sqlx/query-3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784.json b/nym-node-status-api/.sqlx/query-3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784.json new file mode 100644 index 0000000000..56898ca5ad --- /dev/null +++ b/nym-node-status-api/.sqlx/query-3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)", + "describe": { + "columns": [], + "parameters": { + "Right": 5 + }, + "nullable": [] + }, + "hash": "3c584e211d07c511644c8079187965acf3bcfb3f84ba8d24ed645d79976cf784" +} diff --git a/nym-node-status-api/.sqlx/query-3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536.json b/nym-node-status-api/.sqlx/query-3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536.json new file mode 100644 index 0000000000..10158436b6 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE gateways\n SET bonded = ?, last_updated_utc = ?\n WHERE id = ?;", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "3d3a1fa429e3090741c6b6a8e82e692afc04b51e8782bcbf59f1eb4116112536" +} diff --git a/nym-node-status-api/.sqlx/query-3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad.json b/nym-node-status-api/.sqlx/query-3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad.json new file mode 100644 index 0000000000..2a834fe87e --- /dev/null +++ b/nym-node-status-api/.sqlx/query-3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"gateway_identity_key!\",\n self_described as \"self_described?\",\n explorer_pretty_bond as \"explorer_pretty_bond?\"\n FROM gateways\n WHERE gateway_identity_key = ?\n ORDER BY gateway_identity_key\n LIMIT 1", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "gateway_identity_key!", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "self_described?", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "explorer_pretty_bond?", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true, + false, + true, + true + ] + }, + "hash": "3d5fc502f976f5081f01352856b8632c29c81bfafb043bb8744129cf9e0266ad" +} diff --git a/nym-node-status-api/.sqlx/query-418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c.json b/nym-node-status-api/.sqlx/query-418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c.json new file mode 100644 index 0000000000..f9eb3657e7 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE testruns SET status = ? 
WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "418944f2eccb838cb3882f34469203c8569f03fdd39ce09d7b74177896e52a8c" +} diff --git a/nym-node-status-api/.sqlx/query-46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889.json b/nym-node-status-api/.sqlx/query-46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889.json new file mode 100644 index 0000000000..2cb8760009 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE gateway_id = ? AND status != 2\n ORDER BY id DESC\n LIMIT 1", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "gateway_id!", + "ordinal": 1, + "type_info": "Int64" + }, + { + "name": "status!", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "timestamp_utc!", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "ip_address!", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "log!", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "46d76bc6d3fba2dae3b21511a36289dd776749dd7a20cda61b0480f2fba60889" +} diff --git a/nym-node-status-api/.sqlx/query-4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08.json b/nym-node-status-api/.sqlx/query-4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08.json new file mode 100644 index 0000000000..6a9fd9d4eb --- /dev/null +++ b/nym-node-status-api/.sqlx/query-4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE gateways SET last_probe_log = ? 
WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "4afcc6673890f795c2793f1e2f8570ee787fc7daf00fcb916f18d1cb7d6c8f08" +} diff --git a/nym-node-status-api/.sqlx/query-4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f.json b/nym-node-status-api/.sqlx/query-4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f.json new file mode 100644 index 0000000000..8b9c9699f8 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n gateway_identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n WHERE bonded = ?", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "identity_key!", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "bonded: bool", + "ordinal": 2, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "4b61a4bc32333c92a8f5ad4ad0017b40dc01845f554b5479f37855d89b309e6f" +} diff --git a/nym-node-status-api/.sqlx/query-670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b.json b/nym-node-status-api/.sqlx/query-670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b.json new file mode 100644 index 0000000000..38fe641a3a --- /dev/null +++ b/nym-node-status-api/.sqlx/query-670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "SELECT count(id) FROM mixnodes", + "describe": { + "columns": [ + { + "name": "count(id)", + "ordinal": 0, + "type_info": "Int" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false + ] + }, + "hash": "670b7ed7d57a6986181b24be24ca667e8cacdf677ccb906415b3fe92be0c436b" +} diff --git a/nym-node-status-api/.sqlx/query-6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36.json b/nym-node-status-api/.sqlx/query-6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36.json new file mode 100644 index 0000000000..e12434a85a --- /dev/null +++ b/nym-node-status-api/.sqlx/query-6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n gateway_id as \"gateway_id!\",\n status as \"status!\",\n timestamp_utc as \"timestamp_utc!\",\n ip_address as \"ip_address!\",\n log as \"log!\"\n FROM testruns\n WHERE\n id = ?\n AND\n status = ?\n ORDER BY timestamp_utc", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "gateway_id!", + "ordinal": 1, + "type_info": "Int64" + }, + { + "name": "status!", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "timestamp_utc!", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "ip_address!", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "log!", + "ordinal": 5, + "type_info": "Text" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false, + false, + false, + false, + false, + false + ] + }, + "hash": "6d7967b831b355d5f2c77950abc56f816956b0824c66a25da611dce688105d36" +} diff --git a/nym-node-status-api/.sqlx/query-6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1.json b/nym-node-status-api/.sqlx/query-6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1.json new file mode 100644 index 0000000000..86b61631a6 --- /dev/null +++ 
b/nym-node-status-api/.sqlx/query-6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO mixnodes\n (mix_id, identity_key, bonded, total_stake,\n host, http_api_port, blacklisted, full_details,\n self_described, last_updated_utc, is_dp_delegatee)\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(mix_id) DO UPDATE SET\n bonded=excluded.bonded,\n total_stake=excluded.total_stake, host=excluded.host,\n http_api_port=excluded.http_api_port,blacklisted=excluded.blacklisted,\n full_details=excluded.full_details,self_described=excluded.self_described,\n last_updated_utc=excluded.last_updated_utc,\n is_dp_delegatee = excluded.is_dp_delegatee;", + "describe": { + "columns": [], + "parameters": { + "Right": 11 + }, + "nullable": [] + }, + "hash": "6eb1a682cf13205cf701590021cdf795147ac3724e89df5b2f24f7215d87dce1" +} diff --git a/nym-node-status-api/.sqlx/query-6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f.json b/nym-node-status-api/.sqlx/query-6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f.json new file mode 100644 index 0000000000..9d93b219f9 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE gateways SET last_probe_result = ? WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "6ef3efde571d46961244cd90420f3de5949a5ff2083453cb879af8a1689efe2f" +} diff --git a/nym-node-status-api/.sqlx/query-71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1.json b/nym-node-status-api/.sqlx/query-71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1.json new file mode 100644 index 0000000000..f061747404 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1.json @@ -0,0 +1,98 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n gw.gateway_identity_key as \"gateway_identity_key!\",\n gw.bonded as \"bonded: bool\",\n gw.blacklisted as \"blacklisted: bool\",\n gw.performance as \"performance!\",\n gw.self_described as \"self_described?\",\n gw.explorer_pretty_bond as \"explorer_pretty_bond?\",\n gw.last_probe_result as \"last_probe_result?\",\n gw.last_probe_log as \"last_probe_log?\",\n gw.last_testrun_utc as \"last_testrun_utc?\",\n gw.last_updated_utc as \"last_updated_utc!\",\n COALESCE(gd.moniker, \"NA\") as \"moniker!\",\n COALESCE(gd.website, \"NA\") as \"website!\",\n COALESCE(gd.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(gd.details, \"NA\") as \"details!\"\n FROM gateways gw\n LEFT JOIN gateway_description gd\n ON gw.gateway_identity_key = gd.gateway_identity_key\n ORDER BY gw.gateway_identity_key", + "describe": { + "columns": [ + { + "name": "gateway_identity_key!", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "bonded: bool", + "ordinal": 1, + "type_info": "Int64" + }, + { + "name": "blacklisted: bool", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "performance!", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "self_described?", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "explorer_pretty_bond?", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "last_probe_result?", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "last_probe_log?", + "ordinal": 7, + "type_info": "Text" + }, + { + "name": "last_testrun_utc?", + "ordinal": 8, + 
"type_info": "Int64" + }, + { + "name": "last_updated_utc!", + "ordinal": 9, + "type_info": "Int64" + }, + { + "name": "moniker!", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "website!", + "ordinal": 11, + "type_info": "Text" + }, + { + "name": "security_contact!", + "ordinal": 12, + "type_info": "Text" + }, + { + "name": "details!", + "ordinal": 13, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false, + false, + true, + true, + true, + true, + true, + false, + false, + false, + false, + false + ] + }, + "hash": "71a455c705f9c25d3843ff2fb8629d1320a5eb10797cdb5a435455d22c6aeac1" +} diff --git a/nym-node-status-api/.sqlx/query-7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d.json b/nym-node-status-api/.sqlx/query-7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d.json new file mode 100644 index 0000000000..9cba203bdb --- /dev/null +++ b/nym-node-status-api/.sqlx/query-7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n date as \"date!\",\n timestamp_utc as \"timestamp_utc!\",\n value_json as \"value_json!\"\n FROM summary_history\n ORDER BY date DESC\n LIMIT 30", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "date!", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "timestamp_utc!", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "value_json!", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + true, + false, + false, + true + ] + }, + "hash": "7600823da7ce80b8ffda933608603a2752e28df775d1af8fd943a5fc8d7dc00d" +} diff --git a/nym-node-status-api/.sqlx/query-788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578.json b/nym-node-status-api/.sqlx/query-788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578.json new file mode 100644 index 0000000000..5252ccf2e7 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO summary_history\n (date, timestamp_utc, value_json)\n VALUES (?, ?, ?)\n ON CONFLICT(date) DO UPDATE SET\n timestamp_utc=excluded.timestamp_utc,\n value_json=excluded.value_json;", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "788515c34588aec352773df4b6e6c5e41f3c0bb56a27648b5e25466b8634a578" +} diff --git a/nym-node-status-api/.sqlx/query-8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c.json b/nym-node-status-api/.sqlx/query-8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c.json new file mode 100644 index 0000000000..3f171170db --- /dev/null +++ b/nym-node-status-api/.sqlx/query-8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE gateways\n SET blacklisted = true\n WHERE gateway_identity_key = ?;", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "8571faad2f66e08f24acfbfe036d17ca6eb090df7f6d52ef89c5d51564f8b45c" +} diff --git a/nym-node-status-api/.sqlx/query-86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5.json b/nym-node-status-api/.sqlx/query-86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5.json new file mode 100644 index 0000000000..60583ed8b2 --- /dev/null 
+++ b/nym-node-status-api/.sqlx/query-86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "SELECT count(id) FROM gateways", + "describe": { + "columns": [ + { + "name": "count(id)", + "ordinal": 0, + "type_info": "Int" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false + ] + }, + "hash": "86ff64db477a1d6235179b0b88d86b86d1b9be62336c9eac0eef44987a5451b5" +} diff --git a/nym-node-status-api/.sqlx/query-930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8.json b/nym-node-status-api/.sqlx/query-930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8.json new file mode 100644 index 0000000000..6c2a194e80 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n gateway_identity_key as \"gateway_identity_key!\",\n bonded as \"bonded: bool\"\n FROM gateways\n ORDER BY last_testrun_utc", + "describe": { + "columns": [ + { + "name": "gateway_identity_key!", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "bonded: bool", + "ordinal": 1, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false + ] + }, + "hash": "930a41e612b4e964ae214843da190f6c66c14d4267a2cc2ca73354becc2c8bb8" +} diff --git a/nym-node-status-api/.sqlx/query-c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c.json b/nym-node-status-api/.sqlx/query-c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c.json new file mode 100644 index 0000000000..8cc95e72de --- /dev/null +++ b/nym-node-status-api/.sqlx/query-c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? 
WHERE id = ?", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "c214c001acbbf79fa499816f36ec586c4c29c03efb4cf0c40b73a5c76159cf5c" +} diff --git a/nym-node-status-api/.sqlx/query-c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351.json b/nym-node-status-api/.sqlx/query-c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351.json new file mode 100644 index 0000000000..2e4f34fd85 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351.json @@ -0,0 +1,44 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT\n date_utc as \"date_utc!\",\n packets_received as \"total_packets_received!: i64\",\n packets_sent as \"total_packets_sent!: i64\",\n packets_dropped as \"total_packets_dropped!: i64\",\n total_stake as \"total_stake!: i64\"\n FROM (\n SELECT\n date_utc,\n SUM(packets_received) as packets_received,\n SUM(packets_sent) as packets_sent,\n SUM(packets_dropped) as packets_dropped,\n SUM(total_stake) as total_stake\n FROM mixnode_daily_stats\n GROUP BY date_utc\n ORDER BY date_utc DESC\n LIMIT 30\n )\n GROUP BY date_utc\n ORDER BY date_utc\n ", + "describe": { + "columns": [ + { + "name": "date_utc!", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "total_packets_received!: i64", + "ordinal": 1, + "type_info": "Int64" + }, + { + "name": "total_packets_sent!: i64", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "total_packets_dropped!: i64", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "total_stake!: i64", + "ordinal": 4, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + true, + true, + true, + false + ] + }, + "hash": "c5e3cd7284b334df5aa979b1627ea1f6dc2aed00cedde25f2be3567e47064351" +} diff --git a/nym-node-status-api/.sqlx/query-c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc.json b/nym-node-status-api/.sqlx/query-c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc.json new file mode 100644 index 0000000000..4d0edd1e88 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n id as \"id!\",\n identity_key as \"identity_key!\",\n bonded as \"bonded: bool\"\n FROM mixnodes\n WHERE bonded = ?", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "identity_key!", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "bonded: bool", + "ordinal": 2, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "c7ba2621becb9ac4b5dee0ce303dadfcf19095935867a51cbd5b8362d1505fcc" +} diff --git a/nym-node-status-api/.sqlx/query-d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9.json b/nym-node-status-api/.sqlx/query-d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9.json new file mode 100644 index 0000000000..94e5513279 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO gateways\n (gateway_identity_key, bonded, blacklisted,\n self_described, explorer_pretty_bond,\n last_updated_utc, performance)\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT(gateway_identity_key) DO UPDATE SET\n bonded=excluded.bonded,\n blacklisted=excluded.blacklisted,\n 
self_described=excluded.self_described,\n explorer_pretty_bond=excluded.explorer_pretty_bond,\n last_updated_utc=excluded.last_updated_utc,\n performance = excluded.performance;", + "describe": { + "columns": [], + "parameters": { + "Right": 7 + }, + "nullable": [] + }, + "hash": "d8ea93e781666e6267902170709ee2aa37f6163525bbdce1a4cebef4a285f8d9" +} diff --git a/nym-node-status-api/.sqlx/query-e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe.json b/nym-node-status-api/.sqlx/query-e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe.json new file mode 100644 index 0000000000..5f7443bd50 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO summary\n (key, value_json, last_updated_utc)\n VALUES (?, ?, ?)\n ON CONFLICT(key) DO UPDATE SET\n value_json=excluded.value_json,\n last_updated_utc=excluded.last_updated_utc;", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "e0c76a959276e3b0f44c720af9c74a5bf4912ee73468e62e7d0d96b1d9074cbe" +} diff --git a/nym-node-status-api/.sqlx/query-f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35.json b/nym-node-status-api/.sqlx/query-f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35.json new file mode 100644 index 0000000000..d3f7611ba2 --- /dev/null +++ b/nym-node-status-api/.sqlx/query-f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35.json @@ -0,0 +1,86 @@ +{ + "db_name": "SQLite", + "query": "SELECT\n mn.mix_id as \"mix_id!\",\n mn.bonded as \"bonded: bool\",\n mn.blacklisted as \"blacklisted: bool\",\n mn.is_dp_delegatee as \"is_dp_delegatee: bool\",\n mn.total_stake as \"total_stake!\",\n mn.full_details as \"full_details!\",\n mn.self_described as \"self_described\",\n mn.last_updated_utc as \"last_updated_utc!\",\n COALESCE(md.moniker, \"NA\") as \"moniker!\",\n COALESCE(md.website, \"NA\") as \"website!\",\n COALESCE(md.security_contact, \"NA\") as \"security_contact!\",\n COALESCE(md.details, \"NA\") as \"details!\"\n FROM mixnodes mn\n LEFT JOIN mixnode_description md ON mn.mix_id = md.mix_id\n ORDER BY mn.mix_id", + "describe": { + "columns": [ + { + "name": "mix_id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "bonded: bool", + "ordinal": 1, + "type_info": "Int64" + }, + { + "name": "blacklisted: bool", + "ordinal": 2, + "type_info": "Int64" + }, + { + "name": "is_dp_delegatee: bool", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "total_stake!", + "ordinal": 4, + "type_info": "Int64" + }, + { + "name": "full_details!", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "self_described", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "last_updated_utc!", + "ordinal": 7, + "type_info": "Int64" + }, + { + "name": "moniker!", + "ordinal": 8, + "type_info": "Text" + }, + { + "name": "website!", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "security_contact!", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "details!", + "ordinal": 11, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false, + false, + false, + true, + true, + false, + false, + false, + false, + false + ] + }, + "hash": "f0a4316081d1be9444a87b95d933d31cb4bcc4071d31d8d2f7755e2d2c2e3e35" +} diff --git a/nym-node-status-api/.sqlx/query-f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924.json 
b/nym-node-status-api/.sqlx/query-f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924.json new file mode 100644 index 0000000000..06d098bebc --- /dev/null +++ b/nym-node-status-api/.sqlx/query-f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE\n testruns\n SET\n status = ?\n WHERE\n status = ?\n AND\n timestamp_utc < ?\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "f5048d9926a5f5329f7f3b96d43b925e033ceec4f8112258feb4ac9e96fc5924" +} diff --git a/nym-node-status-api/.sqlx/query-ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407.json b/nym-node-status-api/.sqlx/query-ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407.json new file mode 100644 index 0000000000..24e735e96d --- /dev/null +++ b/nym-node-status-api/.sqlx/query-ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407.json @@ -0,0 +1,26 @@ +{ + "db_name": "SQLite", + "query": "UPDATE testruns\n SET status = ?\n WHERE rowid =\n (\n SELECT rowid\n FROM testruns\n WHERE status = ?\n ORDER BY timestamp_utc asc\n LIMIT 1\n )\n RETURNING\n id as \"id!\",\n gateway_id\n ", + "describe": { + "columns": [ + { + "name": "id!", + "ordinal": 0, + "type_info": "Int64" + }, + { + "name": "gateway_id", + "ordinal": 1, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + true, + false + ] + }, + "hash": "ff9334ba7b670b218b2f9100e9ab5d2f2d08b2e53203aab9f07ea9b52acbd407" +} diff --git a/nym-node-status-api/Cargo.toml b/nym-node-status-api/Cargo.toml new file mode 100644 index 0000000000..0969d31045 --- /dev/null +++ b/nym-node-status-api/Cargo.toml @@ -0,0 +1,62 @@ +# Copyright 2024 - Nym Technologies SA +# SPDX-License-Identifier: Apache-2.0 + +[package] +name = "nym-node-status-api" +version = "0.1.5" +authors.workspace = true +repository.workspace = true +homepage.workspace = true +documentation.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +axum = { workspace = true, features = ["tokio", "macros"] } +chrono = { workspace = true } +clap = { workspace = true, features = ["cargo", "derive", "env", "string"] } +cosmwasm-std = { workspace = true } +envy = { workspace = true } +futures-util = { workspace = true } +moka = { workspace = true, features = ["future"] } +nym-bin-common = { path = "../common/bin-common", features = ["models"]} +nym-common-models = { path = "../common/models" } +nym-explorer-client = { path = "../explorer-api/explorer-client" } +nym-network-defaults = { path = "../common/network-defaults" } +nym-validator-client = { path = "../common/client-libs/validator-client" } +nym-task = { path = "../common/task" } +nym-node-requests = { path = "../nym-node/nym-node-requests", features = ["openapi"] } +regex = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +serde_json_path = { workspace = true } +strum = { workspace = true } +strum_macros = { workspace = true } +sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-util = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-log = { workspace = true } +tower-http = { workspace 
= true, features = ["cors", "trace"] } +utoipa = { workspace = true, features = ["axum_extras", "time"] } +utoipa-swagger-ui = { workspace = true, features = ["axum"] } +# TODO dz `cargo update async-trait` +# for automatic schema detection, which was merged, but not released yet +# https://github.com/ProbablyClem/utoipauto/pull/38 +# utoipauto = { git = "https://github.com/ProbablyClem/utoipauto", rev = "eb04cba" } +utoipauto = { workspace = true } + +[build-dependencies] +anyhow = { workspace = true } +tokio = { workspace = true, features = ["macros"] } +sqlx = { workspace = true, features = [ + "runtime-tokio-rustls", + "sqlite", + "macros", + "migrate", +] } diff --git a/nym-node-status-api/build.rs b/nym-node-status-api/build.rs new file mode 100644 index 0000000000..025e755088 --- /dev/null +++ b/nym-node-status-api/build.rs @@ -0,0 +1,45 @@ +use anyhow::{anyhow, Result}; +use sqlx::{Connection, SqliteConnection}; +use std::fs::Permissions; +use std::os::unix::fs::PermissionsExt; +use tokio::{fs::File, io::AsyncWriteExt}; + +const SQLITE_DB_FILENAME: &str = "nym-node-status-api.sqlite"; + +/// If you need to re-run migrations or reset the db, just run +/// cargo clean -p nym-node-status-api +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + let out_dir = read_env_var("OUT_DIR")?; + let database_path = format!("{}/{}?mode=rwc", out_dir, SQLITE_DB_FILENAME); + + write_db_path_to_file(&out_dir, SQLITE_DB_FILENAME).await?; + let mut conn = SqliteConnection::connect(&database_path).await?; + sqlx::migrate!("./migrations").run(&mut conn).await?; + + #[cfg(target_family = "unix")] + println!("cargo::rustc-env=DATABASE_URL=sqlite://{}", &database_path); + + #[cfg(target_family = "windows")] + // for some strange reason we need to add a leading `/` to the windows path even though it's + // not a valid windows path... but hey, it works... + println!("cargo::rustc-env=DATABASE_URL=sqlite:///{}", &database_path); + + Ok(()) +} + +fn read_env_var(var: &str) -> Result<String> { + std::env::var(var).map_err(|_| anyhow!("You need to set {} env var", var)) +} + +/// use `./enter_db.sh` to inspect DB +async fn write_db_path_to_file(out_dir: &str, db_filename: &str) -> anyhow::Result<()> { + let mut file = File::create("enter_db.sh").await?; + let _ = file.write(b"#!/bin/bash\n").await?; + file.write_all(format!("sqlite3 {}/{}", out_dir, db_filename).as_bytes()) + .await?; + + file.set_permissions(Permissions::from_mode(0o755)) + .await + .map_err(From::from) +} diff --git a/nym-node-status-api/launch_node_status_api.sh b/nym-node-status-api/launch_node_status_api.sh new file mode 100755 index 0000000000..5d92675412 --- /dev/null +++ b/nym-node-status-api/launch_node_status_api.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +set -e + +export RUST_LOG=${RUST_LOG:-debug} + +export NYM_API_CLIENT_TIMEOUT=60 +export EXPLORER_CLIENT_TIMEOUT=60 +export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=60 + +export ENVIRONMENT="qa.env" + +function run_bare() { + # export necessary env vars + set -a + source ../envs/$ENVIRONMENT + set +a + export RUST_LOG=debug + + # --connection-url is provided in build.rs + cargo run --package nym-node-status-api +} + +function run_docker() { + cargo build --package nym-node-status-api --release + cp ../target/release/nym-node-status-api . + + cd .. + docker build -t node-status-api -f nym-node-status-api/Dockerfile.dev . 
+ docker run --env-file envs/${ENVIRONMENT} \ + -e EXPLORER_CLIENT_TIMEOUT=$EXPLORER_CLIENT_TIMEOUT \ + -e NYM_API_CLIENT_TIMEOUT=$NYM_API_CLIENT_TIMEOUT \ + -e DATABASE_URL="sqlite://node-status-api.sqlite?mode=rwc" \ + -e RUST_LOG=${RUST_LOG} node-status-api + +} + +run_bare + +# run_docker diff --git a/nym-node-status-api/migrations/000_init.sql b/nym-node-status-api/migrations/000_init.sql new file mode 100644 index 0000000000..4f9fd7da60 --- /dev/null +++ b/nym-node-status-api/migrations/000_init.sql @@ -0,0 +1,112 @@ +CREATE TABLE gateways +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + gateway_identity_key VARCHAR NOT NULL UNIQUE, + self_described VARCHAR NOT NULL, + explorer_pretty_bond VARCHAR, + last_probe_result VARCHAR, + last_probe_log VARCHAR, + config_score INTEGER NOT NULL DEFAULT (0), + config_score_successes REAL NOT NULL DEFAULT (0), + config_score_samples REAL NOT NULL DEFAULT (0), + routing_score INTEGER NOT NULL DEFAULT (0), + routing_score_successes REAL NOT NULL DEFAULT (0), + routing_score_samples REAL NOT NULL DEFAULT (0), + test_run_samples REAL NOT NULL DEFAULT (0), + last_testrun_utc INTEGER, + last_updated_utc INTEGER NOT NULL, + bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0, + blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0, + performance INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX idx_gateway_description_gateway_identity_key ON gateways (gateway_identity_key); + + +CREATE TABLE mixnodes ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + identity_key VARCHAR NOT NULL UNIQUE, + mix_id INTEGER NOT NULL UNIQUE, + bonded INTEGER CHECK (bonded in (0, 1)) NOT NULL DEFAULT 0, + total_stake INTEGER NOT NULL, + host VARCHAR NOT NULL, + http_api_port INTEGER NOT NULL, + blacklisted INTEGER CHECK (blacklisted in (0, 1)) NOT NULL DEFAULT 0, + full_details VARCHAR, + self_described VARCHAR, + last_updated_utc INTEGER NOT NULL + , is_dp_delegatee INTEGER CHECK (is_dp_delegatee IN (0, 1)) NOT NULL DEFAULT 0); +CREATE INDEX idx_mixnodes_mix_id ON mixnodes (mix_id); +CREATE INDEX idx_mixnodes_identity_key ON mixnodes (identity_key); + +CREATE TABLE + mixnode_description ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mix_id INTEGER UNIQUE NOT NULL, + moniker VARCHAR, + website VARCHAR, + security_contact VARCHAR, + details VARCHAR, + last_updated_utc INTEGER NOT NULL, + FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id) + ); + +-- Indexes for description table +CREATE INDEX idx_mixnode_description_mix_id ON mixnode_description (mix_id); + + +CREATE TABLE summary +( + key VARCHAR PRIMARY KEY, + value_json VARCHAR, + last_updated_utc INTEGER NOT NULL +); + + +CREATE TABLE summary_history +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + date VARCHAR UNIQUE NOT NULL, + timestamp_utc INTEGER NOT NULL, + value_json VARCHAR +); +CREATE INDEX idx_summary_history_timestamp_utc ON summary_history (timestamp_utc); +CREATE INDEX idx_summary_history_date ON summary_history (date); + + +CREATE TABLE gateway_description ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + gateway_identity_key VARCHAR UNIQUE NOT NULL, + moniker VARCHAR, + website VARCHAR, + security_contact VARCHAR, + details VARCHAR, + last_updated_utc INTEGER NOT NULL, + FOREIGN KEY (gateway_identity_key) REFERENCES gateways (gateway_identity_key) + ); + + +CREATE TABLE + mixnode_daily_stats ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mix_id INTEGER NOT NULL, + total_stake BIGINT NOT NULL, + date_utc VARCHAR NOT NULL, + packets_received INTEGER DEFAULT 0, + packets_sent INTEGER DEFAULT 0, + packets_dropped INTEGER 
DEFAULT 0, + FOREIGN KEY (mix_id) REFERENCES mixnodes (mix_id), + UNIQUE (mix_id, date_utc) -- This constraint automatically creates an index + ); + + +CREATE TABLE testruns +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + gateway_id INTEGER NOT NULL, + status INTEGER NOT NULL, -- 0=pending, 1=in-progress, 2=complete + timestamp_utc INTEGER NOT NULL, + ip_address VARCHAR NOT NULL, + log VARCHAR NOT NULL, + FOREIGN KEY (gateway_id) REFERENCES gateways (id) +); diff --git a/nym-node-status-api/src/db/models.rs b/nym-node-status-api/src/db/models.rs new file mode 100644 index 0000000000..a5511787f9 --- /dev/null +++ b/nym-node-status-api/src/db/models.rs @@ -0,0 +1,335 @@ +use crate::{ + http::{self, models::SummaryHistory}, + monitor::NumericalCheckedCast, +}; +use nym_node_requests::api::v1::node::models::NodeDescription; +use serde::{Deserialize, Serialize}; +use strum_macros::{EnumString, FromRepr}; +use utoipa::ToSchema; + +pub(crate) struct GatewayRecord { + pub(crate) identity_key: String, + pub(crate) bonded: bool, + pub(crate) blacklisted: bool, + pub(crate) self_described: String, + pub(crate) explorer_pretty_bond: Option<String>, + pub(crate) last_updated_utc: i64, + pub(crate) performance: u8, +} + +#[derive(Debug, Clone)] +pub(crate) struct GatewayDto { + pub(crate) gateway_identity_key: String, + pub(crate) bonded: bool, + pub(crate) blacklisted: bool, + pub(crate) performance: i64, + pub(crate) self_described: Option<String>, + pub(crate) explorer_pretty_bond: Option<String>, + pub(crate) last_probe_result: Option<String>, + pub(crate) last_probe_log: Option<String>, + pub(crate) last_testrun_utc: Option<i64>, + pub(crate) last_updated_utc: i64, + pub(crate) moniker: String, + pub(crate) security_contact: String, + pub(crate) details: String, + pub(crate) website: String, +} + +impl TryFrom<GatewayDto> for http::models::Gateway { + type Error = anyhow::Error; + + fn try_from(value: GatewayDto) -> Result<Self, Self::Error> { + // Instead of using routing_score_successes / routing_score_samples, we use the + // number of successful testruns in the last 24h. 
+ let routing_score = 0f32; + let config_score = 0u32; + let last_updated_utc = + timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339(); + let last_testrun_utc = value + .last_testrun_utc + .and_then(|i| i.cast_checked().ok()) + .map(|t| timestamp_as_utc(t).to_rfc3339()); + + let self_described = value.self_described.clone().unwrap_or("null".to_string()); + let explorer_pretty_bond = value + .explorer_pretty_bond + .clone() + .unwrap_or("null".to_string()); + let last_probe_result = value + .last_probe_result + .clone() + .unwrap_or("null".to_string()); + let last_probe_log = value.last_probe_log.clone(); + + let self_described = serde_json::from_str(&self_described).unwrap_or(None); + let explorer_pretty_bond = serde_json::from_str(&explorer_pretty_bond).unwrap_or(None); + let last_probe_result = serde_json::from_str(&last_probe_result).unwrap_or(None); + + let bonded = value.bonded; + let blacklisted = value.blacklisted; + let performance = value.performance as u8; + + let description = NodeDescription { + moniker: value.moniker.clone(), + website: value.website.clone(), + security_contact: value.security_contact.clone(), + details: value.details.clone(), + }; + + Ok(http::models::Gateway { + gateway_identity_key: value.gateway_identity_key.clone(), + bonded, + blacklisted, + performance, + self_described, + explorer_pretty_bond, + description, + last_probe_result, + last_probe_log, + routing_score, + config_score, + last_testrun_utc, + last_updated_utc, + }) + } +} + +fn timestamp_as_utc(unix_timestamp: u64) -> chrono::DateTime<chrono::Utc> { + let d = std::time::UNIX_EPOCH + std::time::Duration::from_secs(unix_timestamp); + d.into() +} + +pub(crate) struct MixnodeRecord { + pub(crate) mix_id: u32, + pub(crate) identity_key: String, + pub(crate) bonded: bool, + pub(crate) total_stake: i64, + pub(crate) host: String, + pub(crate) http_port: u16, + pub(crate) blacklisted: bool, + pub(crate) full_details: String, + pub(crate) self_described: Option<String>, + pub(crate) last_updated_utc: i64, + pub(crate) is_dp_delegatee: bool, +} + +#[derive(Debug, Clone)] +pub(crate) struct MixnodeDto { + pub(crate) mix_id: i64, + pub(crate) bonded: bool, + pub(crate) blacklisted: bool, + pub(crate) is_dp_delegatee: bool, + pub(crate) total_stake: i64, + pub(crate) full_details: String, + pub(crate) self_described: Option<String>, + pub(crate) last_updated_utc: i64, + pub(crate) moniker: String, + pub(crate) website: String, + pub(crate) security_contact: String, + pub(crate) details: String, +} + +impl TryFrom<MixnodeDto> for http::models::Mixnode { + type Error = anyhow::Error; + + fn try_from(value: MixnodeDto) -> Result<Self, Self::Error> { + let mix_id = value.mix_id.cast_checked()?; + let full_details = value.full_details.clone(); + let full_details = serde_json::from_str(&full_details).unwrap_or(None); + + let self_described = value + .self_described + .clone() + .map(|v| serde_json::from_str(&v).unwrap_or(serde_json::Value::Null)); + + let last_updated_utc = + timestamp_as_utc(value.last_updated_utc.cast_checked()?).to_rfc3339(); + let blacklisted = value.blacklisted; + let is_dp_delegatee = value.is_dp_delegatee; + let moniker = value.moniker.clone(); + let website = value.website.clone(); + let security_contact = value.security_contact.clone(); + let details = value.details.clone(); + + Ok(http::models::Mixnode { + mix_id, + bonded: value.bonded, + blacklisted, + is_dp_delegatee, + total_stake: value.total_stake, + full_details, + description: NodeDescription { + moniker, + website, + security_contact, + details, + }, + self_described, + 
last_updated_utc, + }) + } +} + +#[allow(unused)] +#[derive(Debug, Clone)] +pub(crate) struct BondedStatusDto { + pub(crate) id: i64, + pub(crate) identity_key: String, + pub(crate) bonded: bool, +} + +#[allow(unused)] +#[derive(Debug, Clone, Default)] +pub(crate) struct SummaryDto { + pub(crate) key: String, + pub(crate) value_json: String, + pub(crate) last_updated_utc: i64, +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct SummaryHistoryDto { + #[allow(dead_code)] + pub id: i64, + pub date: String, + pub value_json: String, + pub timestamp_utc: i64, +} + +impl TryFrom<SummaryHistoryDto> for SummaryHistory { + type Error = anyhow::Error; + + fn try_from(value: SummaryHistoryDto) -> Result<Self, Self::Error> { + let value_json = serde_json::from_str(&value.value_json).unwrap_or_default(); + Ok(SummaryHistory { + value_json, + date: value.date.clone(), + timestamp_utc: timestamp_as_utc(value.timestamp_utc.cast_checked()?).to_rfc3339(), + }) + } +} + +pub(crate) const MIXNODES_BONDED_COUNT: &str = "mixnodes.bonded.count"; +pub(crate) const MIXNODES_BONDED_ACTIVE: &str = "mixnodes.bonded.active"; +pub(crate) const MIXNODES_BONDED_INACTIVE: &str = "mixnodes.bonded.inactive"; +pub(crate) const MIXNODES_BONDED_RESERVE: &str = "mixnodes.bonded.reserve"; +pub(crate) const MIXNODES_BLACKLISTED_COUNT: &str = "mixnodes.blacklisted.count"; + +pub(crate) const GATEWAYS_BONDED_COUNT: &str = "gateways.bonded.count"; +pub(crate) const GATEWAYS_EXPLORER_COUNT: &str = "gateways.explorer.count"; +pub(crate) const GATEWAYS_BLACKLISTED_COUNT: &str = "gateways.blacklisted.count"; + +pub(crate) const MIXNODES_HISTORICAL_COUNT: &str = "mixnodes.historical.count"; +pub(crate) const GATEWAYS_HISTORICAL_COUNT: &str = "gateways.historical.count"; + +// `utoipa` goes crazy if you use module-qualified prefix as field type so we +// have to import it +use gateway::GatewaySummary; +use mixnode::MixnodeSummary; + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub(crate) struct NetworkSummary { + pub(crate) mixnodes: MixnodeSummary, + pub(crate) gateways: GatewaySummary, +} + +pub(crate) mod mixnode { + use super::*; + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct MixnodeSummary { + pub(crate) bonded: MixnodeSummaryBonded, + pub(crate) blacklisted: MixnodeSummaryBlacklisted, + pub(crate) historical: MixnodeSummaryHistorical, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct MixnodeSummaryBonded { + pub(crate) count: i32, + pub(crate) active: i32, + pub(crate) inactive: i32, + pub(crate) reserve: i32, + pub(crate) last_updated_utc: String, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct MixnodeSummaryBlacklisted { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct MixnodeSummaryHistorical { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } +} + +pub(crate) mod gateway { + use super::*; + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct GatewaySummary { + pub(crate) bonded: GatewaySummaryBonded, + pub(crate) blacklisted: GatewaySummaryBlacklisted, + pub(crate) historical: GatewaySummaryHistorical, + pub(crate) explorer: GatewaySummaryExplorer, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct GatewaySummaryExplorer { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + 
pub(crate) struct GatewaySummaryBonded { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct GatewaySummaryHistorical { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } + + #[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] + pub(crate) struct GatewaySummaryBlacklisted { + pub(crate) count: i32, + pub(crate) last_updated_utc: String, + } +} + +#[allow(dead_code)] // not dead code, this is SQL data model +#[derive(Debug, Clone)] +pub struct TestRunDto { + pub id: i64, + pub gateway_id: i64, + pub status: i64, + pub timestamp_utc: i64, + pub ip_address: String, + pub log: String, +} + +#[derive(Debug, Clone, strum_macros::Display, EnumString, FromRepr, PartialEq)] +#[repr(u8)] +pub(crate) enum TestRunStatus { + Complete = 2, + InProgress = 1, + Queued = 0, +} + +#[derive(Debug, Clone)] +pub struct GatewayIdentityDto { + pub gateway_identity_key: String, + pub bonded: bool, +} + +#[allow(dead_code)] // it's not dead code but clippy doesn't detect usage in sqlx macros +#[derive(Debug, Clone)] +pub struct GatewayInfoDto { + pub id: i64, + pub gateway_identity_key: String, + pub self_described: Option<String>, + pub explorer_pretty_bond: Option<String>, +} diff --git a/nym-node-status-api/src/db/queries/gateways.rs b/nym-node-status-api/src/db/queries/gateways.rs new file mode 100644 index 0000000000..bcf9c2d6ca --- /dev/null +++ b/nym-node-status-api/src/db/queries/gateways.rs @@ -0,0 +1,180 @@ +use crate::{ + db::{ + models::{BondedStatusDto, GatewayDto, GatewayRecord}, + DbPool, + }, + http::models::Gateway, +}; +use futures_util::TryStreamExt; +use nym_validator_client::models::NymNodeDescription; +use sqlx::{pool::PoolConnection, Sqlite}; +use tracing::error; + +pub(crate) async fn select_gateway_identity( + conn: &mut PoolConnection<Sqlite>, + gateway_pk: i64, +) -> anyhow::Result<String> { + let record = sqlx::query!( + r#"SELECT + gateway_identity_key + FROM + gateways + WHERE + id = ?"#, + gateway_pk + ) + .fetch_one(conn.as_mut()) + .await?; + + Ok(record.gateway_identity_key) +} + +pub(crate) async fn insert_gateways( + pool: &DbPool, + gateways: Vec<GatewayRecord>, +) -> anyhow::Result<()> { + let mut db = pool.acquire().await?; + for record in gateways { + sqlx::query!( + "INSERT INTO gateways + (gateway_identity_key, bonded, blacklisted, + self_described, explorer_pretty_bond, + last_updated_utc, performance) + VALUES (?, ?, ?, ?, ?, ?, ?) 
+ ON CONFLICT(gateway_identity_key) DO UPDATE SET + bonded=excluded.bonded, + blacklisted=excluded.blacklisted, + self_described=excluded.self_described, + explorer_pretty_bond=excluded.explorer_pretty_bond, + last_updated_utc=excluded.last_updated_utc, + performance = excluded.performance;", + record.identity_key, + record.bonded, + record.blacklisted, + record.self_described, + record.explorer_pretty_bond, + record.last_updated_utc, + record.performance + ) + .execute(&mut *db) + .await?; + } + + Ok(()) +} + +pub(crate) async fn write_blacklisted_gateways_to_db<'a, I>( + pool: &DbPool, + gateways: I, +) -> anyhow::Result<()> +where + I: Iterator, +{ + let mut conn = pool.acquire().await?; + for gateway_identity_key in gateways { + sqlx::query!( + "UPDATE gateways + SET blacklisted = true + WHERE gateway_identity_key = ?;", + gateway_identity_key, + ) + .execute(&mut *conn) + .await?; + } + + Ok(()) +} + +/// Ensure all gateways that are set as bonded, are still bonded +pub(crate) async fn ensure_gateways_still_bonded( + pool: &DbPool, + gateways: &[&NymNodeDescription], +) -> anyhow::Result { + let bonded_gateways_rows = get_all_bonded_gateways_row_ids_by_status(pool, true).await?; + let unbonded_gateways_rows = bonded_gateways_rows.iter().filter(|v| { + !gateways + .iter() + .any(|bonded| *bonded.ed25519_identity_key().to_base58_string() == v.identity_key) + }); + + let recently_unbonded_gateways = unbonded_gateways_rows.to_owned().count(); + let last_updated_utc = chrono::offset::Utc::now().timestamp(); + let mut transaction = pool.begin().await?; + for row in unbonded_gateways_rows { + sqlx::query!( + "UPDATE gateways + SET bonded = ?, last_updated_utc = ? + WHERE id = ?;", + false, + last_updated_utc, + row.id, + ) + .execute(&mut *transaction) + .await?; + } + transaction.commit().await?; + + Ok(recently_unbonded_gateways) +} + +async fn get_all_bonded_gateways_row_ids_by_status( + pool: &DbPool, + status: bool, +) -> anyhow::Result> { + let mut conn = pool.acquire().await?; + let items = sqlx::query_as!( + BondedStatusDto, + r#"SELECT + id as "id!", + gateway_identity_key as "identity_key!", + bonded as "bonded: bool" + FROM gateways + WHERE bonded = ?"#, + status, + ) + .fetch(&mut *conn) + .try_collect::>() + .await?; + + Ok(items) +} + +pub(crate) async fn get_all_gateways(pool: &DbPool) -> anyhow::Result> { + let mut conn = pool.acquire().await?; + let items = sqlx::query_as!( + GatewayDto, + r#"SELECT + gw.gateway_identity_key as "gateway_identity_key!", + gw.bonded as "bonded: bool", + gw.blacklisted as "blacklisted: bool", + gw.performance as "performance!", + gw.self_described as "self_described?", + gw.explorer_pretty_bond as "explorer_pretty_bond?", + gw.last_probe_result as "last_probe_result?", + gw.last_probe_log as "last_probe_log?", + gw.last_testrun_utc as "last_testrun_utc?", + gw.last_updated_utc as "last_updated_utc!", + COALESCE(gd.moniker, "NA") as "moniker!", + COALESCE(gd.website, "NA") as "website!", + COALESCE(gd.security_contact, "NA") as "security_contact!", + COALESCE(gd.details, "NA") as "details!" + FROM gateways gw + LEFT JOIN gateway_description gd + ON gw.gateway_identity_key = gd.gateway_identity_key + ORDER BY gw.gateway_identity_key"#, + ) + .fetch(&mut *conn) + .try_collect::>() + .await?; + + let items: Vec = items + .into_iter() + .map(|item| item.try_into()) + .collect::>>() + .map_err(|e| { + error!("Conversion from DTO failed: {e}. 
Invalidly stored data?"); + e + })?; + tracing::trace!("Fetched {} gateways from DB", items.len()); + Ok(items) +} diff --git a/nym-node-status-api/src/db/queries/mod.rs b/nym-node-status-api/src/db/queries/mod.rs new file mode 100644 index 0000000000..fe22ec27aa --- /dev/null +++ b/nym-node-status-api/src/db/queries/mod.rs @@ -0,0 +1,15 @@ +mod gateways; +mod misc; +mod mixnodes; +mod summary; +pub(crate) mod testruns; + +pub(crate) use gateways::{ + ensure_gateways_still_bonded, get_all_gateways, insert_gateways, select_gateway_identity, + write_blacklisted_gateways_to_db, +}; +pub(crate) use misc::insert_summaries; +pub(crate) use mixnodes::{ + ensure_mixnodes_still_bonded, get_all_mixnodes, get_daily_stats, insert_mixnodes, +}; +pub(crate) use summary::{get_summary, get_summary_history}; diff --git a/nym-node-status-api/src/db/queries/testruns.rs b/nym-node-status-api/src/db/queries/testruns.rs new file mode 100644 index 0000000000..91a3a86b13 --- /dev/null +++ b/nym-node-status-api/src/db/queries/testruns.rs @@ -0,0 +1,184 @@ +use crate::db::DbPool; +use crate::http::models::TestrunAssignment; +use crate::{ + db::models::{TestRunDto, TestRunStatus}, + testruns::now_utc, +}; +use anyhow::Context; +use chrono::Duration; +use sqlx::{pool::PoolConnection, Sqlite}; + +pub(crate) async fn get_in_progress_testrun_by_id( + conn: &mut PoolConnection, + testrun_id: i64, +) -> anyhow::Result { + sqlx::query_as!( + TestRunDto, + r#"SELECT + id as "id!", + gateway_id as "gateway_id!", + status as "status!", + timestamp_utc as "timestamp_utc!", + ip_address as "ip_address!", + log as "log!" + FROM testruns + WHERE + id = ? + AND + status = ? + ORDER BY timestamp_utc"#, + testrun_id, + TestRunStatus::InProgress as i64, + ) + .fetch_one(conn.as_mut()) + .await + .context(format!("Couldn't retrieve testrun {testrun_id}")) +} + +pub(crate) async fn update_testruns_older_than(db: &DbPool, age: Duration) -> anyhow::Result { + let mut conn = db.acquire().await?; + let previous_run = now_utc() - age; + let cutoff_timestamp = previous_run.timestamp(); + + let res = sqlx::query!( + r#"UPDATE + testruns + SET + status = ? + WHERE + status = ? + AND + timestamp_utc < ? + "#, + TestRunStatus::Queued as i64, + TestRunStatus::InProgress as i64, + cutoff_timestamp + ) + .execute(conn.as_mut()) + .await?; + + let stale_testruns = res.rows_affected(); + if stale_testruns > 0 { + tracing::debug!( + "Refreshed {} stale testruns, scheduled before {} but not yet finished", + stale_testruns, + previous_run + ); + } + + Ok(stale_testruns) +} + +pub(crate) async fn get_oldest_testrun_and_make_it_pending( + conn: &mut PoolConnection, +) -> anyhow::Result> { + // find & mark as "In progress" in the same transaction to avoid race conditions + let returning = sqlx::query!( + r#"UPDATE testruns + SET status = ? + WHERE rowid = + ( + SELECT rowid + FROM testruns + WHERE status = ? + ORDER BY timestamp_utc asc + LIMIT 1 + ) + RETURNING + id as "id!", + gateway_id + "#, + TestRunStatus::InProgress as i64, + TestRunStatus::Queued as i64, + ) + .fetch_optional(conn.as_mut()) + .await?; + + if let Some(testrun) = returning { + let gw_identity = sqlx::query!( + r#" + SELECT + id, + gateway_identity_key + FROM gateways + WHERE id = ? 
+ LIMIT 1"#, + testrun.gateway_id + ) + .fetch_one(conn.as_mut()) + .await?; + + Ok(Some(TestrunAssignment { + testrun_id: testrun.id, + gateway_identity_key: gw_identity.gateway_identity_key, + })) + } else { + Ok(None) + } +} + +pub(crate) async fn update_testrun_status( + conn: &mut PoolConnection, + testrun_id: i64, + status: TestRunStatus, +) -> anyhow::Result<()> { + let status = status as u32; + sqlx::query!( + "UPDATE testruns SET status = ? WHERE id = ?", + status, + testrun_id + ) + .execute(conn.as_mut()) + .await?; + + Ok(()) +} + +pub(crate) async fn update_gateway_last_probe_log( + conn: &mut PoolConnection, + gateway_pk: i64, + log: &str, +) -> anyhow::Result<()> { + sqlx::query!( + "UPDATE gateways SET last_probe_log = ? WHERE id = ?", + log, + gateway_pk + ) + .execute(conn.as_mut()) + .await + .map(drop) + .map_err(From::from) +} + +pub(crate) async fn update_gateway_last_probe_result( + conn: &mut PoolConnection, + gateway_pk: i64, + result: &str, +) -> anyhow::Result<()> { + sqlx::query!( + "UPDATE gateways SET last_probe_result = ? WHERE id = ?", + result, + gateway_pk + ) + .execute(conn.as_mut()) + .await + .map(drop) + .map_err(From::from) +} + +pub(crate) async fn update_gateway_score( + conn: &mut PoolConnection, + gateway_pk: i64, +) -> anyhow::Result<()> { + let now = now_utc().timestamp(); + sqlx::query!( + "UPDATE gateways SET last_testrun_utc = ?, last_updated_utc = ? WHERE id = ?", + now, + now, + gateway_pk + ) + .execute(conn.as_mut()) + .await + .map(drop) + .map_err(From::from) +} diff --git a/nym-node-status-api/src/http/api/mod.rs b/nym-node-status-api/src/http/api/mod.rs new file mode 100644 index 0000000000..4e4693ac1f --- /dev/null +++ b/nym-node-status-api/src/http/api/mod.rs @@ -0,0 +1,94 @@ +use anyhow::anyhow; +use axum::{response::Redirect, Router}; +use tokio::net::ToSocketAddrs; +use tower_http::{ + cors::CorsLayer, + trace::{DefaultOnResponse, TraceLayer}, +}; +use utoipa::OpenApi; +use utoipa_swagger_ui::SwaggerUi; + +use crate::http::{server::HttpServer, state::AppState}; + +pub(crate) mod gateways; +pub(crate) mod mixnodes; +pub(crate) mod services; +pub(crate) mod summary; +pub(crate) mod testruns; + +pub(crate) struct RouterBuilder { + unfinished_router: Router, +} + +impl RouterBuilder { + pub(crate) fn with_default_routes() -> Self { + let router = Router::new() + .merge( + SwaggerUi::new("/swagger") + .url("/api-docs/openapi.json", super::api_docs::ApiDoc::openapi()), + ) + .route( + "/", + axum::routing::get(|| async { Redirect::permanent("/swagger") }), + ) + .nest( + "/v2", + Router::new() + .nest("/gateways", gateways::routes()) + .nest("/mixnodes", mixnodes::routes()) + .nest("/services", services::routes()) + .nest("/summary", summary::routes()), + ) + .nest( + "/internal", + Router::new().nest("/testruns", testruns::routes()), + ); + + Self { + unfinished_router: router, + } + } + + pub(crate) fn with_state(self, state: AppState) -> RouterWithState { + RouterWithState { + router: self.finalize_routes().with_state(state), + } + } + + fn finalize_routes(self) -> Router { + // layers added later wrap earlier layers + self.unfinished_router + // CORS layer needs to wrap other API layers + .layer(setup_cors()) + // logger should be outermost layer + .layer( + TraceLayer::new_for_http() + .on_response(DefaultOnResponse::new().level(tracing::Level::DEBUG)), + ) + } +} + +pub(crate) struct RouterWithState { + router: Router, +} + +impl RouterWithState { + pub(crate) async fn build_server( + self, + bind_address: A, + ) -> 
anyhow::Result { + tokio::net::TcpListener::bind(bind_address) + .await + .map(|listener| HttpServer::new(self.router, listener)) + .map_err(|err| anyhow!("Couldn't bind to address due to {}", err)) + } +} + +fn setup_cors() -> CorsLayer { + use axum::http::Method; + CorsLayer::new() + .allow_origin(tower_http::cors::Any) + .allow_methods([Method::POST, Method::GET, Method::PATCH, Method::OPTIONS]) + .allow_headers(tower_http::cors::Any) + .allow_credentials(false) +} diff --git a/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/src/http/api/testruns.rs new file mode 100644 index 0000000000..e55e462110 --- /dev/null +++ b/nym-node-status-api/src/http/api/testruns.rs @@ -0,0 +1,125 @@ +use axum::extract::DefaultBodyLimit; +use axum::Json; +use axum::{ + extract::{Path, State}, + Router, +}; +use reqwest::StatusCode; + +use crate::db::models::TestRunStatus; +use crate::db::queries; +use crate::{ + db, + http::{ + error::{HttpError, HttpResult}, + models::TestrunAssignment, + state::AppState, + }, +}; + +// TODO dz consider adding endpoint to trigger testrun scan for a given gateway_id +// like in H< src/http/testruns.rs + +pub(crate) fn routes() -> Router { + Router::new() + .route("/", axum::routing::get(request_testrun)) + .route("/:testrun_id", axum::routing::post(submit_testrun)) + .layer(DefaultBodyLimit::max(1024 * 1024 * 5)) +} + +#[tracing::instrument(level = "debug", skip_all)] +async fn request_testrun(State(state): State) -> HttpResult> { + // TODO dz log agent's key + // TODO dz log agent's network probe version + tracing::debug!("Agent requested testrun"); + + let db = state.db_pool(); + let mut conn = db + .acquire() + .await + .map_err(HttpError::internal_with_logging)?; + + return match db::queries::testruns::get_oldest_testrun_and_make_it_pending(&mut conn).await { + Ok(res) => { + if let Some(testrun) = res { + tracing::debug!( + "🏃‍ Assigned testrun row_id {} gateway {} to agent", + &testrun.testrun_id, + testrun.gateway_identity_key + ); + Ok(Json(testrun)) + } else { + Err(HttpError::no_available_testruns()) + } + } + Err(err) => Err(HttpError::internal_with_logging(err)), + }; +} + +// TODO dz accept testrun_id as query parameter +#[tracing::instrument(level = "debug", skip_all)] +async fn submit_testrun( + Path(testrun_id): Path, + State(state): State, + body: String, +) -> HttpResult { + let db = state.db_pool(); + let mut conn = db + .acquire() + .await + .map_err(HttpError::internal_with_logging)?; + + let testrun = queries::testruns::get_in_progress_testrun_by_id(&mut conn, testrun_id) + .await + .map_err(|e| { + tracing::error!("{e}"); + HttpError::not_found(testrun_id) + })?; + + let gw_identity = db::queries::select_gateway_identity(&mut conn, testrun.gateway_id) + .await + .map_err(|_| { + // should never happen: + HttpError::internal_with_logging("No gateway found for testrun") + })?; + tracing::debug!( + "Agent submitted testrun {} for gateway {} ({} bytes)", + testrun_id, + gw_identity, + body.len(), + ); + + // TODO dz this should be part of a single transaction: commit after everything is done + queries::testruns::update_testrun_status(&mut conn, testrun_id, TestRunStatus::Complete) + .await + .map_err(HttpError::internal_with_logging)?; + queries::testruns::update_gateway_last_probe_log(&mut conn, testrun.gateway_id, &body) + .await + .map_err(HttpError::internal_with_logging)?; + let result = get_result_from_log(&body); + queries::testruns::update_gateway_last_probe_result(&mut conn, testrun.gateway_id, &result) + .await + 
.map_err(HttpError::internal_with_logging)?; + queries::testruns::update_gateway_score(&mut conn, testrun.gateway_id) + .await + .map_err(HttpError::internal_with_logging)?; + // TODO dz log gw identity key + + tracing::info!( + "✅ Testrun row_id {} for gateway {} complete", + testrun.id, + gw_identity + ); + + Ok(StatusCode::CREATED) +} + +fn get_result_from_log(log: &str) -> String { + let re = regex::Regex::new(r"\n\{\s").unwrap(); + let result: Vec<_> = re.splitn(log, 2).collect(); + if result.len() == 2 { + let res = format!("{} {}", "{", result[1]).to_string(); + return res; + } + "".to_string() +} diff --git a/nym-node-status-api/src/http/error.rs b/nym-node-status-api/src/http/error.rs new file mode 100644 index 0000000000..808ace9cec --- /dev/null +++ b/nym-node-status-api/src/http/error.rs @@ -0,0 +1,48 @@ +use std::fmt::Display; + +pub(crate) type HttpResult = Result; + +pub(crate) struct HttpError { + message: String, + status: axum::http::StatusCode, +} + +impl HttpError { + pub(crate) fn invalid_input(msg: impl Display) -> Self { + Self { + message: serde_json::json!({"message": msg.to_string()}).to_string(), + status: axum::http::StatusCode::BAD_REQUEST, + } + } + + pub(crate) fn internal_with_logging(msg: impl Display) -> Self { + tracing::error!("{}", msg.to_string()); + Self::internal() + } + + pub(crate) fn internal() -> Self { + Self { + message: serde_json::json!({"message": "Internal server error"}).to_string(), + status: axum::http::StatusCode::INTERNAL_SERVER_ERROR, + } + } + + pub(crate) fn no_available_testruns() -> Self { + Self { + message: serde_json::json!({"message": "No available testruns"}).to_string(), + status: axum::http::StatusCode::SERVICE_UNAVAILABLE, + } + } + pub(crate) fn not_found(msg: impl Display) -> Self { + Self { + message: serde_json::json!({"message": msg.to_string()}).to_string(), + status: axum::http::StatusCode::NOT_FOUND, + } + } +} + +impl axum::response::IntoResponse for HttpError { + fn into_response(self) -> axum::response::Response { + (self.status, self.message).into_response() + } +} diff --git a/nym-node-status-api/src/http/models.rs b/nym-node-status-api/src/http/models.rs new file mode 100644 index 0000000000..82011fc286 --- /dev/null +++ b/nym-node-status-api/src/http/models.rs @@ -0,0 +1,76 @@ +use nym_node_requests::api::v1::node::models::NodeDescription; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +pub(crate) use nym_common_models::ns_api::TestrunAssignment; + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct Gateway { + pub gateway_identity_key: String, + pub bonded: bool, + pub blacklisted: bool, + pub performance: u8, + pub self_described: Option, + pub explorer_pretty_bond: Option, + pub description: NodeDescription, + pub last_probe_result: Option, + pub last_probe_log: Option, + pub last_testrun_utc: Option, + pub last_updated_utc: String, + pub routing_score: f32, + pub config_score: u32, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct GatewaySkinny { + pub gateway_identity_key: String, + pub self_described: Option, + pub explorer_pretty_bond: Option, + pub last_probe_result: Option, + pub last_testrun_utc: Option, + pub last_updated_utc: String, + pub routing_score: f32, + pub config_score: u32, + pub performance: u8, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct Mixnode { + pub mix_id: u32, + pub bonded: bool, + pub blacklisted: bool, + pub is_dp_delegatee: bool, + pub total_stake: i64, + pub full_details: Option, + 
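+    /// node's self-described info as JSON, as collected by the monitor (see `prepare_mixnode_data`)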
pub self_described: Option, + pub description: NodeDescription, + pub last_updated_utc: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct DailyStats { + pub date_utc: String, + pub total_packets_received: i64, + pub total_packets_sent: i64, + pub total_packets_dropped: i64, + pub total_stake: i64, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub struct Service { + pub gateway_identity_key: String, + pub last_updated_utc: String, + pub routing_score: f32, + pub service_provider_client_id: Option, + pub ip_address: Option, + pub hostname: Option, + pub mixnet_websockets: Option, + pub last_successful_ping_utc: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, ToSchema)] +pub(crate) struct SummaryHistory { + pub date: String, + pub value_json: serde_json::Value, + pub timestamp_utc: String, +} diff --git a/nym-node-status-api/src/monitor/mod.rs b/nym-node-status-api/src/monitor/mod.rs new file mode 100644 index 0000000000..a9f3761f35 --- /dev/null +++ b/nym-node-status-api/src/monitor/mod.rs @@ -0,0 +1,511 @@ +use crate::db::models::{ + gateway, mixnode, GatewayRecord, MixnodeRecord, NetworkSummary, GATEWAYS_BLACKLISTED_COUNT, + GATEWAYS_BONDED_COUNT, GATEWAYS_EXPLORER_COUNT, GATEWAYS_HISTORICAL_COUNT, + MIXNODES_BLACKLISTED_COUNT, MIXNODES_BONDED_ACTIVE, MIXNODES_BONDED_COUNT, + MIXNODES_BONDED_INACTIVE, MIXNODES_BONDED_RESERVE, MIXNODES_HISTORICAL_COUNT, +}; +use crate::db::{queries, DbPool}; +use anyhow::anyhow; +use cosmwasm_std::Decimal; +use nym_explorer_client::{ExplorerClient, PrettyDetailedGatewayBond}; +use nym_network_defaults::NymNetworkDetails; +use nym_validator_client::client::NymApiClientExt; +use nym_validator_client::models::{ + LegacyDescribedMixNode, MixNodeBondAnnotated, NymNodeDescription, +}; +use nym_validator_client::nym_nodes::SkimmedNode; +use nym_validator_client::nyxd::contract_traits::PagedMixnetQueryClient; +use nym_validator_client::nyxd::{AccountId, NyxdClient}; +use nym_validator_client::NymApiClient; +use reqwest::Url; +use std::collections::HashSet; +use std::str::FromStr; +use tokio::time::Duration; +use tracing::instrument; + +// TODO dz should be configurable +const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60); + +static DELEGATION_PROGRAM_WALLET: &str = "n1rnxpdpx3kldygsklfft0gech7fhfcux4zst5lw"; + +// TODO dz: query many NYM APIs: +// multiple instances running directory cache, ask sachin +#[instrument(level = "debug", name = "data_monitor", skip_all)] +pub(crate) async fn spawn_in_background( + db_pool: DbPool, + explorer_client_timeout: Duration, + nym_api_client_timeout: Duration, + nyxd_addr: &Url, + refresh_interval: Duration, +) { + let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env(); + + loop { + tracing::info!("Refreshing node info..."); + + if let Err(e) = run( + &db_pool, + &network_defaults, + explorer_client_timeout, + nym_api_client_timeout, + nyxd_addr, + ) + .await + { + tracing::error!( + "Monitor run failed: {e}, retrying in {}s...", + FAILURE_RETRY_DELAY.as_secs() + ); + // TODO dz implement some sort of backoff + tokio::time::sleep(FAILURE_RETRY_DELAY).await; + } else { + tracing::info!( + "Info successfully collected, sleeping for {}s...", + refresh_interval.as_secs() + ); + tokio::time::sleep(refresh_interval).await; + } + } +} + +async fn run( + pool: &DbPool, + network_details: &NymNetworkDetails, + explorer_client_timeout: Duration, + nym_api_client_timeout: Duration, + nyxd_addr: &Url, +) -> anyhow::Result<()> { + let 
default_api_url = network_details + .endpoints + .first() + .expect("rust sdk mainnet default incorrectly configured") + .api_url() + .clone() + .expect("rust sdk mainnet default missing api_url"); + let default_explorer_url = network_details.explorer_api.clone().map(|url| { + url.parse() + .expect("rust sdk mainnet default explorer url not parseable") + }); + + // TODO dz replace explorer api with ipinfo.io + let default_explorer_url = + default_explorer_url.expect("explorer url missing in network config"); + let explorer_client = + ExplorerClient::new_with_timeout(default_explorer_url, explorer_client_timeout)?; + let explorer_gateways = explorer_client + .unstable_get_gateways() + .await + .log_error("unstable_get_gateways")?; + + let api_client = NymApiClient::new_with_timeout(default_api_url, nym_api_client_timeout); + + let all_nodes = api_client + .get_all_described_nodes() + .await + .log_error("get_all_described_nodes")?; + tracing::debug!("Fetched {} total nodes", all_nodes.len()); + + let gateways = all_nodes + .iter() + .filter(|node| node.description.declared_role.entry) + .collect::>(); + tracing::debug!("Of those, {} gateways", gateways.len()); + for gw in gateways.iter() { + tracing::debug!("{}", gw.ed25519_identity_key().to_base58_string()); + } + + let mixnodes = all_nodes + .iter() + .filter(|node| node.description.declared_role.mixnode) + .collect::>(); + tracing::debug!("Of those, {} mixnodes", mixnodes.len()); + + log_gw_in_explorer_not_api(explorer_gateways.as_slice(), gateways.as_slice()); + + let all_skimmed_nodes = api_client + .get_all_basic_nodes(None) + .await + .log_error("get_all_basic_nodes")?; + + let mixnodes = api_client + .get_cached_mixnodes() + .await + .log_error("get_cached_mixnodes")?; + tracing::debug!("Fetched {} mixnodes", mixnodes.len()); + + // let gateways_blacklisted = gateways.iter().filter(|gw|gw.) 
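+    // Blacklisting is derived from the skimmed-node cache rather than a dedicated API
+    // field: an entry-capable node whose reported performance is at or below 50% is
+    // treated as blacklisted by the filter below.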
+ let gateways_blacklisted = all_skimmed_nodes + .iter() + .filter_map(|node| { + if node.performance.round_to_integer() <= 50 && node.supported_roles.entry { + Some(node.ed25519_identity_pubkey.to_base58_string()) + } else { + None + } + }) + .collect::>(); + + // Cached mixnodes don't include blacklisted nodes + // We need that to calculate the total locked tokens later + let mixnodes = api_client + .nym_api + .get_mixnodes_detailed_unfiltered() + .await + .log_error("get_mixnodes_detailed_unfiltered")?; + let mixnodes_described = api_client + .nym_api + .get_mixnodes_described() + .await + .log_error("get_mixnodes_described")?; + let mixnodes_active = api_client + .nym_api + .get_active_mixnodes() + .await + .log_error("get_active_mixnodes")?; + let delegation_program_members = + get_delegation_program_details(network_details, nyxd_addr).await?; + + // keep stats for later + let count_bonded_mixnodes = mixnodes.len(); + let count_bonded_gateways = gateways.len(); + let count_explorer_gateways = explorer_gateways.len(); + let count_bonded_mixnodes_active = mixnodes_active.len(); + + let gateway_records = prepare_gateway_data( + &gateways, + &gateways_blacklisted, + explorer_gateways, + all_skimmed_nodes, + )?; + queries::insert_gateways(pool, gateway_records) + .await + .map(|_| { + tracing::debug!("Gateway info written to DB!"); + })?; + + // instead of counting blacklisted GWs returned from API cache, count from the active set + let count_gateways_blacklisted = gateways + .iter() + .filter(|gw| { + let gw_identity = gw.ed25519_identity_key().to_base58_string(); + gateways_blacklisted.contains(&gw_identity) + }) + .count(); + + if count_gateways_blacklisted > 0 { + queries::write_blacklisted_gateways_to_db(pool, gateways_blacklisted.iter()) + .await + .map(|_| { + tracing::debug!( + "Gateway blacklist info written to DB! 
{} blacklisted by Nym API", + count_gateways_blacklisted + ) + })?; + } + + let mixnode_records = + prepare_mixnode_data(&mixnodes, mixnodes_described, delegation_program_members)?; + queries::insert_mixnodes(pool, mixnode_records) + .await + .map(|_| { + tracing::debug!("Mixnode info written to DB!"); + })?; + + let count_mixnodes_blacklisted = mixnodes.iter().filter(|elem| elem.blacklisted).count(); + + let recently_unbonded_gateways = queries::ensure_gateways_still_bonded(pool, &gateways).await?; + let recently_unbonded_mixnodes = queries::ensure_mixnodes_still_bonded(pool, &mixnodes).await?; + + let count_bonded_mixnodes_reserve = 0; // TODO: NymAPI doesn't report the reserve set size + let count_bonded_mixnodes_inactive = count_bonded_mixnodes - count_bonded_mixnodes_active; + + let (all_historical_gateways, all_historical_mixnodes) = calculate_stats(pool).await?; + + // + // write summary keys and values to table + // + + let nodes_summary = vec![ + (MIXNODES_BONDED_COUNT, &count_bonded_mixnodes), + (MIXNODES_BONDED_ACTIVE, &count_bonded_mixnodes_active), + (MIXNODES_BONDED_INACTIVE, &count_bonded_mixnodes_inactive), + (MIXNODES_BONDED_RESERVE, &count_bonded_mixnodes_reserve), + (MIXNODES_BLACKLISTED_COUNT, &count_mixnodes_blacklisted), + (GATEWAYS_BONDED_COUNT, &count_bonded_gateways), + (GATEWAYS_EXPLORER_COUNT, &count_explorer_gateways), + (MIXNODES_HISTORICAL_COUNT, &all_historical_mixnodes), + (GATEWAYS_HISTORICAL_COUNT, &all_historical_gateways), + (GATEWAYS_BLACKLISTED_COUNT, &count_gateways_blacklisted), + ]; + + let last_updated = chrono::offset::Utc::now(); + let last_updated_utc = last_updated.timestamp().to_string(); + let network_summary = NetworkSummary { + mixnodes: mixnode::MixnodeSummary { + bonded: mixnode::MixnodeSummaryBonded { + count: count_bonded_mixnodes.cast_checked()?, + active: count_bonded_mixnodes_active.cast_checked()?, + inactive: count_bonded_mixnodes_inactive.cast_checked()?, + reserve: count_bonded_mixnodes_reserve.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + blacklisted: mixnode::MixnodeSummaryBlacklisted { + count: count_mixnodes_blacklisted.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + historical: mixnode::MixnodeSummaryHistorical { + count: all_historical_mixnodes.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + }, + gateways: gateway::GatewaySummary { + bonded: gateway::GatewaySummaryBonded { + count: count_bonded_gateways.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + blacklisted: gateway::GatewaySummaryBlacklisted { + count: count_gateways_blacklisted.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + historical: gateway::GatewaySummaryHistorical { + count: all_historical_gateways.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + explorer: gateway::GatewaySummaryExplorer { + count: count_explorer_gateways.cast_checked()?, + last_updated_utc: last_updated_utc.to_owned(), + }, + }, + }; + + queries::insert_summaries(pool, &nodes_summary, &network_summary, last_updated).await?; + + let mut log_lines: Vec = vec![]; + for (key, value) in nodes_summary.iter() { + log_lines.push(format!("{} = {}", key, value)); + } + log_lines.push(format!( + "recently_unbonded_mixnodes = {}", + recently_unbonded_mixnodes + )); + log_lines.push(format!( + "recently_unbonded_gateways = {}", + recently_unbonded_gateways + )); + + tracing::info!("Directory summary: \n{}", log_lines.join("\n")); + + Ok(()) +} + +fn 
prepare_gateway_data( + gateways: &[&NymNodeDescription], + gateways_blacklisted: &HashSet, + explorer_gateways: Vec, + skimmed_gateways: Vec, +) -> anyhow::Result> { + let mut gateway_records = Vec::new(); + + for gateway in gateways { + let identity_key = gateway.ed25519_identity_key().to_base58_string(); + let bonded = true; + let last_updated_utc = chrono::offset::Utc::now().timestamp(); + let blacklisted = gateways_blacklisted.contains(&identity_key); + + let self_described = serde_json::to_string(&gateway.description)?; + + let explorer_pretty_bond = explorer_gateways + .iter() + .find(|g| g.gateway.identity_key.eq(&identity_key)); + let explorer_pretty_bond = explorer_pretty_bond.and_then(|g| serde_json::to_string(g).ok()); + + let performance = skimmed_gateways + .iter() + .find(|g| { + g.ed25519_identity_pubkey + .to_base58_string() + .eq(&identity_key) + }) + .map(|g| g.performance) + .unwrap_or_default() + .round_to_integer(); + + gateway_records.push(GatewayRecord { + identity_key: identity_key.to_owned(), + bonded, + blacklisted, + self_described, + explorer_pretty_bond, + last_updated_utc, + performance, + }); + } + + Ok(gateway_records) +} + +fn prepare_mixnode_data( + mixnodes: &[MixNodeBondAnnotated], + mixnodes_described: Vec, + delegation_program_members: Vec, +) -> anyhow::Result> { + let mut mixnode_records = Vec::new(); + + for mixnode in mixnodes { + let mix_id = mixnode.mix_id(); + let identity_key = mixnode.identity_key(); + let bonded = true; + let total_stake = decimal_to_i64(mixnode.mixnode_details.total_stake()); + let blacklisted = mixnode.blacklisted; + let node_info = mixnode.mix_node(); + let host = node_info.host.clone(); + let http_port = node_info.http_api_port; + // Contains all the information including what's above + let full_details = serde_json::to_string(&mixnode)?; + + let mixnode_described = mixnodes_described.iter().find(|m| m.bond.mix_id == mix_id); + let self_described = mixnode_described.and_then(|v| serde_json::to_string(v).ok()); + let is_dp_delegatee = delegation_program_members.contains(&mix_id); + + let last_updated_utc = chrono::offset::Utc::now().timestamp(); + + mixnode_records.push(MixnodeRecord { + mix_id, + identity_key: identity_key.to_owned(), + bonded, + total_stake, + host, + http_port, + blacklisted, + full_details, + self_described, + last_updated_utc, + is_dp_delegatee, + }); + } + + Ok(mixnode_records) +} + +fn log_gw_in_explorer_not_api( + explorer: &[PrettyDetailedGatewayBond], + api_gateways: &[&NymNodeDescription], +) { + let api_gateways = api_gateways + .iter() + .map(|gw| gw.ed25519_identity_key().to_base58_string()) + .collect::>(); + let explorer_only = explorer + .iter() + .filter(|gw| !api_gateways.contains(&gw.gateway.identity_key.to_string())) + .collect::>(); + + tracing::debug!( + "Gateways listed by explorer but not by Nym API: {}", + explorer_only.len() + ); + for gw in explorer_only.iter() { + tracing::debug!("{}", gw.gateway.identity_key.to_string()); + } +} + +// TODO dz is there a common monorepo place this can be put? 
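+/// Checked numeric conversion that reports the offending value and target type instead
+/// of silently truncating. The target is inferred from the binding; a rough usage
+/// sketch (values are made up):
+///
+/// ```ignore
+/// let bonded: i32 = 10_000usize.cast_checked()?;             // Ok(10_000)
+/// let narrow: anyhow::Result<i8> = 1_000i64.cast_checked();  // Err("Couldn't cast 1000 to i8: ...")
+/// ```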
+pub trait NumericalCheckedCast<T>
+where
+    T: TryFrom<Self>,
+    <T as TryFrom<Self>>::Error: std::error::Error,
+    Self: std::fmt::Display + Copy,
+{
+    fn cast_checked(self) -> anyhow::Result<T> {
+        T::try_from(self).map_err(|e| {
+            anyhow::anyhow!(
+                "Couldn't cast {} to {}: {}",
+                self,
+                std::any::type_name::<T>(),
+                e
+            )
+        })
+    }
+}
+
+impl<T, U> NumericalCheckedCast<U> for T
+where
+    U: TryFrom<T>,
+    <U as TryFrom<T>>::Error: std::error::Error,
+    T: std::fmt::Display + Copy,
+{
+}
+
+async fn calculate_stats(pool: &DbPool) -> anyhow::Result<(usize, usize)> {
+    let mut conn = pool.acquire().await?;
+
+    let all_historical_gateways = sqlx::query_scalar!(r#"SELECT count(id) FROM gateways"#)
+        .fetch_one(&mut *conn)
+        .await?
+        .cast_checked()?;
+
+    let all_historical_mixnodes = sqlx::query_scalar!(r#"SELECT count(id) FROM mixnodes"#)
+        .fetch_one(&mut *conn)
+        .await?
+        .cast_checked()?;
+
+    Ok((all_historical_gateways, all_historical_mixnodes))
+}
+
+async fn get_delegation_program_details(
+    network_details: &NymNetworkDetails,
+    nyxd_addr: &Url,
+) -> anyhow::Result<Vec<u32>> {
+    let config = nym_validator_client::nyxd::Config::try_from_nym_network_details(network_details)?;
+
+    let client = NyxdClient::connect(config, nyxd_addr.as_str())
+        .map_err(|err| anyhow::anyhow!("Couldn't connect: {}", err))?;
+
+    let account_id = AccountId::from_str(DELEGATION_PROGRAM_WALLET)
+        .map_err(|e| anyhow!("Invalid bech32 address: {}", e))?;
+
+    let delegations = client.get_all_delegator_delegations(&account_id).await?;
+
+    let mix_ids: Vec<u32> = delegations
+        .iter()
+        .map(|delegation| delegation.node_id)
+        .collect();
+
+    Ok(mix_ids)
+}
+
+fn decimal_to_i64(decimal: Decimal) -> i64 {
+    // Convert the underlying Uint128 to a u128
+    let atomics = decimal.atomics().u128();
+    let precision = 1_000_000_000_000_000_000u128;
+
+    // Get the fractional part
+    let fractional = atomics % precision;
+
+    // Get the integer part
+    let integer = atomics / precision;
+
+    // Combine them into a float
+    let float_value = integer as f64 + (fractional as f64 / 1_000_000_000_000_000_000_f64);
+
+    // Limit to 6 decimal places
+    let rounded_value = (float_value * 1_000_000.0).round() / 1_000_000.0;
+
+    rounded_value as i64
+}
+
+trait LogError<T, E> {
+    fn log_error(self, msg: &str) -> Result<T, E>;
+}
+
+impl<T, E> LogError<T, E> for anyhow::Result<T, E>
+where
+    E: std::error::Error,
+{
+    fn log_error(self, msg: &str) -> Result<T, E> {
+        if let Err(e) = &self {
+            tracing::error!("[{msg}]:\t{e}");
+        }
+        self
+    }
+}
diff --git a/nym-node-status-api/src/testruns/mod.rs b/nym-node-status-api/src/testruns/mod.rs
new file mode 100644
index 0000000000..f487523f36
--- /dev/null
+++ b/nym-node-status-api/src/testruns/mod.rs
@@ -0,0 +1,86 @@
+use crate::db::models::GatewayIdentityDto;
+use crate::db::DbPool;
+use futures_util::TryStreamExt;
+use std::time::Duration;
+use tracing::instrument;
+
+pub(crate) mod models;
+mod queue;
+pub(crate) use queue::now_utc;
+
+pub(crate) async fn spawn(pool: DbPool, refresh_interval: Duration) {
+    tokio::spawn(async move {
+        loop {
+            if let Err(e) = refresh_stale_testruns(&pool, refresh_interval).await {
+                tracing::error!("{e}");
+            }
+
+            if let Err(e) = run(&pool).await {
+                tracing::error!("Assigning testruns failed: {}", e);
+            }
+            tracing::debug!("Sleeping for {}s...", refresh_interval.as_secs());
+            tokio::time::sleep(refresh_interval).await;
+        }
+    });
+}
+
+// TODO dz make number of max agents configurable
+
+#[instrument(level = "debug", name = "testrun_queue", skip_all)]
+async fn run(pool: &DbPool) -> anyhow::Result<()> {
+    tracing::info!("Spawning testruns...");
+    if pool.is_closed() {
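+        // nothing can be queued without a live DB pool; the outer loop in `spawn`
+        // will retry after the next sleep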
tracing::debug!("DB pool closed, returning early"); + return Ok(()); + } + + let mut conn = pool.acquire().await?; + + let gateways = sqlx::query_as!( + GatewayIdentityDto, + r#"SELECT + gateway_identity_key as "gateway_identity_key!", + bonded as "bonded: bool" + FROM gateways + ORDER BY last_testrun_utc"#, + ) + .fetch(&mut *conn) + .try_collect::>() + .await?; + + // TODO dz this filtering could be done in SQL + let gateways: Vec = gateways.into_iter().filter(|g| g.bonded).collect(); + + tracing::debug!("Trying to queue {} testruns", gateways.len()); + let mut testruns_created = 0; + for gateway in gateways { + if let Err(e) = queue::try_queue_testrun( + &mut conn, + gateway.gateway_identity_key.clone(), + // TODO dz read from config + "127.0.0.1".to_string(), + ) + .await + // TODO dz measure how many were actually inserted and how many were skipped + { + tracing::debug!( + "Skipping test for identity {} with error {}", + &gateway.gateway_identity_key, + e + ); + } else { + testruns_created += 1; + } + } + tracing::debug!("{} testruns queued in total", testruns_created); + + Ok(()) +} + +#[instrument(level = "debug", skip_all)] +async fn refresh_stale_testruns(pool: &DbPool, refresh_interval: Duration) -> anyhow::Result<()> { + let chrono_duration = chrono::Duration::from_std(refresh_interval)?; + crate::db::queries::testruns::update_testruns_older_than(pool, chrono_duration).await?; + + Ok(()) +} diff --git a/nym-node-status-api/src/testruns/queue.rs b/nym-node-status-api/src/testruns/queue.rs new file mode 100644 index 0000000000..88804fff1b --- /dev/null +++ b/nym-node-status-api/src/testruns/queue.rs @@ -0,0 +1,118 @@ +use crate::db::models::{GatewayInfoDto, TestRunDto, TestRunStatus}; +use crate::testruns::models::TestRun; +use anyhow::anyhow; +use chrono::DateTime; +use futures_util::TryStreamExt; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use std::time::SystemTime; + +pub(crate) async fn try_queue_testrun( + conn: &mut PoolConnection, + identity_key: String, + ip_address: String, +) -> anyhow::Result { + let timestamp = now_utc().timestamp(); + let timestamp_pretty = now_utc_as_rfc3339(); + + let items = sqlx::query_as!( + GatewayInfoDto, + r#"SELECT + id as "id!", + gateway_identity_key as "gateway_identity_key!", + self_described as "self_described?", + explorer_pretty_bond as "explorer_pretty_bond?" + FROM gateways + WHERE gateway_identity_key = ? + ORDER BY gateway_identity_key + LIMIT 1"#, + identity_key, + ) + // TODO dz shoudl call .fetch_one + // TODO dz replace this in other queries as well + .fetch(conn.as_mut()) + .try_collect::>() + .await?; + + let gateway = items + .iter() + .find(|g| g.gateway_identity_key == identity_key); + + // TODO dz if let Some() = gateway.first() ... + if gateway.is_none() { + return Err(anyhow!("Unknown gateway {identity_key}")); + } + let gateway_id = gateway.unwrap().id; + + // + // check if there is already a test run for this gateway + // + let items = sqlx::query_as!( + TestRunDto, + r#"SELECT + id as "id!", + gateway_id as "gateway_id!", + status as "status!", + timestamp_utc as "timestamp_utc!", + ip_address as "ip_address!", + log as "log!" + FROM testruns + WHERE gateway_id = ? 
AND status != 2 + ORDER BY id DESC + LIMIT 1"#, + gateway_id, + ) + .fetch(conn.as_mut()) + .try_collect::>() + .await?; + + if !items.is_empty() { + let testrun = items.first().unwrap(); + return Ok(TestRun { + id: testrun.id as u32, + identity_key, + status: format!( + "{}", + TestRunStatus::from_repr(testrun.status as u8).unwrap() + ), + log: testrun.log.clone(), + }); + } + + // + // save test run + // + let status = TestRunStatus::Queued as u32; + let log = format!( + "Test for {identity_key} requested at {} UTC\n\n", + timestamp_pretty + ); + + let id = sqlx::query!( + "INSERT INTO testruns (gateway_id, status, ip_address, timestamp_utc, log) VALUES (?, ?, ?, ?, ?)", + gateway_id, + status, + ip_address, + timestamp, + log, + ) + .execute(conn.as_mut()) + .await? + .last_insert_rowid(); + + Ok(TestRun { + id: id as u32, + identity_key, + status: format!("{}", TestRunStatus::Queued), + log, + }) +} + +// TODO dz do we need these? +pub fn now_utc() -> DateTime { + SystemTime::now().into() +} + +pub fn now_utc_as_rfc3339() -> String { + now_utc().to_rfc3339() +}
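+
+// Testrun lifecycle, for orientation:
+//   1. `try_queue_testrun` (above) inserts a row with status Queued unless an unfinished
+//      run already exists for that gateway.
+//   2. An agent claims work via GET /internal/testruns, which flips the oldest Queued row
+//      to InProgress in a single statement and returns a `TestrunAssignment`.
+//   3. The agent POSTs the raw probe log to /internal/testruns/:testrun_id; the handler
+//      marks the run Complete and updates the gateway's last probe log, result and score.
+//   4. `update_testruns_older_than` re-queues InProgress rows that outlive the refresh
+//      interval, so runs lost to crashed agents are eventually retried.
+//
+// Rough agent-side sketch (hypothetical base URL; the real agent binary builds these
+// requests itself):
+//
+//   let base = "http://127.0.0.1:8000/internal/testruns";
+//   // 503 Service Unavailable means nothing is queued right now
+//   let assignment: TestrunAssignment = reqwest::get(base).await?.json().await?;
+//   // ... run the gateway probe against assignment.gateway_identity_key ...
+//   reqwest::Client::new()
+//       .post(format!("{base}/{}", assignment.testrun_id))
+//       .body(probe_log)
+//       .send()
+//       .await?;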