Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Collator node workflow (#280)
Browse files Browse the repository at this point in the history
* arbitrary application logic in CLI

* collation work

* split up exit and work futures in application

* collation node workflow

* typo

* indentation fix

* doc grumbles

* rename Application to Worker

* refactor Worker::exit to exit_only
  • Loading branch information
rphmeier authored and pepyakin committed Jul 6, 2018
1 parent 6f7f8bf commit ddc8d70
Show file tree
Hide file tree
Showing 15 changed files with 293 additions and 96 deletions.
10 changes: 7 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ authors = ["Parity Technologies <[email protected]>"]
[dependencies]
error-chain = "0.12"
polkadot-cli = { path = "polkadot/cli" }
futures = "0.1"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }

[workspace]
members = [
Expand Down
1 change: 0 additions & 1 deletion polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2"
tokio = "0.1.7"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
serde_json = "1.0"
Expand Down
95 changes: 60 additions & 35 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ extern crate atty;
extern crate ansi_term;
extern crate regex;
extern crate time;
extern crate fdlimit;
extern crate futures;
extern crate tokio;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;
Expand Down Expand Up @@ -66,6 +65,11 @@ mod informant;
mod chain_spec;

pub use chain_spec::ChainSpec;
pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};

use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
Expand Down Expand Up @@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
.unwrap_or_else(default_base_path)
}

/// Additional worker making use of the node, to run asynchronously before shutdown.
///
/// This will be invoked with the service and spawn a future that resolves
/// when complete.
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;

/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;

/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;

/// Do work and schedule exit.
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
}

/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
Expand All @@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
W: Worker,
{
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
Expand All @@ -154,11 +179,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
}

if let Some(matches) = matches.subcommand_matches("export-blocks") {
return export_blocks(matches);
return export_blocks(matches, worker.exit_only());
}

if let Some(matches) = matches.subcommand_matches("import-blocks") {
return import_blocks(matches);
return import_blocks(matches, worker.exit_only());
}

let spec = load_spec(&matches)?;
Expand Down Expand Up @@ -255,8 +280,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
};

match role == service::Role::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}

// TODO: hard exit if this stalls?
Expand All @@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let base_path = base_path(matches);
let spec = load_spec(&matches)?;
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
info!("DB path: {}", config.database_path);
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
info!("Exporting blocks");
let mut block: u32 = match matches.value_of("from") {
Expand Down Expand Up @@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
}

loop {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match client.block(&BlockId::number(block as u64))? {
Expand All @@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let spec = load_spec(&matches)?;
let base_path = base_path(matches);
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();

::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});

let mut file: Box<Read> = match matches.value_of("INPUT") {
Expand All @@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
let mut block = 0;
for _ in 0 .. count {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match SignedBlock::decode(&mut file) {
Expand All @@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}

fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C, W>(
runtime: &mut Runtime,
service: service::Service<C>,
matches: &clap::ArgMatches,
sys_conf: SystemConfiguration,
worker: W,
) -> error::Result<()>
where
C: service::Components,
W: Worker,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
let (exit_send, exit) = exit_future::signal();
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
ctrlc::CtrlC::set_handler(move || {
let exit_send = exit_send
.try_borrow_mut()
.expect("only borrowed in non-reetrant signal handler; qed")
.take();

if let Some(signal) = exit_send {
signal.fire();
}
});

exit
};
let (exit_send, exit) = exit_future::signal();

let executor = runtime.executor();
informant::start(&service, exit.clone(), executor.clone());
Expand All @@ -422,7 +446,8 @@ fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matche
)
};

let _ = exit.wait();
let _ = worker.work(&service).wait();
exit_send.fire();
Ok(())
}

Expand Down
8 changes: 6 additions & 2 deletions polkadot/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
name = "polkadot-collator"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]
description = "Abstract collation logic"
description = "Collator node implementation"

[dependencies]
futures = "0.1.17"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
polkadot-api = { path = "../api" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
Loading

0 comments on commit ddc8d70

Please sign in to comment.