Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Collator node workflow #280

Merged
merged 11 commits into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ authors = ["Parity Technologies <[email protected]>"]
[dependencies]
error-chain = "0.12"
polkadot-cli = { path = "polkadot/cli" }
futures = "0.1"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }

[workspace]
members = [
Expand Down
1 change: 0 additions & 1 deletion polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2"
tokio = "0.1.7"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
serde_json = "1.0"
Expand Down
95 changes: 60 additions & 35 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ extern crate atty;
extern crate ansi_term;
extern crate regex;
extern crate time;
extern crate fdlimit;
extern crate futures;
extern crate tokio;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;
Expand Down Expand Up @@ -66,6 +65,11 @@ mod informant;
mod chain_spec;

pub use chain_spec::ChainSpec;
pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};

use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
Expand Down Expand Up @@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
.unwrap_or_else(default_base_path)
}

/// Additional worker making use of the node, to run asynchronously before shutdown.
///
/// This will be invoked with the service and spawn a future that resolves
/// when complete.
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;

/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;

/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;

/// Do work and schedule exit.
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
}

/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
Expand All @@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
W: Worker,
{
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
Expand All @@ -154,11 +179,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
}

if let Some(matches) = matches.subcommand_matches("export-blocks") {
return export_blocks(matches);
return export_blocks(matches, worker.exit_only());
}

if let Some(matches) = matches.subcommand_matches("import-blocks") {
return import_blocks(matches);
return import_blocks(matches, worker.exit_only());
}

let spec = load_spec(&matches)?;
Expand Down Expand Up @@ -255,8 +280,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
};

match role == service::Role::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}

// TODO: hard exit if this stalls?
Expand All @@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let base_path = base_path(matches);
let spec = load_spec(&matches)?;
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
info!("DB path: {}", config.database_path);
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
info!("Exporting blocks");
let mut block: u32 = match matches.value_of("from") {
Expand Down Expand Up @@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
}

loop {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match client.block(&BlockId::number(block as u64))? {
Expand All @@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let spec = load_spec(&matches)?;
let base_path = base_path(matches);
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();

::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});

let mut file: Box<Read> = match matches.value_of("INPUT") {
Expand All @@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
let mut block = 0;
for _ in 0 .. count {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match SignedBlock::decode(&mut file) {
Expand All @@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C, W>(
runtime: &mut Runtime,
service: service::Service<C>,
matches: &clap::ArgMatches,
sys_conf: SystemConfiguration,
worker: W,
) -> error::Result<()>
where
C: service::Components,
W: Worker,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
let (exit_send, exit) = exit_future::signal();
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
ctrlc::CtrlC::set_handler(move || {
let exit_send = exit_send
.try_borrow_mut()
.expect("only borrowed in non-reetrant signal handler; qed")
.take();

if let Some(signal) = exit_send {
signal.fire();
}
});

exit
};
let (exit_send, exit) = exit_future::signal();

let executor = runtime.executor();
informant::start(&service, exit.clone(), executor.clone());
Expand All @@ -422,7 +446,8 @@ fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matche
)
};

let _ = exit.wait();
let _ = worker.work(&service).wait();
exit_send.fire();
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions polkadot/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
name = "polkadot-collator"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]
description = "Abstract collation logic"
description = "Collator node implementation"

[dependencies]
futures = "0.1.17"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
polkadot-api = { path = "../api" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
Loading