diff --git a/Cargo.lock b/Cargo.lock index 7a35237..4e872be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,6 +156,12 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -169,13 +175,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.30", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 0.1.2", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "itoa", "matchit", "memchr", @@ -184,8 +217,8 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", - "tower", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -200,13 +233,33 @@ dependencies = [ "bytes", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -256,6 +309,7 @@ dependencies = [ "hex", "itertools 0.13.0", "pallas", + "prost 0.13.3", "redb", "serde", "serde_json", @@ -280,10 +334,12 @@ dependencies = [ "pallas-crypto 0.30.2 (git+https://github.com/txpipe/pallas.git)", "pallas-primitives 0.30.2 (git+https://github.com/txpipe/pallas.git)", "pallas-traverse 0.30.2 (git+https://github.com/txpipe/pallas.git)", + "prost 0.13.3", "serde", "serde_json", "serde_with", "thiserror", + "utxorpc-spec 0.12.0", "wit-bindgen", ] @@ -1141,6 +1197,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.6.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "1.8.3" @@ -1262,6 +1337,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.9.5" @@ -1284,9 +1382,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1298,18 +1396,71 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-timeout" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.30", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.5.0", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -2030,7 +2181,7 @@ dependencies = [ "pallas-primitives 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", "pallas-traverse 0.30.2 (registry+https://github.com/rust-lang/crates.io-index)", "prost-types 0.13.3", - "utxorpc-spec", + "utxorpc-spec 0.10.0", ] [[package]] @@ -2070,6 +2221,16 @@ dependencies = [ "serde", ] +[[package]] +name = "pbjson" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7e6349fa080353f4a597daffd05cb81572a9c031a6d4fff7e504947496fcc68" +dependencies = [ + "base64 0.21.7", + "serde", +] + [[package]] name = "pbjson-build" version = "0.6.2" @@ -2082,6 +2243,18 @@ dependencies = [ "prost-types 0.12.6", ] +[[package]] +name = "pbjson-build" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eea3058763d6e656105d1403cb04e0a41b7bbac6362d413e7c33be0c32279c9" +dependencies = [ + "heck 0.5.0", + "itertools 0.13.0", + "prost 0.13.3", + "prost-types 0.13.3", +] + [[package]] name = "pbjson-types" version = "0.6.0" @@ -2090,10 +2263,25 @@ checksum = "18f596653ba4ac51bdecbb4ef6773bc7f56042dc13927910de1684ad3d32aa12" dependencies = [ "bytes", "chrono", - "pbjson", - "pbjson-build", + "pbjson 0.6.0", + "pbjson-build 0.6.2", "prost 0.12.6", - "prost-build", + "prost-build 0.12.6", + "serde", +] + +[[package]] +name = "pbjson-types" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e54e5e7bfb1652f95bc361d76f3c780d8e526b134b85417e774166ee941f0887" +dependencies = [ + "bytes", + "chrono", + "pbjson 0.7.0", + "pbjson-build 0.7.0", + "prost 0.13.3", + "prost-build 0.13.3", "serde", ] @@ -2238,6 +2426,27 @@ dependencies = [ "tempfile", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost 0.13.3", + "prost-types 0.13.3", + "regex", + "syn 2.0.85", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.12.6" @@ -2472,11 +2681,12 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.4" +version = "0.23.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "log", + "once_cell", "ring", "rustls-pki-types", "rustls-webpki", @@ -2486,9 +2696,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -2862,6 +3072,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tar" version = "0.4.42" @@ -3000,9 +3216,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -3038,9 +3254,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.25.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ "rustls", "rustls-pki-types", @@ -3134,24 +3350,53 @@ checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", - "h2", + "h2 0.3.26", "http 0.2.12", - "http-body", - "hyper", - "hyper-timeout", + "http-body 0.4.6", + "hyper 0.14.30", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", "prost 0.12.6", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.0", + "hyper-timeout 0.5.2", + "hyper-util", + "percent-encoding", + "pin-project", + "prost 0.13.3", "rustls-native-certs", "rustls-pemfile", - "rustls-pki-types", + "socket2", "tokio", "tokio-rustls", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -3177,6 +3422,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -3357,15 +3616,15 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utxorpc" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8e5451435987e21c49b423626dfa2a84f8b2d30532e07a7fb8383186523fc0c" +checksum = "d1b94946745af6df920d4dc6e14f2cc73417520c9a7823133e5436660ee3fac9" dependencies = [ "bytes", "thiserror", "tokio", - "tonic", - "utxorpc-spec", + "tonic 0.12.3", + "utxorpc-spec 0.12.0", ] [[package]] @@ -3376,11 +3635,27 @@ checksum = "343c17df63049cee293d3262cb741d1c97dc0c73f080336132b16d48f264b885" dependencies = [ "bytes", "futures-core", - "pbjson", - "pbjson-types", + "pbjson 0.6.0", + "pbjson-types 0.6.0", "prost 0.12.6", "serde", - "tonic", + "tonic 0.11.0", +] + +[[package]] +name = "utxorpc-spec" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f5bb265be0e071adf7675ac8003a1c94772516a7a62d4fb1005f61ee288f3d3" +dependencies = [ + "bytes", + "futures-core", + "pbjson 0.7.0", + "pbjson-types 0.7.0", + "prost 0.13.3", + "prost-types 0.13.3", + "serde", + "tonic 0.12.3", ] [[package]] @@ -3447,7 +3722,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.12", - "hyper", + "hyper 0.14.30", "log", "mime", "mime_guess", diff --git a/balius-runtime/Cargo.toml b/balius-runtime/Cargo.toml index 2686b38..fbe68e4 100644 --- a/balius-runtime/Cargo.toml +++ b/balius-runtime/Cargo.toml @@ -18,8 +18,10 @@ tracing = "0.1.40" hex = "0.4.3" itertools = "0.13.0" async-trait = "0.1.83" -utxorpc = { version = "0.7.1", optional = true } +utxorpc = { version = "0.8.0" } +# utxorpc = { path = "../../../utxorpc/rust-sdk" } tokio-util = "0.7.12" +prost = "0.13" [dev-dependencies] tokio = "1.40.0" diff --git a/balius-runtime/src/drivers/chainsync.rs b/balius-runtime/src/drivers/chainsync.rs index aee5737..29197ce 100644 --- a/balius-runtime/src/drivers/chainsync.rs +++ b/balius-runtime/src/drivers/chainsync.rs @@ -1,17 +1,16 @@ -use pallas::codec::minicbor::decode::info; use serde::{Deserialize, Serialize}; use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; use utxorpc::CardanoSyncClient; -use crate::{ChainPoint, Error, Runtime}; +use crate::{Block, ChainPoint, Error, Runtime}; impl From for utxorpc::spec::sync::BlockRef { fn from(point: ChainPoint) -> Self { - utxorpc::spec::sync::BlockRef { - index: point.0, - hash: point.1.to_vec().into(), + match point { + ChainPoint::Cardano(x) => x.clone(), + _ => todo!(), } } } @@ -22,10 +21,41 @@ pub struct Config { pub api_key: String, } -pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) -> Result<(), Error> { +pub type UndoBlocks = Vec; +pub type NextBlock = Block; + +/// Gather undo blocks from the tip until the next block is encountered. +async fn gather_blocks( + tip: &mut utxorpc::LiveTip, +) -> Result<(NextBlock, UndoBlocks), Error> { + let mut undos = vec![]; + + loop { + let event = tip.event().await?; + + match event { + utxorpc::TipEvent::Apply(chain_block) => { + let next = Block::Cardano(chain_block.parsed.unwrap()); + break Ok((next, undos)); + } + utxorpc::TipEvent::Undo(chain_block) => { + undos.push(Block::Cardano(chain_block.parsed.unwrap())); + } + utxorpc::TipEvent::Reset(_) => unreachable!(), + } + } +} + +pub async fn run( + config: Config, + mut runtime: Runtime, + cancel: CancellationToken, +) -> Result<(), Error> { let mut sync = utxorpc::ClientBuilder::new() - .uri(&config.endpoint_url)? - .metadata("dmtr-api-key", config.api_key)? + .uri(&config.endpoint_url) + .map_err(|e| Error::Driver(e.to_string()))? + .metadata("dmtr-api-key", config.api_key) + .map_err(|e| Error::Driver(e.to_string()))? .build::() .await; @@ -36,34 +66,37 @@ pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) -> .into_iter() .collect(); + info!(cursor = ?cursor, "found runtime cursor"); + // TODO: handle disconnections and retry logic - let mut tip = sync.follow_tip(cursor).await?; + let mut tip = sync + .follow_tip(cursor) + .await + .map_err(|e| Error::Driver(e.to_string()))?; + + // confirm first event is a reset to the requested chain point + match tip.event().await? { + utxorpc::TipEvent::Reset(point) => { + warn!( + slot = point.index, + "TODO: check that reset is to the requested chain point" + ); + } + _ => return Err(Error::Driver("unexpected event".to_string())), + } info!("starting follow-tip loop"); loop { select! { _ = cancel.cancelled() => { - warn!("chainsync driver cancelled"); + warn!("chain-sync driver cancelled"); break Ok(()) }, - event = tip.event() => { - match event { - Ok(utxorpc::TipEvent::Apply(block)) => { - let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap(); - runtime.apply_block(&block).await?; - } - Ok(utxorpc::TipEvent::Undo(block)) => { - let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap(); - runtime.undo_block(&block).await?; - } - Ok(utxorpc::TipEvent::Reset(point)) => { - warn!(slot=point.index, "TODO: handle reset"); - continue; - }, - Err(_) => todo!(), - } + batch = gather_blocks(&mut tip) => { + let (next, undos) = batch?; + runtime.handle_chain(&undos, &next).await?; } } } diff --git a/balius-runtime/src/ledgers/mod.rs b/balius-runtime/src/ledgers/mod.rs index 9bce499..5eb3855 100644 --- a/balius-runtime/src/ledgers/mod.rs +++ b/balius-runtime/src/ledgers/mod.rs @@ -1,15 +1,11 @@ use crate::wit::balius::app::ledger as wit; pub mod mock; - -#[cfg(feature = "utxorpc")] pub mod u5c; #[derive(Clone)] pub enum Ledger { Mock(mock::Ledger), - - #[cfg(feature = "utxorpc")] U5C(u5c::Ledger), } @@ -19,7 +15,6 @@ impl From for Ledger { } } -#[cfg(feature = "utxorpc")] impl From for Ledger { fn from(ledger: u5c::Ledger) -> Self { Ledger::U5C(ledger) @@ -34,8 +29,6 @@ impl wit::Host for Ledger { ) -> Result, wit::LedgerError> { match self { Ledger::Mock(ledger) => ledger.read_utxos(refs).await, - - #[cfg(feature = "utxorpc")] Ledger::U5C(ledger) => ledger.read_utxos(refs).await, } } @@ -48,8 +41,6 @@ impl wit::Host for Ledger { ) -> Result { match self { Ledger::Mock(ledger) => ledger.search_utxos(pattern, start, max_items).await, - - #[cfg(feature = "utxorpc")] Ledger::U5C(ledger) => ledger.search_utxos(pattern, start, max_items).await, } } diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index 378cb9b..3d43009 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -1,16 +1,9 @@ -use pallas::ledger::traverse::MultiEraBlock; use router::Router; -use serde_json::json; -use std::{ - collections::{HashMap, HashSet}, - path::Path, - sync::Arc, -}; -use store::AtomicUpdate; +use std::{collections::HashMap, path::Path, sync::Arc}; use thiserror::Error; use tokio::sync::Mutex; use tracing::{debug, info, warn}; -use utxorpc::ChainBlock; +use utxorpc::spec::sync::BlockRef; mod wit { wasmtime::component::bindgen!({ @@ -32,8 +25,6 @@ pub mod submit; pub use store::Store; pub type WorkerId = String; -// pub type Block = utxorpc::ChainBlock; -pub type Block<'a> = pallas::ledger::traverse::MultiEraBlock<'a>; #[derive(Error, Debug)] pub enum Error { @@ -63,6 +54,9 @@ pub enum Error { #[error("config error: {0}")] Config(String), + + #[error("driver error: {0}")] + Driver(String), } impl From for Error { @@ -116,11 +110,83 @@ impl From for Error { pub type BlockSlot = u64; pub type BlockHash = pallas::crypto::hash::Hash<32>; -#[derive(Debug, Eq, PartialEq, Hash)] -pub struct ChainPoint(pub BlockSlot, pub BlockHash); +pub enum ChainPoint { + Cardano(utxorpc::spec::sync::BlockRef), +} pub type LogSeq = u64; +pub enum Utxo { + Cardano(utxorpc::spec::cardano::TxOutput), +} + +impl Utxo { + pub fn to_bytes(&self) -> Vec { + use prost::Message; + + match self { + Self::Cardano(utxo) => utxo.encode_to_vec(), + } + } +} + +pub enum Tx { + Cardano(utxorpc::spec::cardano::Tx), +} + +impl Tx { + pub fn outputs(&self) -> Vec { + match self { + Self::Cardano(tx) => tx + .outputs + .iter() + .map(|o| Utxo::Cardano(o.clone())) + .collect(), + } + } +} + +#[derive(Debug, Clone)] +pub enum Block { + Cardano(utxorpc::spec::cardano::Block), +} + +impl Block { + pub fn txs(&self) -> Vec { + match self { + Self::Cardano(block) => block + .body + .iter() + .flat_map(|b| b.tx.iter()) + .map(|t| Tx::Cardano(t.clone())) + .collect(), + } + } + + pub fn chain_point(&self) -> ChainPoint { + match self { + Self::Cardano(block) => ChainPoint::Cardano(BlockRef { + index: block.header.as_ref().unwrap().slot, + hash: block.header.as_ref().unwrap().hash.clone(), + }), + } + } + + pub fn to_bytes(&self) -> Vec { + use prost::Message; + + match self { + Self::Cardano(block) => block.encode_to_vec(), + } + } + + pub fn from_bytes(data: &[u8]) -> Self { + use prost::Message; + + Self::Cardano(utxorpc::spec::cardano::Block::decode(data).unwrap()) + } +} + struct WorkerState { pub worker_id: String, pub router: router::Router, @@ -177,12 +243,29 @@ impl LoadedWorker { Ok(()) } - async fn apply_block(&mut self, block: &Block<'_>) -> Result<(), Error> { + async fn apply_block(&mut self, block: &Block) -> Result<(), Error> { + for tx in block.txs() { + for utxo in tx.outputs() { + let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?; + + let event = wit::Event::Utxo(utxo.to_bytes()); + + for channel in channels { + self.acknowledge_event(channel, &event).await?; + } + } + } + + Ok(()) + } + + async fn undo_block(&mut self, block: &Block) -> Result<(), Error> { for tx in block.txs() { - for (_, utxo) in tx.produces() { - let event = wit::Event::Utxo(utxo.encode()); + for utxo in tx.outputs() { let channels = self.wasm_store.data().router.find_utxo_targets(&utxo)?; + let event = wit::Event::UtxoUndo(utxo.to_bytes()); + for channel in channels { self.acknowledge_event(channel, &event).await?; } @@ -191,6 +274,18 @@ impl LoadedWorker { Ok(()) } + + async fn apply_chain( + &mut self, + undo_blocks: &Vec, + next_block: &Block, + ) -> Result<(), Error> { + for block in undo_blocks { + self.undo_block(block).await?; + } + + self.apply_block(next_block).await + } } type WorkerMap = HashMap; @@ -223,8 +318,8 @@ impl Runtime { .min(); if let Some(seq) = lowest_seq { - //TODO: map seq to chain point by searching the wal - warn!(seq, "TODO: map seq to chain point by searching the wal"); + debug!(lowest_seq, "found lowest seq"); + return self.store.find_chain_point(seq); } Ok(None) @@ -270,27 +365,26 @@ impl Runtime { Ok(()) } - pub async fn apply_block(&self, block: &Block<'_>) -> Result<(), Error> { - info!(slot = block.slot(), "applying block"); + pub async fn handle_chain( + &mut self, + undo_blocks: &Vec, + next_block: &Block, + ) -> Result<(), Error> { + info!("applying block"); - let log_seq = self.store.write_ahead(block)?; + let log_seq = self.store.write_ahead(undo_blocks, next_block)?; - let mut lock = self.loaded.lock().await; + let mut workers = self.loaded.lock().await; - let mut atomic_update = self.store.start_atomic_update(log_seq)?; + let mut store_update = self.store.start_atomic_update(log_seq)?; - for (_, worker) in lock.iter_mut() { - worker.apply_block(block).await?; - atomic_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?; + for (_, worker) in workers.iter_mut() { + worker.apply_chain(undo_blocks, next_block).await?; + store_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?; } - atomic_update.commit()?; - - Ok(()) - } + store_update.commit()?; - // TODO: implement undo once we have "apply" working - pub async fn undo_block(&self, block: &Block<'_>) -> Result<(), Error> { Ok(()) } diff --git a/balius-runtime/src/router.rs b/balius-runtime/src/router.rs index 4a935de..9f368e9 100644 --- a/balius-runtime/src/router.rs +++ b/balius-runtime/src/router.rs @@ -1,11 +1,9 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, RwLock}, -}; - -use pallas::ledger::traverse::MultiEraOutput; +use std::collections::{HashMap, HashSet}; -use crate::wit::balius::app::driver::{Event, EventPattern, UtxoPattern}; +use crate::{ + wit::balius::app::driver::{EventPattern, UtxoPattern}, + Utxo, +}; type WorkerId = String; type ChannelId = u32; @@ -55,11 +53,9 @@ impl Router { } } - pub fn find_utxo_targets( - &self, - utxo: &MultiEraOutput, - ) -> Result, super::Error> { - let key = MatchKey::UtxoAddress(utxo.address()?.to_vec()); + pub fn find_utxo_targets(&self, utxo: &Utxo) -> Result, super::Error> { + let key = MatchKey::EveryUtxo; + let targets: HashSet<_> = self .routes .get(&key) @@ -68,7 +64,7 @@ impl Router { .cloned() .collect(); - // TODO: match by policy / asset + // TODO: match by address / policy / asset Ok(targets) } diff --git a/balius-runtime/src/store.rs b/balius-runtime/src/store.rs index 40c547f..6850620 100644 --- a/balius-runtime/src/store.rs +++ b/balius-runtime/src/store.rs @@ -1,17 +1,59 @@ -use std::{path::Path, sync::Arc}; - use itertools::Itertools; +use prost::Message; use redb::{ReadableTable as _, TableDefinition, WriteTransaction}; +use std::{path::Path, sync::Arc}; use tracing::warn; -use crate::Error; +use crate::{Block, ChainPoint, Error}; pub type WorkerId = String; pub type LogSeq = u64; -// pub type Block = utxorpc::ChainBlock; -pub type Block<'a> = pallas::ledger::traverse::MultiEraBlock<'a>; + +#[derive(Message)] +pub struct LogEntry { + #[prost(bytes, tag = "1")] + pub next_block: Vec, + #[prost(bytes, repeated, tag = "2")] + pub undo_blocks: Vec>, +} + +impl redb::Value for LogEntry { + type SelfType<'a> + = LogEntry + where + Self: 'a; + + type AsBytes<'a> + = Vec + where + Self: 'a; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + prost::Message::decode(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.encode_to_vec() + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("LogEntry") + } +} const CURSORS: TableDefinition = TableDefinition::new("cursors"); +const WAL: TableDefinition = TableDefinition::new("wal"); const DEFAULT_CACHE_SIZE_MB: usize = 50; @@ -37,6 +79,7 @@ impl AtomicUpdate { #[derive(Clone)] pub struct Store { db: Arc, + log_seq: LogSeq, } impl Store { @@ -48,16 +91,63 @@ impl Store { .set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB)) .create(path)?; + let log_seq = Self::load_log_seq(&inner)?.unwrap_or_default(); + let out = Self { db: Arc::new(inner), + log_seq, }; Ok(out) } - pub fn write_ahead(&self, block: &Block<'_>) -> Result { - // TODO: write event to WAL table and return log sequence - Ok(0) + fn load_log_seq(db: &redb::Database) -> Result, Error> { + let rx = db.begin_read()?; + + match rx.open_table(WAL) { + Ok(table) => { + let last = table.last()?; + Ok(last.map(|(k, _)| k.value())) + } + Err(redb::TableError::TableDoesNotExist(_)) => Ok(None), + Err(e) => return Err(e.into()), + } + } + + fn get_entry(&self, seq: LogSeq) -> Result, Error> { + let rx = self.db.begin_read()?; + let table = rx.open_table(WAL)?; + let entry = table.get(seq)?; + Ok(entry.map(|x| x.value())) + } + + pub fn find_chain_point(&self, seq: LogSeq) -> Result, Error> { + let entry = self.get_entry(seq)?; + let block = Block::from_bytes(&entry.unwrap().next_block); + + Ok(Some(block.chain_point())) + } + + pub fn write_ahead( + &mut self, + undo_blocks: &Vec, + next_block: &Block, + ) -> Result { + self.log_seq += 1; + + let wx = self.db.begin_write()?; + { + wx.open_table(WAL)?.insert( + self.log_seq, + LogEntry { + next_block: next_block.to_bytes(), + undo_blocks: undo_blocks.iter().map(|x| x.to_bytes()).collect(), + }, + )?; + } + + wx.commit()?; + Ok(self.log_seq) } // TODO: see if loading in batch is worth it diff --git a/balius-sdk/Cargo.toml b/balius-sdk/Cargo.toml index cce843c..448de42 100644 --- a/balius-sdk/Cargo.toml +++ b/balius-sdk/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +utxorpc-spec = { version = "0.12.0", features = ["ledgers"], default-features = false } balius-macros = { version = "0.1.0", path = "../balius-macros" } hex = "0.4.3" pallas-addresses = { version = "0.30.2", git = "https://github.com/txpipe/pallas.git" } @@ -11,6 +12,7 @@ pallas-codec = { version = "0.30.2", git = "https://github.com/txpipe/pallas.git pallas-crypto = { version = "0.30.2", git = "https://github.com/txpipe/pallas.git" } pallas-primitives = { version = "0.30.2", git = "https://github.com/txpipe/pallas.git" } pallas-traverse = { version = "0.30.2", git = "https://github.com/txpipe/pallas.git" } +prost = "0.13.3" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" serde_with = "3.9.0" diff --git a/balius-sdk/src/qol.rs b/balius-sdk/src/qol.rs index c495514..ab7f572 100644 --- a/balius-sdk/src/qol.rs +++ b/balius-sdk/src/qol.rs @@ -208,7 +208,7 @@ impl std::ops::Deref for Json { } pub struct Utxo { - pub utxo: pallas_traverse::MultiEraOutput<'static>, + pub utxo: utxorpc_spec::utxorpc::v1alpha::cardano::TxOutput, pub datum: Option, } @@ -216,16 +216,15 @@ impl TryFrom for Utxo { type Error = Error; fn try_from(value: wit::Event) -> Result { + use prost::Message; + let bytes = match value { wit::Event::Utxo(x) => x, - _ => return Err(Error::EventMismatch("utxo".to_owned())), + wit::Event::UtxoUndo(x) => x, + _ => return Err(Error::EventMismatch("utxo|utxoundo".to_owned())), }; - // TODO: remove this once we have a way to keep the bytes around - let bytes: &'static [u8] = bytes.leak(); - - let utxo = pallas_traverse::MultiEraOutput::decode(pallas_traverse::Era::Conway, bytes) - .map_err(|_| Self::Error::BadUtxo)?; + let utxo = Message::decode(bytes.as_slice()).map_err(|_| Self::Error::BadUtxo)?; Ok(Utxo { utxo, datum: None }) } diff --git a/baliusd/Cargo.toml b/baliusd/Cargo.toml index 0fc2287..ff6ed83 100644 --- a/baliusd/Cargo.toml +++ b/baliusd/Cargo.toml @@ -5,7 +5,7 @@ version = "0.1.0" edition = "2021" [dependencies] -balius-runtime = { version = "0.1.0", path = "../balius-runtime", features = ["utxorpc"] } +balius-runtime = { version = "0.1.0", path = "../balius-runtime" } gasket = { version = "0.8.0", features = ["derive"] } miette = { version = "7.2.0", features = ["fancy"] } serde = { version = "1.0.213", features = ["derive"] } diff --git a/baliusd/example/wallet.wasm b/baliusd/example/wallet.wasm index 3b99186..efa3ae5 100644 Binary files a/baliusd/example/wallet.wasm and b/baliusd/example/wallet.wasm differ diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 51f3e04..203c6df 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -72,6 +72,7 @@ async fn main() -> miette::Result<()> { let mut runtime = Runtime::builder(store) .with_ledger(ledger.into()) + .with_kv(balius_runtime::kv::Kv::Mock) .build() .into_diagnostic() .context("setting up runtime")?; @@ -102,7 +103,10 @@ async fn main() -> miette::Result<()> { cancel.clone(), )); - let tasks = tokio::try_join!(jsonrpc_server, chainsync_driver); + let (jsonrpc, chainsync) = tokio::try_join!(jsonrpc_server, chainsync_driver).unwrap(); + + jsonrpc.unwrap(); + chainsync.unwrap(); Ok(()) } diff --git a/examples/wallet/offchain/Cargo.toml b/examples/wallet/offchain/Cargo.toml index cc0f951..81e344f 100644 --- a/examples/wallet/offchain/Cargo.toml +++ b/examples/wallet/offchain/Cargo.toml @@ -7,13 +7,13 @@ edition = "2021" balius-sdk = { path = "../../../balius-sdk" } serde = { version = "1.0.204", features = ["derive"] } serde_with = "3.9.0" +pallas-codec = "0.30.2" +hex = "0.4.3" [lib] crate-type = ["cdylib"] [dev-dependencies] -hex = "0.4.3" serde_json = "1.0.128" pallas-traverse = "0.30.2" -pallas-codec = "0.30.2" pallas-primitives = "0.30.2" diff --git a/examples/wallet/offchain/src/lib.rs b/examples/wallet/offchain/src/lib.rs index be6952d..664a05a 100644 --- a/examples/wallet/offchain/src/lib.rs +++ b/examples/wallet/offchain/src/lib.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; + use balius_sdk::{Ack, WorkerResult}; use balius_sdk::{Config, FnHandler, Params, Utxo, Worker}; use serde::{Deserialize, Serialize}; @@ -12,7 +14,39 @@ struct BalanceRequest {} #[derive(Serialize, Deserialize, Clone)] struct Datum {} +struct KvTable { + namespace: String, + _value: PhantomData, +} + +impl KvTable +where + V: pallas_codec::minicbor::Encode<()>, +{ + fn new(namespace: String) -> Self { + Self { + namespace, + _value: PhantomData, + } + } + + fn set(&self, key: &str, value: V) -> Result<(), balius_sdk::Error> { + let key = format!("{}/{}", self.namespace, key); + let value = pallas_codec::minicbor::to_vec(&value).unwrap(); + balius_sdk::wit::balius::app::kv::set_value(&key, &value).unwrap(); + Ok(()) + } +} + +type BalanceTable = KvTable; + fn handle_utxo(config: Config, utxo: Utxo) -> WorkerResult { + let balances = BalanceTable::new("balances".to_string()); + + balances + .set(&hex::encode(&utxo.utxo.address), utxo.utxo.coin) + .unwrap(); + Ok(Ack) } @@ -35,30 +69,34 @@ mod tests { use std::{collections::HashMap, str::FromStr as _}; - #[test] - fn test_happy_path() { - let output = primitives::MintedTransactionOutput::PostAlonzo(primitives::MintedPostAlonzoTransactionOutput { - address: Address::from_bech32("addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x").unwrap().to_vec().into(), - value: primitives::Value::Coin(5_000_000), - datum_option: None, - script_ref: None, - }); - - let cbor = pallas_codec::minicbor::to_vec(&output).unwrap(); - - let test_utxos: HashMap<_, _> = vec![( - "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f#0" - .parse() - .unwrap(), - cbor, - )] - .into_iter() - .collect(); - - let config = WalletConfig { - address: "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x".into(), - }; - - handle_utxo(config, utxo).unwrap(); - } + // #[test] + // fn test_happy_path() { + // let output = + // primitives::MintedTransactionOutput::PostAlonzo(primitives::MintedPostAlonzoTransactionOutput + // { address: + // Address::from_bech32(" + // addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" + // ).unwrap().to_vec().into(), value: + // primitives::Value::Coin(5_000_000), datum_option: None, + // script_ref: None, + // }); + + // let cbor = pallas_codec::minicbor::to_vec(&output).unwrap(); + + // let test_utxos: HashMap<_, _> = vec![( + // "f7d3837715680f3a170e99cd202b726842d97f82c05af8fcd18053c64e33ec4f#0" + // .parse() + // .unwrap(), + // cbor, + // )] + // .into_iter() + // .collect(); + + // let config = WalletConfig { + // address: + // "addr1qx2fxv2umyhttkxyxp8x0dlpdt3k6cwng5pxj3jhsydzer3n0d3vllmyqwsx5wktcd8cc3sq835lu7drv2xwl2wywfgse35a3x" + // .into(), }; + + // handle_utxo(config, utxo).unwrap(); + // } }