Skip to content

Commit

Permalink
feat: implement WAL persistence (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Nov 11, 2024
1 parent 4b9b2cf commit 31ddcbb
Show file tree
Hide file tree
Showing 14 changed files with 686 additions and 162 deletions.
347 changes: 311 additions & 36 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion balius-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ tracing = "0.1.40"
hex = "0.4.3"
itertools = "0.13.0"
async-trait = "0.1.83"
utxorpc = { version = "0.7.1", optional = true }
utxorpc = { version = "0.8.0" }
# utxorpc = { path = "../../../utxorpc/rust-sdk" }
tokio-util = "0.7.12"
prost = "0.13"

[dev-dependencies]
tokio = "1.40.0"
85 changes: 59 additions & 26 deletions balius-runtime/src/drivers/chainsync.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use pallas::codec::minicbor::decode::info;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utxorpc::CardanoSyncClient;

use crate::{ChainPoint, Error, Runtime};
use crate::{Block, ChainPoint, Error, Runtime};

impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
fn from(point: ChainPoint) -> Self {
utxorpc::spec::sync::BlockRef {
index: point.0,
hash: point.1.to_vec().into(),
match point {
ChainPoint::Cardano(x) => x.clone(),
_ => todo!(),
}
}
}
Expand All @@ -22,10 +21,41 @@ pub struct Config {
pub api_key: String,
}

pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) -> Result<(), Error> {
pub type UndoBlocks = Vec<Block>;
pub type NextBlock = Block;

/// Gather undo blocks from the tip until the next block is encountered.
async fn gather_blocks(
tip: &mut utxorpc::LiveTip<utxorpc::Cardano>,
) -> Result<(NextBlock, UndoBlocks), Error> {
let mut undos = vec![];

loop {
let event = tip.event().await?;

match event {
utxorpc::TipEvent::Apply(chain_block) => {
let next = Block::Cardano(chain_block.parsed.unwrap());
break Ok((next, undos));
}
utxorpc::TipEvent::Undo(chain_block) => {
undos.push(Block::Cardano(chain_block.parsed.unwrap()));
}
utxorpc::TipEvent::Reset(_) => unreachable!(),
}
}
}

pub async fn run(
config: Config,
mut runtime: Runtime,
cancel: CancellationToken,
) -> Result<(), Error> {
let mut sync = utxorpc::ClientBuilder::new()
.uri(&config.endpoint_url)?
.metadata("dmtr-api-key", config.api_key)?
.uri(&config.endpoint_url)
.map_err(|e| Error::Driver(e.to_string()))?
.metadata("dmtr-api-key", config.api_key)
.map_err(|e| Error::Driver(e.to_string()))?
.build::<CardanoSyncClient>()
.await;

Expand All @@ -36,34 +66,37 @@ pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) ->
.into_iter()
.collect();

info!(cursor = ?cursor, "found runtime cursor");

// TODO: handle disconnections and retry logic

let mut tip = sync.follow_tip(cursor).await?;
let mut tip = sync
.follow_tip(cursor)
.await
.map_err(|e| Error::Driver(e.to_string()))?;

// confirm first event is a reset to the requested chain point
match tip.event().await? {
utxorpc::TipEvent::Reset(point) => {
warn!(
slot = point.index,
"TODO: check that reset is to the requested chain point"
);
}
_ => return Err(Error::Driver("unexpected event".to_string())),
}

info!("starting follow-tip loop");

loop {
select! {
_ = cancel.cancelled() => {
warn!("chainsync driver cancelled");
warn!("chain-sync driver cancelled");
break Ok(())
},
event = tip.event() => {
match event {
Ok(utxorpc::TipEvent::Apply(block)) => {
let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap();
runtime.apply_block(&block).await?;
}
Ok(utxorpc::TipEvent::Undo(block)) => {
let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap();
runtime.undo_block(&block).await?;
}
Ok(utxorpc::TipEvent::Reset(point)) => {
warn!(slot=point.index, "TODO: handle reset");
continue;
},
Err(_) => todo!(),
}
batch = gather_blocks(&mut tip) => {
let (next, undos) = batch?;
runtime.handle_chain(&undos, &next).await?;
}
}
}
Expand Down
9 changes: 0 additions & 9 deletions balius-runtime/src/ledgers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use crate::wit::balius::app::ledger as wit;

pub mod mock;

#[cfg(feature = "utxorpc")]
pub mod u5c;

#[derive(Clone)]
pub enum Ledger {
Mock(mock::Ledger),

#[cfg(feature = "utxorpc")]
U5C(u5c::Ledger),
}

Expand All @@ -19,7 +15,6 @@ impl From<mock::Ledger> for Ledger {
}
}

#[cfg(feature = "utxorpc")]
impl From<u5c::Ledger> for Ledger {
fn from(ledger: u5c::Ledger) -> Self {
Ledger::U5C(ledger)
Expand All @@ -34,8 +29,6 @@ impl wit::Host for Ledger {
) -> Result<Vec<wit::Utxo>, wit::LedgerError> {
match self {
Ledger::Mock(ledger) => ledger.read_utxos(refs).await,

#[cfg(feature = "utxorpc")]
Ledger::U5C(ledger) => ledger.read_utxos(refs).await,
}
}
Expand All @@ -48,8 +41,6 @@ impl wit::Host for Ledger {
) -> Result<wit::UtxoPage, wit::LedgerError> {
match self {
Ledger::Mock(ledger) => ledger.search_utxos(pattern, start, max_items).await,

#[cfg(feature = "utxorpc")]
Ledger::U5C(ledger) => ledger.search_utxos(pattern, start, max_items).await,
}
}
Expand Down
Loading

0 comments on commit 31ddcbb

Please sign in to comment.