Skip to content

Commit

Permalink
feat: introduce standalone runtime daemon (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Oct 27, 2024
1 parent 8b086df commit dc7c9a9
Show file tree
Hide file tree
Showing 15 changed files with 790 additions and 60 deletions.
496 changes: 442 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ members = [
"examples/minter/offchain",
"balius-sdk",
"balius-runtime",
"balius-macros",
"balius-macros", "baliusd",
]
1 change: 1 addition & 0 deletions balius-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hex = "0.4.3"
itertools = "0.13.0"
async-trait = "0.1.83"
utxorpc = { version = "0.7.1", optional = true }
tokio-util = "0.7.12"

[dev-dependencies]
tokio = "1.40.0"
91 changes: 91 additions & 0 deletions balius-runtime/src/drivers/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use warp::Filter as _;

use crate::{Error, Runtime};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Config {
pub listen_address: String,
}

#[derive(Deserialize)]
struct Request {
pub id: Option<String>,
pub method: String,
pub params: serde_json::Value,
}

#[derive(Serialize)]
struct ErrorResponse {
error: String,
}

fn parse_request(body: serde_json::Value) -> Result<Request, ErrorResponse> {
match serde_json::from_value(body) {
Ok(x) => Ok(x),
Err(x) => Err(ErrorResponse {
error: x.to_string(),
}),
}
}

pub async fn serve(
config: Config,
runtime: Runtime,
cancel: CancellationToken,
) -> Result<(), Error> {
let filter = warp::any()
.map(move || runtime.clone())
.and(warp::path::param())
.and(warp::post())
.and(warp::body::json())
.then(
|runtime: Runtime, worker: String, body: serde_json::Value| async move {
let request = match parse_request(body) {
Ok(x) => x,
Err(err) => return warp::reply::json(&err),
};

debug!(
worker,
id = request.id,
method = request.method,
"handling request"
);

let reply = runtime
.handle_request(&worker, &request.method, request.params)
.await;

match reply {
Ok(x) => {
debug!(worker, id = request.id, "request successful");
warp::reply::json(&x)
}
Err(err) => {
error!(worker, id = request.id, "request failed");
warp::reply::json(&ErrorResponse {
error: err.to_string(),
})
}
}
},
);

let address: SocketAddr = config
.listen_address
.parse()
.map_err(|x: std::net::AddrParseError| Error::Config(x.to_string()))?;

let (addr, server) =
warp::serve(filter).bind_with_graceful_shutdown(address, cancel.cancelled_owned());

tracing::info!(%addr, "Json-RPC server listening");

server.await;

Ok(())
}
1 change: 1 addition & 0 deletions balius-runtime/src/drivers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod jsonrpc;
2 changes: 2 additions & 0 deletions balius-runtime/src/ledgers/u5c.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use utxorpc::CardanoQueryClient;

use crate::wit::balius::app::ledger as wit;
Expand Down Expand Up @@ -77,6 +78,7 @@ impl From<utxorpc::UtxoPage<utxorpc::Cardano>> for wit::UtxoPage {
}
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Config {
pub endpoint_url: String,
pub api_key: String,
Expand Down
10 changes: 7 additions & 3 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use pallas::ledger::traverse::MultiEraBlock;
use serde_json::json;
use tokio::sync::Mutex;
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::Arc,
};
use thiserror::Error;
use tokio::sync::Mutex;

mod wit {
wasmtime::component::bindgen!({
Expand All @@ -20,6 +20,7 @@ mod router;
mod store;

// implementations
pub mod drivers;
pub mod kv;
pub mod ledgers;
pub mod submit;
Expand Down Expand Up @@ -53,6 +54,9 @@ pub enum Error {

#[error("ledger error: {0}")]
Ledger(String),

#[error("config error: {0}")]
Config(String),
}

impl From<wasmtime::Error> for Error {
Expand Down Expand Up @@ -159,8 +163,8 @@ impl Runtime {
&mut self,
id: &str,
wasm_path: impl AsRef<Path>,
config: serde_json::Value,
) -> wasmtime::Result<()> {
config: Option<serde_json::Value>,
) -> Result<(), Error> {
let component = wasmtime::component::Component::from_file(&self.engine, wasm_path)?;

let mut store = wasmtime::Store::new(
Expand Down
2 changes: 1 addition & 1 deletion balius-runtime/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn faucet_claim() {
});

runtime
.register_worker("faucet", "tests/faucet.wasm", config)
.register_worker("faucet", "tests/faucet.wasm", Some(config))
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion balius-runtime/tests/u5c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn faucet_claim() {
});

runtime
.register_worker("faucet", "tests/faucet.wasm", config)
.register_worker("faucet", "tests/faucet.wasm", Some(config))
.await
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions baliusd/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
baliusd.db
19 changes: 19 additions & 0 deletions baliusd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "baliusd"
description = "A standalone Balius runtime that can be used as a daemon"
version = "0.1.0"
edition = "2021"

[dependencies]
balius-runtime = { version = "0.1.0", path = "../balius-runtime", features = ["utxorpc"] }
gasket = { version = "0.8.0", features = ["derive"] }
miette = { version = "7.2.0", features = ["fancy"] }
serde = { version = "1.0.213", features = ["derive"] }
serde_json = "1.0.132"
serde_with = "3.11.0"
tokio = { version = "1.41.0", features = ["rt-multi-thread", "signal"] }
tokio-macros = "2.4.0"
tokio-util = "0.7.12"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
config = { version = "0.13.3", default-features = false, features = ["toml", "json"] }
19 changes: 19 additions & 0 deletions baliusd/example/baliusd.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[rpc]
listen_address = "0.0.0.0:3000"

[logging]
max_level = "debug"
include_tokio = true

[ledger]
endpoint_url = "https://mainnet.utxorpc-v0.demeter.run"
api_key = "dmtr_utxorpc1wgnnj0qcfj32zxsz2uc8d4g7uclm2s2w"

[[workers]]
name = "faucet"
module = "faucet.wasm"

[workers.config.validator]
ref_txo = { transaction_id = "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f", index = 0 }
hash = "ef7a1cebb2dc7de884ddf82f8fcbc91fe9750dcd8c12ec7643a99bbe"
address = "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x"
Binary file added baliusd/example/faucet.wasm
Binary file not shown.
124 changes: 124 additions & 0 deletions baliusd/src/boilerplate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use serde::de::DeserializeOwned;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use tracing_subscriber::{filter::Targets, prelude::*};

use crate::LoggingConfig;

pub fn setup_tracing(config: &LoggingConfig) -> miette::Result<()> {
let level = config.max_level;

let mut filter = Targets::new()
.with_target("baliusd", level)
.with_target("balius_runtime", level)
.with_target("gasket", level);

if config.include_tokio {
filter = filter
.with_target("tokio", level)
.with_target("runtime", level);
}

#[cfg(not(feature = "debug"))]
{
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(filter)
.init();
}

#[cfg(feature = "debug")]
{
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(console_subscriber::spawn())
.with(filter)
.init();
}

Ok(())
}

#[inline]
#[cfg(unix)]
async fn wait_for_exit_signal() {
let mut sigterm =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();

tokio::select! {
_ = tokio::signal::ctrl_c() => {
warn!("SIGINT detected");
}
_ = sigterm.recv() => {
warn!("SIGTERM detected");
}
};
}

#[inline]
#[cfg(windows)]
async fn wait_for_exit_signal() {
tokio::signal::ctrl_c().await.unwrap()
}

pub fn hook_exit_token() -> CancellationToken {
let cancel = CancellationToken::new();

let cancel2 = cancel.clone();
tokio::spawn(async move {
wait_for_exit_signal().await;
debug!("notifying exit");
cancel2.cancel();
});

cancel
}

pub async fn run_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationToken) {
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5000)) => {
if pipeline.should_stop() {
break;
}
}
_ = exit.cancelled() => {
debug!("exit requested");
break;
}
}
}

debug!("shutting down pipeline");
pipeline.teardown();
}

#[allow(dead_code)]
pub fn spawn_pipeline(pipeline: gasket::daemon::Daemon, exit: CancellationToken) -> JoinHandle<()> {
tokio::spawn(run_pipeline(pipeline, exit))
}

pub fn load_config<T>(explicit_file: &Option<std::path::PathBuf>) -> Result<T, config::ConfigError>
where
T: DeserializeOwned,
{
let mut s = config::Config::builder();

// our base config will always be in /etc/dolos
s = s.add_source(config::File::with_name("/etc/baliusd/daemon.toml").required(false));

// but we can override it by having a file in the working dir
s = s.add_source(config::File::with_name("baliusd.toml").required(false));

// if an explicit file was passed, then we load it as mandatory
if let Some(explicit) = explicit_file.as_ref().and_then(|x| x.to_str()) {
s = s.add_source(config::File::with_name(explicit).required(true));
}

// finally, we use env vars to make some last-step overrides
s = s.add_source(config::Environment::with_prefix("BALIUSD").separator("_"));

s.build()?.try_deserialize()
}
Loading

0 comments on commit dc7c9a9

Please sign in to comment.