This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

start working on building the real overseer (#1795)
* start working on building the real overseer

Unfortunately, this does not compile right now because of an upstream
build failure, which was probably brought on by a recent upgrade
to rustc v1.47.

* fill in AllSubsystems internal constructors

* replace fn make_metrics with Metrics::attempt_to_register

* update to account for #1740

* remove Metrics::register, rename Metrics::attempt_to_register

* add 'static bounds to real_overseer type params

* pass authority_discovery and network_service to real_overseer

It's not obvious that this is the best way to handle the case where
there is no authority discovery service, but it seems to be the best
option available at the moment.

* select a proper database configuration for the availability store db

* use subdirectory for av-store database path

* apply Basti's patch which avoids needing to parameterize everything on Block

* simplify path extraction

* get all tests to compile

* Fix Prometheus double-registry error

For debugging purposes, this was added to node/subsystem-util/src/lib.rs:472-476:

```rust
Some(registry) => Self::try_register(registry).map_err(|err| {
	eprintln!("PrometheusError calling {}::register: {:?}", std::any::type_name::<Self>(), err);
	err
}),
```

That pointed out where the registration was failing, which led to
this fix. The test still doesn't pass, but it now fails in a new
and different way!
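
The debugging trick above can be tried outside the node. The sketch below is std-only and makes several assumptions: `Registry`, `RegistrationError` and `ChainApiMetrics` are hypothetical stand-ins, not the Prometheus or subsystem-util types. It only shows how wrapping `try_register` with `map_err` and `std::any::type_name` names the implementor whose registration failed.

```rust
use std::any::type_name;

/// Hypothetical stand-in for a Prometheus registration error.
#[derive(Debug, PartialEq)]
pub struct RegistrationError(pub String);

/// Hypothetical stand-in for a metrics registry; it only remembers names.
#[derive(Default)]
pub struct Registry {
    names: Vec<String>,
}

impl Registry {
    /// Rejects a name that has already been registered.
    pub fn add(&mut self, name: &str) -> Result<(), RegistrationError> {
        if self.names.iter().any(|n| n == name) {
            return Err(RegistrationError(format!("duplicate metric: {}", name)));
        }
        self.names.push(name.to_string());
        Ok(())
    }
}

pub trait Metrics: Sized {
    fn try_register(registry: &mut Registry) -> Result<Self, RegistrationError>;

    /// Wraps `try_register`, printing which implementor failed before
    /// handing the unchanged error back to the caller.
    fn register(registry: &mut Registry) -> Result<Self, RegistrationError> {
        Self::try_register(registry).map_err(|err| {
            eprintln!("error calling {}::register: {:?}", type_name::<Self>(), err);
            err
        })
    }
}

/// Hypothetical metrics type used only for this illustration.
pub struct ChainApiMetrics;

impl Metrics for ChainApiMetrics {
    fn try_register(registry: &mut Registry) -> Result<Self, RegistrationError> {
        registry.add("chain_api_requests_total")?;
        Ok(ChainApiMetrics)
    }
}
```

Registering `ChainApiMetrics` twice against the same `Registry` makes the second call fail, and the message on stderr names the type that tried to register.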

* authorities must have authority discovery, but not necessarily overseer handlers

* fix broken SpawnedSubsystem impls

Detailed logging showed that, with the `Box::new` style of future
generation, the `self.run` method was never called. That left those
subsystems with dropped receivers / closed senders, which caused the
overseer to shut down immediately.

This is not the final fix needed to get things working properly,
but it's a good start.
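
The commit does not show what the broken impls looked like, so the following is only a sketch of the general property involved: a Rust future does nothing until it is polled. It is std-only, and `run`, `spawn_lazily` and `poll_once` are hypothetical names, not the subsystem API.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future that never suspends.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a boxed future once and reports whether it completed.
pub fn poll_once(future: &mut Pin<Box<dyn Future<Output = ()>>>) -> bool {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    matches!(future.as_mut().poll(&mut cx), Poll::Ready(()))
}

/// Hypothetical subsystem body: flips a flag so callers can see that it ran.
pub async fn run(started: Arc<AtomicBool>) {
    started.store(true, Ordering::SeqCst);
}

/// Builds the future but does not poll it, so `run` has not executed yet.
pub fn spawn_lazily(started: Arc<AtomicBool>) -> Pin<Box<dyn Future<Output = ()>>> {
    Box::pin(run(started))
}
```

The flag stays `false` after `spawn_lazily` returns and becomes `true` only after `poll_once` drives the future. A subsystem whose future is built but never driven would behave as if `run` did not exist.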

* use prometheus properly

Prometheus lets us register simple counters, which aren't very
interesting. It also allows us to register CounterVecs, which are.
With a CounterVec, you can provide a set of labels, which can
later be used to filter the counts.

We were using them wrong, though. This pattern was repeated in a
variety of places in the code:

```rust
// panics with a cardinality mismatch
let my_counter = register(CounterVec::new(opts, &["succeeded", "failed"])?, registry)?;
my_counter.with_label_values(&["succeeded"]).inc()
```

The problem is that the strings passed to the constructor are not the
set of legal label values. They are label names, and each name can take
an arbitrary value, so every call must supply exactly one value per name.

This commit fixes that.
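
To make the names-versus-values rule concrete, here is a simplified, std-only model of a labelled counter family. It is not the `prometheus` crate: the `CounterVec` below is a hypothetical stand-in that mirrors only the cardinality check, and it returns an error where the commit message reports a panic.

```rust
use std::collections::HashMap;

/// Simplified model of a labelled counter family (not the `prometheus` crate).
pub struct CounterVec {
    label_names: Vec<String>,
    counts: HashMap<Vec<String>, u64>,
}

impl CounterVec {
    /// `label_names` are the label keys, not the permitted values.
    pub fn new(label_names: &[&str]) -> Self {
        CounterVec {
            label_names: label_names.iter().map(|n| n.to_string()).collect(),
            counts: HashMap::new(),
        }
    }

    /// Increments the counter identified by one value per label name.
    /// Fails when the number of values differs from the number of names.
    pub fn inc(&mut self, label_values: &[&str]) -> Result<(), String> {
        if label_values.len() != self.label_names.len() {
            return Err(format!(
                "expected {} label values, got {}",
                self.label_names.len(),
                label_values.len()
            ));
        }
        let key: Vec<String> = label_values.iter().map(|v| v.to_string()).collect();
        *self.counts.entry(key).or_insert(0) += 1;
        Ok(())
    }

    /// Reads the current count for a combination of label values.
    pub fn get(&self, label_values: &[&str]) -> u64 {
        let key: Vec<String> = label_values.iter().map(|v| v.to_string()).collect();
        self.counts.get(&key).copied().unwrap_or(0)
    }
}
```

A family created with `&["succeeded", "failed"]` has two label names, so incrementing it with the single value `"succeeded"` is rejected. A family created with the single name `"success"` accepts `"succeeded"` and `"failed"` as values of that one label.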

* get av-store subsystem to actually run properly and not die on first signal

* typo fix: incomming -> incoming

* don't disable authority discovery in test nodes

* Fix rococo-v1 missing session keys

* Update node/core/av-store/Cargo.toml

* try dummying out av-store on non-full-nodes

* overseer and subsystems are required only for full nodes

* Reduce the amount of warnings on browser target

* Fix two more warnings

* InclusionInherent should actually have an Inherent module on rococo

* Ancestry: don't return genesis' parent hash

* Update Cargo.lock

* fix broken test

* update test script: specify chainspec as script argument

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <[email protected]>

* Update node/service/src/lib.rs

Co-authored-by: Bastian Köcher <[email protected]>

* node/service/src/lib: Return error via ? operator

* post-merge blues

* add is_collator flag

* prevent occasional av-store test panic

* simplify fix; expand application

* run authority_discovery in Role::Discover when collating

* distinguish between proposer closed channel errors

* add IsCollator enum, remove is_collator CLI flag

* improve formatting

* remove nop loop

* Fix some stuff

Co-authored-by: Andronik Ordian <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Fedor Sakharov <[email protected]>
Co-authored-by: Robert Habermeier <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Max Inden <[email protected]>
7 people authored Oct 28, 2020
1 parent 80e3a7e commit 798f781
Showing 29 changed files with 414 additions and 174 deletions.
20 changes: 19 additions & 1 deletion Cargo.lock

1 change: 0 additions & 1 deletion cli/src/browser.rs
@@ -20,7 +20,6 @@ use browser_utils::{
Client,
browser_configuration, set_console_error_panic_hook, init_console_log,
};
use std::str::FromStr;

/// Starts the client.
#[wasm_bindgen]
1 change: 1 addition & 0 deletions cli/src/command.rs
@@ -148,6 +148,7 @@ pub fn run() -> Result<()> {
_ => service::build_full(
config,
authority_discovery_disabled,
service::IsCollator::No,
grandpa_pause,
).map(|full| full.task_manager),
}
2 changes: 2 additions & 0 deletions node/core/av-store/Cargo.toml
@@ -19,6 +19,8 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-overseer = { path = "../../overseer" }
polkadot-primitives = { path = "../../../primitives" }

sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }

[dev-dependencies]
env_logger = "0.7.1"
assert_matches = "1.3.0"
43 changes: 31 additions & 12 deletions node/core/av-store/src/lib.rs
@@ -397,6 +397,23 @@ pub struct Config {
pub path: PathBuf,
}

impl std::convert::TryFrom<sc_service::config::DatabaseConfig> for Config {
type Error = &'static str;

fn try_from(config: sc_service::config::DatabaseConfig) -> Result<Self, Self::Error> {
let path = config.path().ok_or("custom databases are not supported")?;

Ok(Self {
// substrate cache size is improper here; just use the default
cache_size: None,
// DB path is a sub-directory of substrate db path to give two properties:
// 1: column numbers don't conflict with substrate
// 2: commands like purge-chain work without further changes
path: path.join("parachains").join("av-store"),
})
}
}
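
The conversion above can be exercised in isolation with a std-only sketch. It assumes a hypothetical `DatabaseConfig` stand-in for `sc_service::config::DatabaseConfig` and guesses `Option<usize>` for `cache_size`; only the path handling is taken from the hunk.

```rust
use std::convert::TryFrom;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the node's database configuration.
pub enum DatabaseConfig {
    OnDisk { path: PathBuf },
    Custom,
}

impl DatabaseConfig {
    /// Returns the on-disk location, or `None` for a custom database.
    pub fn path(&self) -> Option<&Path> {
        match self {
            DatabaseConfig::OnDisk { path } => Some(path.as_path()),
            DatabaseConfig::Custom => None,
        }
    }
}

/// Availability store configuration; the `cache_size` type is an assumption.
#[derive(Debug, PartialEq)]
pub struct Config {
    pub cache_size: Option<usize>,
    pub path: PathBuf,
}

impl TryFrom<DatabaseConfig> for Config {
    type Error = &'static str;

    fn try_from(config: DatabaseConfig) -> Result<Self, Self::Error> {
        let path = config.path().ok_or("custom databases are not supported")?;
        Ok(Self {
            cache_size: None,
            // Nested under the node's database directory, as in the hunk.
            path: path.join("parachains").join("av-store"),
        })
    }
}
```

An on-disk configuration rooted at `db` yields the path `db/parachains/av-store`, while the `Custom` variant is rejected with the error string.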

impl AvailabilityStoreSubsystem {
/// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
@@ -449,7 +466,6 @@ async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Contex
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
{
let ctx = &mut ctx;
loop {
// Every time the following two methods are called a read from DB is performed.
// But given that these are very small values which are essentially a newtype
@@ -470,16 +486,19 @@ where
ActiveLeavesUpdate { activated, .. })
)) => {
for activated in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated).await?;
process_block_activated(&mut ctx, &subsystem.inner, activated).await?;
}
}
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(hash))) => {
process_block_finalized(&subsystem, ctx, &subsystem.inner, hash).await?;
process_block_finalized(&subsystem, &mut ctx, &subsystem.inner, hash).await?;
}
Ok(FromOverseer::Communication { msg }) => {
process_message(&mut subsystem, ctx, msg).await?;
process_message(&mut subsystem, &mut ctx, msg).await?;
}
Err(_) => break,
Err(e) => {
log::error!("AvailabilityStoreSubsystem err: {:#?}", e);
break
},
}
}
pov_pruning_time = pov_pruning_time => {
@@ -945,15 +964,15 @@ fn query_inner<D: Decode>(db: &Arc<dyn KeyValueDB>, column: u32, key: &[u8]) ->
}

impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
where
Context: SubsystemContext<Message = AvailabilityStoreMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
if let Err(e) = run(self, ctx).await {
log::error!(target: LOG_TARGET, "Subsystem exited with an error {:?}", e);
}
});
let future = run(self, ctx)
.map(|r| if let Err(e) = r {
log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
})
.boxed();

SpawnedSubsystem {
name: "availability-store-subsystem",
8 changes: 4 additions & 4 deletions node/core/candidate-selection/src/lib.rs
@@ -420,10 +420,10 @@ impl metrics::Metrics for Metrics {
seconds: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
"candidate_selection_seconds_total",
"Number of Candidate Selection subsystem seconding events.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -433,7 +433,7 @@ impl metrics::Metrics for Metrics {
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
4 changes: 2 additions & 2 deletions node/core/candidate-validation/src/lib.rs
@@ -82,7 +82,7 @@ impl Metrics {
metrics.validation_requests.with_label_values(&["invalid"]).inc();
},
Err(_) => {
metrics.validation_requests.with_label_values(&["failed"]).inc();
metrics.validation_requests.with_label_values(&["validation failure"]).inc();
},
}
}
@@ -98,7 +98,7 @@ impl metrics::Metrics for Metrics {
"parachain_validation_requests_total",
"Number of validation requests served.",
),
&["valid", "invalid", "failed"],
&["validity"],
)?,
registry,
)?,
20 changes: 13 additions & 7 deletions node/core/chain-api/src/lib.rs
@@ -40,18 +40,19 @@ use polkadot_node_subsystem_util::{
};
use polkadot_primitives::v1::{Block, BlockId};
use sp_blockchain::HeaderBackend;
use std::sync::Arc;

use futures::prelude::*;

/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
client: Client,
client: Arc<Client>,
metrics: Metrics,
}

impl<Client> ChainApiSubsystem<Client> {
/// Create a new Chain API subsystem with the given client.
pub fn new(client: Client, metrics: Metrics) -> Self {
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
ChainApiSubsystem {
client,
metrics,
@@ -126,8 +127,13 @@ where
// fewer than `k` ancestors are available
Ok(None) => None,
Ok(Some(header)) => {
hash = header.parent_hash;
Some(Ok(hash))
// stop at the genesis header.
if header.number == 1 {
None
} else {
hash = header.parent_hash;
Some(Ok(hash))
}
}
}
});
@@ -171,7 +177,7 @@ impl metrics::Metrics for Metrics {
"parachain_chain_api_requests_total",
"Number of Chain API requests served.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,
@@ -300,11 +306,11 @@ mod tests {
}

fn test_harness(
test: impl FnOnce(TestClient, TestSubsystemContextHandle<ChainApiMessage>)
test: impl FnOnce(Arc<TestClient>, TestSubsystemContextHandle<ChainApiMessage>)
-> BoxFuture<'static, ()>,
) {
let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
let client = TestClient::default();
let client = Arc::new(TestClient::default());

let subsystem = ChainApiSubsystem::new(client.clone(), Metrics(None));
let chain_api_task = run(ctx, subsystem).map(|x| x.unwrap());
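
The ancestry change in this file ("don't return genesis' parent hash") can be sketched with a std-only model. `Header`, the `u64` hashes and the `HashMap` lookup are hypothetical simplifications. The sketch stops the walk at block number 0, which is one reading of the commit message; the hunk above compares against 1, which, as far as the visible lines show, would also leave out the genesis hash itself.

```rust
use std::collections::HashMap;

/// Hypothetical minimal header: a block number plus the parent's hash.
#[derive(Clone, Copy)]
pub struct Header {
    pub number: u32,
    pub parent_hash: u64,
}

/// Returns up to `k` ancestor hashes of `start`, nearest first.
/// The walk ends early at an unknown header or at genesis (number 0),
/// so the placeholder parent hash of genesis is never returned.
pub fn ancestors(headers: &HashMap<u64, Header>, start: u64, k: usize) -> Vec<u64> {
    let mut hash = start;
    let mut out = Vec::new();
    while out.len() < k {
        match headers.get(&hash) {
            // Fewer than `k` ancestors are available.
            None => break,
            // Genesis has no real parent; stop here.
            Some(header) if header.number == 0 => break,
            Some(header) => {
                hash = header.parent_hash;
                out.push(hash);
            }
        }
    }
    out
}
```

With a three-block chain (genesis, block 1, block 2), asking for five ancestors of block 2 returns the hashes of block 1 and genesis and nothing further.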
11 changes: 6 additions & 5 deletions node/core/proposer/src/lib.rs
@@ -145,7 +145,7 @@ where
let (sender, receiver) = futures::channel::oneshot::channel();

overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)??;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;

let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the
@@ -156,7 +156,7 @@ where
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;

receiver.await.map_err(Error::ClosedChannelFromProvisioner)
receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
}
.boxed()
.fuse();
@@ -236,7 +236,8 @@ pub enum Error {
Blockchain(sp_blockchain::Error),
Inherent(sp_inherents::Error),
Timeout,
ClosedChannelFromProvisioner(futures::channel::oneshot::Canceled),
ClosedChannelAwaitingActivation,
ClosedChannelAwaitingInherentData,
Subsystem(SubsystemError)
}

@@ -271,7 +272,8 @@ impl fmt::Display for Error {
Self::Blockchain(err) => write!(f, "blockchain error: {}", err),
Self::Inherent(err) => write!(f, "inherent error: {:?}", err),
Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT),
Self::ClosedChannelFromProvisioner(err) => write!(f, "provisioner closed inherent data channel before sending: {}", err),
Self::ClosedChannelAwaitingActivation => write!(f, "closed channel from overseer when awaiting activation"),
Self::ClosedChannelAwaitingInherentData => write!(f, "closed channel from provisioner when awaiting inherent data"),
Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err),
}
}
@@ -282,7 +284,6 @@ impl std::error::Error for Error {
match self {
Self::Consensus(err) => Some(err),
Self::Blockchain(err) => Some(err),
Self::ClosedChannelFromProvisioner(err) => Some(err),
Self::Subsystem(err) => Some(err),
_ => None
}
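
The point of splitting the closed-channel error in this file is that the caller can tell which wait failed. The sketch below shows the same `map_err` pattern with std-only pieces. It swaps in `std::sync::mpsc` for the `futures` oneshot channels used in the diff, and `await_both` is a hypothetical helper; the variant names and messages are taken from the hunks above.

```rust
use std::fmt;
use std::sync::mpsc::Receiver;

/// The two closed-channel cases from the diff, as separate variants.
#[derive(Debug, PartialEq)]
pub enum Error {
    ClosedChannelAwaitingActivation,
    ClosedChannelAwaitingInherentData,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ClosedChannelAwaitingActivation => {
                write!(f, "closed channel from overseer when awaiting activation")
            }
            Self::ClosedChannelAwaitingInherentData => {
                write!(f, "closed channel from provisioner when awaiting inherent data")
            }
        }
    }
}

/// Waits on both channels in order; each failure maps to its own variant,
/// so the caller learns which sender went away.
pub fn await_both(
    activation: Receiver<()>,
    inherent_data: Receiver<u32>,
) -> Result<u32, Error> {
    activation
        .recv()
        .map_err(|_| Error::ClosedChannelAwaitingActivation)?;
    inherent_data
        .recv()
        .map_err(|_| Error::ClosedChannelAwaitingInherentData)
}
```

Dropping the activation sender produces `ClosedChannelAwaitingActivation`, while dropping only the inherent data sender produces `ClosedChannelAwaitingInherentData`.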
4 changes: 2 additions & 2 deletions node/core/provisioner/src/lib.rs
@@ -482,7 +482,7 @@ impl Metrics {
fn on_inherent_data_request(&self, response: Result<(), ()>) {
if let Some(metrics) = &self.0 {
match response {
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeded"]).inc(),
Ok(()) => metrics.inherent_data_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.inherent_data_requests.with_label_values(&["failed"]).inc(),
}
}
@@ -498,7 +498,7 @@ impl metrics::Metrics for Metrics {
"parachain_inherent_data_requests_total",
"Number of InherentData requests served by provisioner.",
),
&["succeeded", "failed"],
&["success"],
)?,
registry,
)?,