Skip to content

Commit

Permalink
feat: introduce chain-sync driver (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Nov 6, 2024
1 parent 50389ad commit 4b9b2cf
Show file tree
Hide file tree
Showing 18 changed files with 416 additions and 60 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ members = [
# "examples/sundae-stop-loss-strategy",
# "examples/ticket-vending-machine",
"examples/minter/offchain",
"examples/wallet/offchain",
"balius-sdk",
"balius-runtime",
"balius-macros", "baliusd",
"balius-macros",
"baliusd",
]
70 changes: 70 additions & 0 deletions balius-runtime/src/drivers/chainsync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use pallas::codec::minicbor::decode::info;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use utxorpc::CardanoSyncClient;

use crate::{ChainPoint, Error, Runtime};

impl From<ChainPoint> for utxorpc::spec::sync::BlockRef {
fn from(point: ChainPoint) -> Self {
utxorpc::spec::sync::BlockRef {
index: point.0,
hash: point.1.to_vec().into(),
}
}
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct Config {
pub endpoint_url: String,
pub api_key: String,
}

pub async fn run(config: Config, runtime: Runtime, cancel: CancellationToken) -> Result<(), Error> {
let mut sync = utxorpc::ClientBuilder::new()
.uri(&config.endpoint_url)?
.metadata("dmtr-api-key", config.api_key)?
.build::<CardanoSyncClient>()
.await;

let cursor = runtime
.chain_cursor()
.await?
.map(Into::into)
.into_iter()
.collect();

// TODO: handle disconnections and retry logic

let mut tip = sync.follow_tip(cursor).await?;

info!("starting follow-tip loop");

loop {
select! {
_ = cancel.cancelled() => {
warn!("chainsync driver cancelled");
break Ok(())
},
event = tip.event() => {
match event {
Ok(utxorpc::TipEvent::Apply(block)) => {
let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap();
runtime.apply_block(&block).await?;
}
Ok(utxorpc::TipEvent::Undo(block)) => {
let block = pallas::ledger::traverse::MultiEraBlock::decode(&block.native).unwrap();
runtime.undo_block(&block).await?;
}
Ok(utxorpc::TipEvent::Reset(point)) => {
warn!(slot=point.index, "TODO: handle reset");
continue;
},
Err(_) => todo!(),
}
}
}
}
}
93 changes: 60 additions & 33 deletions balius-runtime/src/drivers/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
//! Driver to serve JSON-RPC requests.
//!
//! This driver implements an HTTP server that listens for JSON-RPC requests
//! and funnels them into the Runtime. The path of the request is used as the
//! key to identify the worker that should handle the request. The JSON-RPC
//! method field is used as the key to identify the particular Balius request
//! for the worker. JSON-RPC params are mapped directly into Balius request
//! params.
//!
//! The JSON-RPC server is implemented as a Warp application and adheres to
//! the JSON-RPC 2.0 spec.

use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use warp::Filter as _;

use crate::{Error, Runtime};
use crate::{wit, Error, Runtime};

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -32,6 +45,51 @@ fn parse_request(body: serde_json::Value) -> Result<Request, ErrorResponse> {
}
}

pub async fn handle_request(
runtime: Runtime,
worker: String,
body: serde_json::Value,
) -> warp::reply::Json {
let request = match parse_request(body) {
Ok(x) => x,
Err(err) => return warp::reply::json(&err),
};

debug!(
worker,
id = request.id,
method = request.method,
"handling request"
);

let params = serde_json::to_vec(&request.params).unwrap();

let reply = runtime
.handle_request(&worker, &request.method, params)
.await;

match reply {
Ok(x) => {
debug!(worker, id = request.id, "request successful");

let x = match x {
wit::Response::Acknowledge => json!({}),
wit::Response::Json(x) => serde_json::from_slice(&x).unwrap(),
wit::Response::Cbor(x) => json!({ "cbor": x }),
wit::Response::PartialTx(x) => json!({ "tx": x }),
};

warp::reply::json(&x)
}
Err(err) => {
error!(worker, id = request.id, "request failed");
warp::reply::json(&ErrorResponse {
error: err.to_string(),
})
}
}
}

pub async fn serve(
config: Config,
runtime: Runtime,
Expand All @@ -42,38 +100,7 @@ pub async fn serve(
.and(warp::path::param())
.and(warp::post())
.and(warp::body::json())
.then(
|runtime: Runtime, worker: String, body: serde_json::Value| async move {
let request = match parse_request(body) {
Ok(x) => x,
Err(err) => return warp::reply::json(&err),
};

debug!(
worker,
id = request.id,
method = request.method,
"handling request"
);

let reply = runtime
.handle_request(&worker, &request.method, request.params)
.await;

match reply {
Ok(x) => {
debug!(worker, id = request.id, "request successful");
warp::reply::json(&x)
}
Err(err) => {
error!(worker, id = request.id, "request failed");
warp::reply::json(&ErrorResponse {
error: err.to_string(),
})
}
}
},
);
.then(handle_request);

let address: SocketAddr = config
.listen_address
Expand Down
10 changes: 10 additions & 0 deletions balius-runtime/src/drivers/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
//! Control-flow components on top of the Runtime.
//!
//! Drivers are responsible for implementing the control-flows that
//! handle external interactions (e.g. handling requests, syncing from the
//! blockchain, etc) and funnels them into the Runtime.
//!
//! Each of these drivers has a way to trigger a forever-loop that should be
//! spawn as an independent tokio task running on the background.

pub mod chainsync;
pub mod jsonrpc;
47 changes: 34 additions & 13 deletions balius-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use std::{
path::Path,
sync::Arc,
};
use store::AtomicUpdate;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use utxorpc::ChainBlock;

mod wit {
wasmtime::component::bindgen!({
Expand All @@ -29,6 +32,8 @@ pub mod submit;
pub use store::Store;

pub type WorkerId = String;
// pub type Block = utxorpc::ChainBlock<utxorpc::spec::cardano::Block>;
pub type Block<'a> = pallas::ledger::traverse::MultiEraBlock<'a>;

#[derive(Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -138,6 +143,7 @@ impl wit::balius::app::driver::Host for WorkerState {
struct LoadedWorker {
wasm_store: wasmtime::Store<WorkerState>,
instance: wit::Worker,
cursor: Option<LogSeq>,
}

impl LoadedWorker {
Expand Down Expand Up @@ -171,11 +177,7 @@ impl LoadedWorker {
Ok(())
}

async fn apply_block(
&mut self,
block: &MultiEraBlock<'_>,
log_seq: LogSeq,
) -> Result<(), Error> {
async fn apply_block(&mut self, block: &Block<'_>) -> Result<(), Error> {
for tx in block.txs() {
for (_, utxo) in tx.produces() {
let event = wit::Event::Utxo(utxo.encode());
Expand Down Expand Up @@ -210,9 +212,22 @@ impl Runtime {
RuntimeBuilder::new(store)
}

pub fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
// TODO: iterate over all workers and find the lowest cursor
todo!()
pub async fn chain_cursor(&self) -> Result<Option<ChainPoint>, Error> {
let lowest_seq = self
.loaded
.lock()
.await
.values()
.map(|w| w.cursor)
.flatten()
.min();

if let Some(seq) = lowest_seq {
//TODO: map seq to chain point by searching the wal
warn!(seq, "TODO: map seq to chain point by searching the wal");
}

Ok(None)
}

pub async fn register_worker(
Expand Down Expand Up @@ -240,27 +255,33 @@ impl Runtime {
let config = serde_json::to_vec(&config).unwrap();
instance.call_init(&mut wasm_store, &config).await?;

let cursor = self.store.get_worker_cursor(id)?;
debug!(cursor, id, "found cursor for worker");

self.loaded.lock().await.insert(
id.to_owned(),
LoadedWorker {
wasm_store,
instance,
cursor,
},
);

Ok(())
}

pub async fn apply_block(&self, block: &MultiEraBlock<'_>) -> Result<(), Error> {
pub async fn apply_block(&self, block: &Block<'_>) -> Result<(), Error> {
info!(slot = block.slot(), "applying block");

let log_seq = self.store.write_ahead(block)?;

let mut lock = self.loaded.lock().await;

let mut atomic_update = self.store.start_atomic_update()?;
let mut atomic_update = self.store.start_atomic_update(log_seq)?;

for (_, worker) in lock.iter_mut() {
worker.apply_block(block, log_seq).await?;
atomic_update.set_worker_cursor(&worker.wasm_store.data().worker_id, log_seq)?;
worker.apply_block(block).await?;
atomic_update.update_worker_cursor(&worker.wasm_store.data().worker_id)?;
}

atomic_update.commit()?;
Expand All @@ -269,7 +290,7 @@ impl Runtime {
}

// TODO: implement undo once we have "apply" working
pub async fn undo_block(&self, block: &MultiEraBlock<'_>) -> Result<(), Error> {
pub async fn undo_block(&self, block: &Block<'_>) -> Result<(), Error> {
Ok(())
}

Expand Down
9 changes: 7 additions & 2 deletions balius-runtime/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use pallas::ledger::traverse::MultiEraOutput;

use crate::wit::balius::app::driver::{Event, EventPattern};
use crate::wit::balius::app::driver::{Event, EventPattern, UtxoPattern};

type WorkerId = String;
type ChannelId = u32;
Expand All @@ -15,13 +15,18 @@ type AddressBytes = Vec<u8>;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum MatchKey {
RequestMethod(Method),
EveryUtxo,
UtxoAddress(AddressBytes),
}

fn infer_match_keys(pattern: &EventPattern) -> Vec<MatchKey> {
match pattern {
EventPattern::Request(x) => vec![MatchKey::RequestMethod(x.to_owned())],
EventPattern::Utxo(_) => todo!(),
EventPattern::Utxo(UtxoPattern { address, token }) => match (address, token) {
(None, None) => vec![MatchKey::EveryUtxo],
(Some(address), None) => vec![MatchKey::UtxoAddress(address.to_vec())],
_ => todo!(),
},
EventPattern::UtxoUndo(_) => todo!(),
EventPattern::Timer(_) => todo!(),
EventPattern::Message(_) => todo!(),
Expand Down
Loading

0 comments on commit 4b9b2cf

Please sign in to comment.