Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Backport updates of substrate (#415)
Browse files Browse the repository at this point in the history
* Update to latest Substrate (#407)

* Update to latest Substrate

* Fix main.rs

* Update substrate master (#411)

* in progress impl

* im_online authorityid

* fix

* fix

* use polkadot-master

* trigger CI

* trigger CI

* fix removal

* storage reorganize included

* lock version

* remove unused

* increment spec version

* ICMP message-routing gossip (#304)

* core logic for ICMP gossip

* refactor gossip to make more extension friendly

* move files aroun

* extract attestation-gossip logic to its own module

* message validation and broadcast logic

* fix upstream crates' compilation

* add a test

* another test for overlapping

* Some grammar and phrasing tweaks

Co-Authored-By: Luke Schoen <[email protected]>

* add since parameter to ingress runtime API

* broadcast out known unrouted message queues

* fix compilation of service and collator

* remove useless index_mapping

* some tests for icmp propagation

* fix decoding bug and test icmp queue validation

* simplify engine-id definition

Co-Authored-By: Sergei Pepyakin <[email protected]>

* address some grumbles

* some cleanup of old circulation code

* give network a handle to extrinsic store on startup

* an honest collator ensures data available as well

* address some grumbles

* add docs; rename the attestation session to "leaf work"

* module docs

* move gossip back to gossip.rs

* clean up and document attestation-gossip a bit

* some more docs on the availability store

* store all outgoing message queues in the availability store

* filter `Extrinsic` out of validation crate

* expunge Extrinsic from network

* expunge Extrinsic from erasure-coding

* expunge Extrinsic from collator

* expunge from adder-collator

* rename ExtrinsicStore to AvailabilityStore everywhere

* annotate and clean up message-routing tests
  • Loading branch information
gui1117 authored and gavofyork committed Sep 3, 2019
1 parent e086465 commit 60155e8
Show file tree
Hide file tree
Showing 32 changed files with 2,459 additions and 1,283 deletions.
273 changes: 167 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = "2018"
cli = { package = "polkadot-cli", path = "cli" }
futures = "0.1"
ctrlc = { version = "3.0", features = ["termination"] }
service = { package = "polkadot-service", path = "service" }

[build-dependencies]
vergen = "3"
Expand Down
102 changes: 74 additions & 28 deletions availability-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

//! Persistent database for parachain data.
//! Persistent database for parachain data: PoV block data and outgoing messages.
//!
//! This will be written into during the block validation pipeline, and queried
//! by networking code in order to circulate required data and maintain availability
//! of it.

use codec::{Encode, Decode};
use kvdb::{KeyValueDB, DBTransaction};
use kvdb_rocksdb::{Database, DatabaseConfig};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Message};
use log::warn;

use std::collections::HashSet;
Expand All @@ -42,7 +46,7 @@ pub struct Config {
pub path: PathBuf,
}

/// Some data to keep available.
/// Some data to keep available about a parachain block candidate.
pub struct Data {
/// The relay chain parent hash this should be localized to.
pub relay_parent: Hash,
Expand All @@ -52,18 +56,16 @@ pub struct Data {
pub candidate_hash: Hash,
/// Block data.
pub block_data: BlockData,
/// Extrinsic data.
pub extrinsic: Option<Extrinsic>,
/// Outgoing message queues from execution of the block, if any.
///
/// The tuple pairs the message queue root and the queue data.
pub outgoing_queues: Option<Vec<(Hash, Vec<Message>)>>,
}

fn block_data_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
(relay_parent, candidate_hash, 0i8).encode()
}

fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
(relay_parent, candidate_hash, 1i8).encode()
}

/// Handle to the availability store.
#[derive(Clone)]
pub struct Store {
Expand Down Expand Up @@ -96,6 +98,16 @@ impl Store {
}

/// Make some data available provisionally.
///
/// Validators with the responsibility of maintaining availability
/// for a block or collators collating a block will call this function
/// in order to persist that data to disk and so it can be queried and provided
/// to other nodes in the network.
///
/// The message data of `Data` is optional but is expected
/// to be present with the exception of the case where there is no message data
/// due to the block's invalidity. Determination of invalidity is beyond the
/// scope of this function.
pub fn make_available(&self, data: Data) -> io::Result<()> {
let mut tx = DBTransaction::new();

Expand All @@ -118,12 +130,16 @@ impl Store {
data.block_data.encode()
);

if let Some(extrinsic) = data.extrinsic {
tx.put_vec(
columns::DATA,
extrinsic_key(&data.relay_parent, &data.candidate_hash).as_slice(),
extrinsic.encode(),
);
if let Some(outgoing_queues) = data.outgoing_queues {
// This is kept forever and not pruned.
for (root, messages) in outgoing_queues {
tx.put_vec(
columns::DATA,
root.as_ref(),
messages.encode(),
);
}

}

self.inner.write(tx)
Expand All @@ -146,7 +162,6 @@ impl Store {
for candidate_hash in v {
if !finalized_candidates.contains(&candidate_hash) {
tx.delete(columns::DATA, block_data_key(&parent, &candidate_hash).as_slice());
tx.delete(columns::DATA, extrinsic_key(&parent, &candidate_hash).as_slice());
}
}

Expand All @@ -168,12 +183,11 @@ impl Store {
}
}

/// Query extrinsic data.
pub fn extrinsic(&self, relay_parent: Hash, candidate_hash: Hash) -> Option<Extrinsic> {
let encoded_key = extrinsic_key(&relay_parent, &candidate_hash);
match self.inner.get(columns::DATA, &encoded_key[..]) {
/// Query message queue data by message queue root hash.
pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
match self.inner.get(columns::DATA, queue_root.as_ref()) {
Ok(Some(raw)) => Some(
Extrinsic::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
<_>::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed")
),
Ok(None) => None,
Err(e) => {
Expand Down Expand Up @@ -207,29 +221,61 @@ mod tests {
parachain_id: para_id_1,
candidate_hash: candidate_1,
block_data: block_data_1.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
outgoing_queues: None,
}).unwrap();

store.make_available(Data {
relay_parent,
parachain_id: para_id_2,
candidate_hash: candidate_2,
block_data: block_data_2.clone(),
extrinsic: Some(Extrinsic { outgoing_messages: Vec::new() }),
outgoing_queues: None,
}).unwrap();

assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert_eq!(store.block_data(relay_parent, candidate_2).unwrap(), block_data_2);

assert!(store.extrinsic(relay_parent, candidate_1).is_some());
assert!(store.extrinsic(relay_parent, candidate_2).is_some());

store.candidates_finalized(relay_parent, [candidate_1].iter().cloned().collect()).unwrap();

assert_eq!(store.block_data(relay_parent, candidate_1).unwrap(), block_data_1);
assert!(store.block_data(relay_parent, candidate_2).is_none());
}

#[test]
fn queues_available_by_queue_root() {
let relay_parent = [1; 32].into();
let para_id = 5.into();
let candidate = [2; 32].into();
let block_data = BlockData(vec![1, 2, 3]);

let message_queue_root_1 = [0x42; 32].into();
let message_queue_root_2 = [0x43; 32].into();

assert!(store.extrinsic(relay_parent, candidate_1).is_some());
assert!(store.extrinsic(relay_parent, candidate_2).is_none());
let message_a = Message(vec![1, 2, 3, 4]);
let message_b = Message(vec![4, 5, 6, 7]);

let outgoing_queues = vec![
(message_queue_root_1, vec![message_a.clone()]),
(message_queue_root_2, vec![message_b.clone()]),
];

let store = Store::new_in_memory();
store.make_available(Data {
relay_parent,
parachain_id: para_id,
candidate_hash: candidate,
block_data: block_data.clone(),
outgoing_queues: Some(outgoing_queues),
}).unwrap();

assert_eq!(
store.queue_by_root(&message_queue_root_1),
Some(vec![message_a]),
);

assert_eq!(
store.queue_by_root(&message_queue_root_2),
Some(vec![message_b]),
);
}
}
39 changes: 23 additions & 16 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@

mod chain_spec;

use std::ops::Deref;
use chain_spec::ChainSpec;
use futures::Future;
use tokio::runtime::Runtime;
use service::{Service as BareService, Error as ServiceError};
use std::sync::Arc;
use log::{info, error};
use structopt::StructOpt;

pub use service::{
Components as ServiceComponents, PolkadotService, CustomConfiguration, ServiceFactory, Factory,
AbstractService, CustomConfiguration,
ProvideRuntimeApi, CoreApi, ParachainHost,
};

Expand Down Expand Up @@ -63,7 +61,13 @@ pub trait Worker: IntoExit {
fn configuration(&self) -> service::CustomConfiguration { Default::default() }

/// Do work and schedule exit.
fn work<S: PolkadotService>(self, service: &S, executor: TaskExecutor) -> Self::Work;
fn work<S, SC, B, CE>(self, service: &S, executor: TaskExecutor) -> Self::Work
where S: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
Backend = B, SelectChain = SC,
NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static;
}

#[derive(Debug, StructOpt, Clone)]
Expand Down Expand Up @@ -108,39 +112,42 @@ pub fn run<W>(worker: W, version: cli::VersionInfo) -> error::Result<()> where
service::Roles::LIGHT =>
run_until_exit(
runtime,
Factory::new_light(config).map_err(|e| format!("{:?}", e))?,
service::new_light(config).map_err(|e| format!("{:?}", e))?,
worker
),
_ => run_until_exit(
runtime,
Factory::new_full(config).map_err(|e| format!("{:?}", e))?,
service::new_full(config).map_err(|e| format!("{:?}", e))?,
worker
),
}.map_err(|e| format!("{:?}", e))
}),
cli::ParseAndPrepare::BuildSpec(cmd) => cmd.run(load_spec),
cli::ParseAndPrepare::ExportBlocks(cmd) =>
cmd.run::<service::Factory, _, _>(load_spec, worker),
cli::ParseAndPrepare::ImportBlocks(cmd) =>
cmd.run::<service::Factory, _, _>(load_spec, worker),
cli::ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec, worker),
cli::ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec, worker),
cli::ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec),
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run::<service::Factory, _>(load_spec),
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
service::run_validation_worker(&args.mem_id)?;
Ok(())
}
}
}

fn run_until_exit<T, C, W>(
fn run_until_exit<T, SC, B, CE, W>(
mut runtime: Runtime,
service: T,
worker: W,
) -> error::Result<()>
where
T: Deref<Target=BareService<C>> + Future<Item = (), Error = ServiceError> + Send + 'static,
C: service::Components,
BareService<C>: PolkadotService,
T: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
SelectChain = SC, Backend = B, NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static,
W: Worker,
{
let (exit_send, exit) = exit_future::signal();
Expand All @@ -153,7 +160,7 @@ fn run_until_exit<T, C, W>(
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();

let work = worker.work(&*service, Arc::new(executor));
let work = worker.work(&service, Arc::new(executor));
let service = service.map_err(|err| error!("Error while running Service: {}", err));
let _ = runtime.block_on(service.select(work));
exit_send.fire();
Expand Down
1 change: 1 addition & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ polkadot-primitives = { path = "../primitives" }
polkadot-cli = { path = "../cli" }
polkadot-network = { path = "../network" }
polkadot-validation = { path = "../validation" }
polkadot-service = { path = "../service" }
log = "0.4"
tokio = "0.1.7"

Expand Down
Loading

0 comments on commit 60155e8

Please sign in to comment.