Skip to content

Commit

Permalink
refactor(node): impl a custom builder for Node
Browse files Browse the repository at this point in the history
Implements NodeBuilder which provides more ergonomic usage for callers
than the derive_builder version, and also keeps Node clean without
a lot of derive_builder annotations.

builder.rs: new file added

node.rs:
 * remove derive_builder annotations
 * remove NodeBuilder impl
 * remove ParseError variant from BlockBodyError
 * add comment that Node should be built with NodeBuilder

util.rs
 * make EpochError pub. It can bereturned by NodeBuilder::build()

lib.rs:
 * add builder mod

tests, bench, kindelia/src/main:
 * adjust usage of NodeBuilder
  • Loading branch information
dan-da committed Dec 24, 2022
1 parent 0b1afb4 commit c72f842
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 153 deletions.
23 changes: 12 additions & 11 deletions kindelia/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ use kindelia_client::ApiClient;
use kindelia_common::{crypto, Name};
use kindelia_core::api::{Hash, HexStatement};
use kindelia_core::bits::ProtoSerialize;
use kindelia_core::builder::NodeBuilder;
use kindelia_core::config::{
ApiConfig, MineConfig, NodeConfig, UiConfig, WsConfig,
};
use kindelia_core::net::{Address, ProtoComm};
use kindelia_core::node::{
spawn_miner, NodeBuilder, Transaction, MAX_TRANSACTION_SIZE,
};
use kindelia_core::node::{spawn_miner, Transaction, MAX_TRANSACTION_SIZE};
use kindelia_core::persistence::{
get_ordered_blocks_path, SimpleFileStorage, BLOCKS_DIR,
};
Expand Down Expand Up @@ -836,19 +835,21 @@ pub fn start_node<C: ProtoComm + 'static>(

// Node state object
let genesis_code = include_str!("../../kindelia_core/genesis.kdl");
let (node_query_sender, node) = NodeBuilder::default()
let builder = NodeBuilder::default()
.network_id(node_config.network_id)
.comm(comm)
.miner_comm(miner_comm)
.addr(addr)
.storage(file_writter)
.genesis_code(node_config.data_path, genesis_code)?
.build(
&initial_peers,
#[cfg(feature = "events")]
Some(event_tx),
)?;

.genesis_code(genesis_code.to_string())
.data_path(node_config.data_path)
.peers(initial_peers);

#[cfg(feature = "events")]
let builder = builder.event_emitter(event_tx);

let (node_query_sender, node) = builder.build()?;

// WebSocket API router
let ws_router = kindelia_ws::ws_router::<
events::NodeEventType,
Expand Down
15 changes: 5 additions & 10 deletions kindelia_core/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use kindelia_common::crypto::Keccakable;
use kindelia_core::persistence;
use kindelia_core::persistence::BlockStorage;
use kindelia_core::builder::NodeBuilder;
use kindelia_lang::parser;
use primitive_types::U256;

