Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(node): add builder for Node #265

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions kindelia/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use kindelia_core::config::{
};
use kindelia_core::net::{Address, ProtoComm};
use kindelia_core::node::{
spawn_miner, Node, Transaction, MAX_TRANSACTION_SIZE,
spawn_miner, NodeBuilder, Transaction, MAX_TRANSACTION_SIZE,
};
use kindelia_core::persistence::{
get_ordered_blocks_path, SimpleFileStorage, BLOCKS_DIR,
Expand Down Expand Up @@ -835,18 +835,20 @@ pub fn start_node<C: ProtoComm + 'static>(
let file_writter = SimpleFileStorage::new(node_config.data_path.clone())?;

// Node state object
let (node_query_sender, node) = Node::new(
node_config.data_path,
node_config.network_id,
addr,
initial_peers,
comm,
miner_comm,
file_writter,
#[cfg(feature = "events")]
Some(event_tx),
);

let genesis_code = include_str!("../../kindelia_core/genesis.kdl");
let (node_query_sender, node) = NodeBuilder::default()
.network_id(node_config.network_id)
.comm(comm)
.miner_comm(miner_comm)
.addr(addr)
.storage(file_writter)
.genesis_code(node_config.data_path, genesis_code)?
.build(
&initial_peers,
#[cfg(feature = "events")]
Some(event_tx),
)?;

// WebSocket API router
let ws_router = kindelia_ws::ws_router::<
events::NodeEventType,
Expand Down
19 changes: 16 additions & 3 deletions kindelia_core/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use kindelia_lang::parser;
use primitive_types::U256;

use kindelia_core::bits::ProtoSerialize;
use kindelia_core::{constants, runtime, net, node, util};
use kindelia_core::net::ProtoComm;
use kindelia_core::{constants, net, node, runtime, util};

// KHVM
// ====
Expand Down Expand Up @@ -154,8 +154,21 @@ fn block_loading(c: &mut Criterion) {
let addr = comm.get_addr().unwrap();

// create Node
let (_, mut node) =
node::Node::new(dir.clone(), 0, addr, vec![], comm, None, storage, None);
let genesis_code = include_str!("../genesis.kdl");
let (_query_sender, mut node) = node::NodeBuilder::default()
// .network_id(node_config.network_id)
.comm(comm)
// .miner_comm(miner_comm)
.addr(addr)
.storage(storage)
.genesis_code(dir.clone(), genesis_code)
.unwrap()
.build(
&[],
#[cfg(feature = "events")]
None,
)
.unwrap();

// benchmark block loading
c.bench_function("block_loading", |b| b.iter(|| node.load_blocks()));
Expand Down
197 changes: 123 additions & 74 deletions kindelia_core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{mpsc, Arc, Mutex};
use std::thread::JoinHandle;

use bit_vec::BitVec;
use derive_builder::Builder;
use primitive_types::U256;
use priority_queue::PriorityQueue;
use rand::seq::IteratorRandom;
Expand All @@ -24,7 +25,6 @@ use crate::api::{BlockInfo, FuncInfo, NodeRequest};
use crate::bits;
use crate::bits::ProtoSerialize;
use crate::config::MineConfig;
use crate::constants;
use crate::net::{ProtoAddr, ProtoComm};
use crate::persistence::{BlockStorage, BlockStorageError};
use crate::runtime::*;
Expand Down Expand Up @@ -157,6 +157,8 @@ pub enum BlockBodyError {
TooManyTx { count: usize, limit: usize },
#[error(transparent)]
Transaction(#[from] TransactionError),
#[error("Parse error in statements: {0}")]
ParseError(String),
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -290,27 +292,74 @@ pub enum InclusionState {
INCLUDED,
}

/// A Builder to simplify creation of a Node
///
/// Example:
///
/// let (node_query_sender, node) = NodeBuilder::default()
/// .network_id(network_id)
/// .comm(comm)
/// .miner_comm(miner_comm)
/// .addr(addr)
/// .storage(file_writer)
/// .genesis_code(data_path, genesis_code)?
/// .build(
/// &initial_peers,
/// #[cfg(feature = "events")]
/// Some(event_tx),
/// )?;
//
// note: Properties that are annotated with builder(setter(custom))
// are not exposed by NodeBuilder. Instead they are set
// by the custom ::genesis_code() and ::build() methods.
//
// note: The custom ::build() must be called instead of the
// default derived build method, which is renamed to
// ::build_private() and made private.
//
// Todo: impl builder validation function.
//
// Todo: use "transient" properties to simplify API further.
// (might require a fully custom Builder)
// see: https://github.com/colin-kiegel/rust-derive-builder/issues/279
//
// TODO: refactor .block as map to struct? Better safety. Why not?
#[rustfmt::skip]
#[derive(Builder)]
#[builder(pattern = "owned")]
#[builder(build_fn(name = "build_private", private))]
pub struct Node<C: ProtoComm, S: BlockStorage> {
pub network_id : u32, // Network ID / magic number
pub comm : C, // UDP socket
pub addr : C::Address, // UDP port
pub storage : S, // A `BlockStorage` implementation
#[builder(setter(custom))]
pub runtime : Runtime, // Kindelia's runtime
pub query_recv : mpsc::Receiver<NodeRequest<C>>, // Receives an API request
pub pool : PriorityQueue<Transaction, u64>, // transactions to be mined
pub peers : PeersStore<C::Address>, // peers store and state control

#[builder(setter(custom))]
pub genesis_hash : U256,
#[builder(setter(custom))]
pub tip : U256, // current tip
#[builder(setter(custom))]
pub block : U256Map<HashedBlock>, // block hash -> block
#[builder(setter(custom))]
pub pending : U256Map<HashedBlock>, // block hash -> downloaded block, waiting for ancestors
#[builder(setter(custom))]
pub ancestor : U256Map<U256>, // block hash -> hash of its most recent missing ancestor (shortcut jump table)
#[builder(setter(custom))]
pub wait_list : U256Map<Vec<U256>>, // block hash -> hashes of blocks that are waiting for this one
#[builder(setter(custom))]
pub children : U256Map<Vec<U256>>, // block hash -> hashes of this block's children
#[builder(setter(custom))]
pub work : U256Map<U256>, // block hash -> accumulated work
#[builder(setter(custom))]
pub target : U256Map<U256>, // block hash -> this block's target
#[builder(setter(custom))]
pub height : U256Map<u128>, // block hash -> cached height
#[builder(setter(custom))]
pub results : U256Map<Vec<StatementResult>>, // block hash -> results of the statements in this block

#[cfg(feature = "events")]
Expand Down Expand Up @@ -795,6 +844,79 @@ pub fn miner_loop(
// Node
// ----

impl<C: ProtoComm, S: BlockStorage> NodeBuilder<C, S> {
/// This is a setter that parses the genesis_code statements and
/// sets multiple Node fields:
/// runtime, genesis_hash, tip, block, pending, ancestor,
/// wait_list, children, work, height, target, results,
///
/// note: ideally data_path and genesis_code would be separate
/// infallible setters of 'transient' properties,
/// and the fallible work would be done inside ::build().
/// see: https://github.com/colin-kiegel/rust-derive-builder/issues/279
pub fn genesis_code(
mut self,
data_path: PathBuf,
genesis_code: &str,
) -> Result<Self, BlockBodyError> {
let genesis_stmts = parser::parse_code(genesis_code)
.map_err(|e| BlockBodyError::ParseError(e))?;
let genesis_block = build_genesis_block(&genesis_stmts)?;
let genesis_block = genesis_block.hashed();
let genesis_hash = genesis_block.get_hash().into();

self.runtime = Some(init_runtime(data_path.join("heaps"), &genesis_stmts));

self.genesis_hash = Some(genesis_hash);
self.tip = Some(genesis_hash);
self.block = Some(u256map_from([(genesis_hash, genesis_block)]));
self.pending = Some(u256map_new());
self.ancestor = Some(u256map_new());
self.wait_list = Some(u256map_new());
self.children = Some(u256map_from([(genesis_hash, vec![])]));
self.work = Some(u256map_from([(genesis_hash, u256(0))]));
self.height = Some(u256map_from([(genesis_hash, 0)]));
self.target = Some(u256map_from([(genesis_hash, initial_target())]));
self.results = Some(u256map_from([(genesis_hash, vec![])]));
Ok(self)
}

/// This is a custom build method that performs additional work
/// including setting up an mpsc channel and adding initial peers.
///
/// note: ideally most of genesis_code() would be in this method.
/// see: https://github.com/colin-kiegel/rust-derive-builder/issues/279
#[allow(clippy::type_complexity)]
pub fn build(
mut self,
peers: &[C::Address],
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) -> Result<(mpsc::SyncSender<NodeRequest<C>>, Node<C, S>), NodeBuilderError>
{
let (query_sender, query_recv) = mpsc::sync_channel(1);

self.query_recv = Some(query_recv);
self.pool = Some(PriorityQueue::new());
self.peers = Some(PeersStore::new());

let now = get_time();

let mut node = self.build_private()?;

peers.iter().for_each(|address| {
return node.peers.see_peer(
Peer { address: *address, seen_at: now },
#[cfg(feature = "events")] // TODO: remove (implement on Node)
event_emitter.clone(),
);
});

Ok((query_sender, node))
}
}

/// Errors associated with Node.
#[derive(Error, Debug)]
pub enum NodeError {
Expand All @@ -816,79 +938,6 @@ pub enum BlockLookupError {
}

impl<C: ProtoComm, S: BlockStorage> Node<C, S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
data_path: PathBuf,
network_id: u32,
addr: C::Address, // todo: review? https://github.com/Kindelia/Kindelia-Chain/pull/252#discussion_r1037732536
initial_peers: Vec<C::Address>,
comm: C,
miner_comm: Option<MinerCommunication>,
storage: S,
#[cfg(feature = "events")] event_emitter: Option<
mpsc::Sender<NodeEventEmittedInfo>,
>,
) -> (mpsc::SyncSender<NodeRequest<C>>, Self) {
let (query_sender, query_receiver) = mpsc::sync_channel(1);

let genesis_stmts =
parser::parse_code(constants::GENESIS_CODE).expect("Genesis code parses");
let genesis_block =
build_genesis_block(&genesis_stmts).expect("Genesis block builds");
let genesis_block = genesis_block.hashed();
let genesis_hash = genesis_block.get_hash().into();

let runtime = init_runtime(data_path.join("heaps"), &genesis_stmts);

#[rustfmt::skip]
let mut node = Node {
network_id,
addr,
comm,
runtime,
pool : PriorityQueue:: new(),
peers : PeersStore:: new(),

genesis_hash,
tip : genesis_hash,
block : u256map_from([(genesis_hash, genesis_block)]),
pending : u256map_new(),
ancestor : u256map_new(),
wait_list: u256map_new(),
children : u256map_from([(genesis_hash, vec![] )]),
work : u256map_from([(genesis_hash, u256(0) )]),
height : u256map_from([(genesis_hash, 0 )]),
target : u256map_from([(genesis_hash, initial_target())]),
results : u256map_from([(genesis_hash, vec![] )]),

#[cfg(feature = "events")]
event_emitter: event_emitter.clone(),
storage,
query_recv : query_receiver,
miner_comm,
};

let now = get_time();

initial_peers.iter().for_each(|address| {
return node.peers.see_peer(
Peer { address: *address, seen_at: now },
#[cfg(feature = "events")] // TODO: remove (implement on Node)
event_emitter.clone(),
);
});

// TODO: For testing purposes. Remove later.
// for &peer_port in try_ports.iter() {
// if peer_port != port {
// let address = Address::IPv4 { val0: 127, val1: 0, val2: 0, val3: 1, port: peer_port };
// node.peers.see_peer(Peer { address: address, seen_at: now })
// }
// }

(query_sender, node)
}

pub fn add_transaction(
&mut self,
transaction: Transaction,
Expand Down
26 changes: 15 additions & 11 deletions kindelia_core/src/test/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,21 @@ fn start_simulation<C: ProtoComm + 'static>(

// Node
let node_thread = {
let (node_query_sender, node) = node::Node::new(
node_config.data_path,
node_config.network_id,
addr,
initial_peers,
comm,
miner_comm,
storage,
#[cfg(feature = "events")]
Some(event_tx),
);
let genesis_code = include_str!("../../genesis.kdl");
let (node_query_sender, node) = node::NodeBuilder::default()
.network_id(node_config.network_id)
.comm(comm)
.miner_comm(miner_comm)
.addr(addr)
.storage(storage)
.genesis_code(node_config.data_path, genesis_code)
.unwrap()
.build(
&initial_peers,
#[cfg(feature = "events")]
Some(event_tx),
)
.unwrap();

// Spawns the node thread
std::thread::spawn(move || {
Expand Down