Expand Down Expand Up @@ -155,19 +156,13 @@ fn block_loading(c: &mut Criterion) {

// create Node
let genesis_code = include_str!("../genesis.kdl");
let (_query_sender, mut node) = node::NodeBuilder::default()
// .network_id(node_config.network_id)
let (_query_sender, mut node) = NodeBuilder::default()
.comm(comm)
// .miner_comm(miner_comm)
.addr(addr)
.storage(storage)
.genesis_code(dir.clone(), genesis_code)
.unwrap()
.build(
&[],
#[cfg(feature = "events")]
None,
)
.genesis_code(genesis_code.to_string())
.data_path(dir.clone())
.build()
.unwrap();

// benchmark block loading
Expand Down
221 changes: 221 additions & 0 deletions kindelia_core/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use std::path::PathBuf;
use std::sync::mpsc;

use priority_queue::PriorityQueue;
use thiserror::Error;

use kindelia_common::crypto::Keccakable;
use kindelia_lang::parser;

use crate::api::NodeRequest;
use crate::net::ProtoComm;
use crate::node::{
build_genesis_block, initial_target, BlockBodyError, MinerCommunication,
Node, Peer, PeersStore,
};
use crate::persistence::BlockStorage;
use crate::runtime::*;
use crate::util::*;

use crate::events::NodeEventEmittedInfo;

/// Errors associated with NodeBuilder
#[derive(Error, Debug)]
pub enum NodeBuilderError {
#[error("Parse error in block statements: {0}")]
Parse(String),
#[error(transparent)]
Epoch(#[from] EpochError),
#[error(transparent)]
BlockBody(#[from] BlockBodyError),
#[error("Missing required field {field}")]
MissingRequired { field: String },
}

/// A Builder to simplify creation of a Node
///
/// Example:
///
/// let (node_query_sender, node) = NodeBuilder::default()
/// .network_id(network_id)
/// .comm(comm)
/// .miner_comm(miner_comm)
/// .addr(addr)
/// .storage(file_writer)
/// .genesis_code(genesis_code)
/// .data_path(data_path)
/// .build()?;
pub struct NodeBuilder<C: ProtoComm, S: BlockStorage> {
network_id: u32, // Network ID / magic number
comm: Option<C>, // UDP socket
addr: Option<C::Address>, // UDP port
storage: Option<S>, // A `BlockStorage` implementation
peers: Vec<C::Address>, // peers store and state control

#[cfg(feature = "events")]
event_emitter: Option<mpsc::Sender<NodeEventEmittedInfo>>,
miner_comm: Option<MinerCommunication>,

// these do not exist in Node. they are "transient"
data_path: PathBuf, // path to ~/.kindelia/state/<network_id>
genesis_code: String, // statements in genesis block body
}

impl<C: ProtoComm, S: BlockStorage> NodeBuilder<C, S> {
/// set network identifier
pub fn network_id(mut self, network_id: u32) -> Self {
self.network_id = network_id;
self
}

/// set comm channel, eg std::net::UdpSocket + ProtoComm
pub fn comm(mut self, comm: C) -> Self {
self.comm = Some(comm);
self
}

/// set comm addr, eg net::Address
pub fn addr(mut self, addr: C::Address) -> Self {
self.addr = Some(addr);
self
}

/// set storage, eg persistence::SimpleFileStorage
pub fn storage(mut self, storage: S) -> Self {
self.storage = Some(storage);
self
}

/// set source code statements for the genesis block
pub fn genesis_code(mut self, genesis_code: String) -> Self {
self.genesis_code = genesis_code;
self
}

/// set data path, eg ~/.kindelia/state/<network_id>
pub fn data_path(mut self, data_path: PathBuf) -> Self {
self.data_path = data_path;
self
}

/// set initial peers for this node to contact
pub fn peers(mut self, peers: Vec<C::Address>) -> Self {
self.peers = peers;
self
}

/// set event emitter
pub fn event_emitter(
mut self,
emitter: mpsc::Sender<NodeEventEmittedInfo>,
) -> Self {
self.event_emitter = Some(emitter);
self
}

/// set miner communication channel, or None if not mining.
pub fn miner_comm(mut self, miner_comm: Option<MinerCommunication>) -> Self {
self.miner_comm = miner_comm;
self
}

/// validates builder inputs and builds a Node instance.
pub fn build(
self,
) -> Result<(mpsc::SyncSender<NodeRequest<C>>, Node<C, S>), NodeBuilderError>
{
let genesis_stmts = parser::parse_code(&self.genesis_code)
.map_err(|e| NodeBuilderError::Parse(e))?;
let genesis_block = build_genesis_block(&genesis_stmts)?;
let genesis_block = genesis_block.hashed();
let genesis_hash = genesis_block.get_hash().into();

let runtime = init_runtime(self.data_path.join("heaps"), &genesis_stmts);

let tip = genesis_hash;
let block = u256map_from([(genesis_hash, genesis_block)]);
let pending = u256map_new();
let ancestor = u256map_new();
let wait_list = u256map_new();
let children = u256map_from([(genesis_hash, vec![])]);
let work = u256map_from([(genesis_hash, u256(0))]);
let height = u256map_from([(genesis_hash, 0)]);
let target = u256map_from([(genesis_hash, initial_target())]);
let results = u256map_from([(genesis_hash, vec![])]);

let (query_sender, query_recv) = mpsc::sync_channel(1);

let pool = PriorityQueue::new();
let peers = PeersStore::new();

let now = try_get_time()?;

let comm = self
.comm
.ok_or(NodeBuilderError::MissingRequired { field: "comm".to_string() })?;

let addr = self
.addr
.ok_or(NodeBuilderError::MissingRequired { field: "addr".to_string() })?;

let storage = self.storage.ok_or(NodeBuilderError::MissingRequired {
field: "storage".to_string(),
})?;

// finally we can instantiate a Node!
let mut node = Node {
network_id: self.network_id,
comm,
addr,
storage,
runtime,
query_recv,
pool,
peers,
genesis_hash,
tip,
block,
pending,
ancestor,
wait_list,
children,
work,
target,
height,
results,
miner_comm: self.miner_comm,

#[cfg(feature = "events")]
event_emitter: self.event_emitter,
};

// make node aware of each initial peer
self.peers.iter().for_each(|address| {
return node.peers.see_peer(
Peer { address: *address, seen_at: now },
#[cfg(feature = "events")] // TODO: remove (implement on Node)
node.event_emitter.clone(),
);
});

Ok((query_sender, node))
}
}

impl<C: ProtoComm, S: BlockStorage> Default for NodeBuilder<C, S> {
fn default() -> Self {
Self {
network_id: Default::default(),
comm: Default::default(),
addr: Default::default(),
storage: Default::default(),
peers: Default::default(),
data_path: Default::default(),
genesis_code: Default::default(),

#[cfg(feature = "events")]
event_emitter: Default::default(),
miner_comm: Default::default(),
}
}
}
1 change: 1 addition & 0 deletions kindelia_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#[allow(non_snake_case)]
pub mod api;
pub mod bits;
pub mod builder;
pub mod config;
pub mod constants;
pub mod runtime;
Expand Down
Loading

0 comments on commit c72f842

Please sign in to comment.