,
+ _pair: PhantomData,
}
-impl BeefyWorker
+impl BeefyWorker
where
- Block: BlockT,
- Id: Public + Debug,
+ B: Block,
+ BE: Backend,
+ P: sp_core::Pair,
+ P::Public: AppPublic + Codec,
+ P::Signature: Clone + Codec + Debug + PartialEq + TryFrom>,
+ C: Client,
+ C::Api: BeefyApi,
{
+ /// Retrun a new BEEFY worker instance.
+ ///
+ /// Note that full BEEFY worker initialization can only be completed, if an
+ /// on-chain BEEFY pallet is available. Reason is that the current active
+ /// validator set has to be fetched from the on-chain BEFFY pallet.
+ ///
+ /// For this reason, BEEFY worker initialization completes only after a finality
+ /// notification has been received. Such a notifcation is basically an indication
+ /// that an on-chain BEEFY pallet is available.
pub(crate) fn new(
- validator_set: ValidatorSet,
+ client: Arc,
key_store: SyncCryptoStorePtr,
- finality_notifications: FinalityNotifications,
- gossip_engine: GossipEngine,
- signed_commitment_sender: notification::BeefySignedCommitmentSender,
- best_finalized_block: NumberFor,
- best_block_voted_on: NumberFor,
+ signed_commitment_sender: notification::BeefySignedCommitmentSender,
+ gossip_engine: GossipEngine,
) -> Self {
+ BeefyWorker {
+ state: State::New,
+ local_id: None,
+ key_store,
+ min_interval: 2,
+ rounds: round::Rounds::new(vec![]),
+ finality_notifications: client.finality_notification_stream(),
+ gossip_engine: Arc::new(Mutex::new(gossip_engine)),
+ signed_commitment_sender,
+ best_finalized_block: client.info().finalized_number,
+ best_block_voted_on: Zero::zero(),
+ validator_set_id: 0,
+ client,
+ _backend: PhantomData,
+ _pair: PhantomData,
+ }
+ }
+
+ fn init_validator_set(&mut self) -> Result<(), error::Lifecycle> {
+ let at = BlockId::hash(self.client.info().best_hash);
+
+ let validator_set = self
+ .client
+ .runtime_api()
+ .validator_set(&at)
+ .map_err(|err| error::Lifecycle::MissingValidatorSet(err.to_string()))?;
+
let local_id = match validator_set
.validators
.iter()
- .find(|id| SyncCryptoStore::has_keys(&*key_store, &[(id.to_raw_vec(), KEY_TYPE)]))
+ .find(|id| SyncCryptoStore::has_keys(&*self.key_store, &[(id.to_raw_vec(), KEY_TYPE)]))
{
Some(id) => {
info!(target: "beefy", "🥩 Starting BEEFY worker with local id: {:?}", id);
+ self.state = State::Validate;
Some(id.clone())
}
None => {
info!(target: "beefy", "🥩 No local id found, BEEFY worker will be gossip only.");
+ self.state = State::Gossip;
None
}
};
- BeefyWorker {
- local_id,
- key_store,
- min_interval: 2,
- rounds: round::Rounds::new(validator_set.validators),
- finality_notifications,
- gossip_engine: Arc::new(Mutex::new(gossip_engine)),
- signed_commitment_sender,
- best_finalized_block,
- best_block_voted_on,
- validator_set_id: validator_set.id,
- }
+ self.local_id = local_id;
+ self.rounds = round::Rounds::new(validator_set.validators.clone());
+
+ debug!(target: "beefy", "🥩 Validator set with id {} initialized", validator_set.id);
+
+ Ok(())
}
}
-impl BeefyWorker
+impl BeefyWorker
where
- Block: BlockT,
- Id: Codec + Debug + PartialEq + Public,
- Signature: Clone + Codec + Debug + PartialEq + std::convert::TryFrom>,
- FinalityNotifications: Stream- > + Unpin,
+ B: Block,
+ S: Clone + Codec + Debug + PartialEq + std::convert::TryFrom>,
+ BE: Backend,
+ P: sp_core::Pair,
+ P::Public: AppPublic + Codec,
+ P::Signature: Clone + Codec + Debug + PartialEq + TryFrom>,
+ C: Client,
+ C::Api: BeefyApi,
{
- fn should_vote_on(&self, number: NumberFor) -> bool {
+ fn should_vote_on(&self, number: NumberFor) -> bool {
use sp_runtime::{traits::Saturating, SaturatedConversion};
// we only vote as a validator
- if self.local_id.is_none() {
+ if self.state != State::Validate {
return false;
}
@@ -134,38 +211,38 @@ where
number == next_block_to_vote_on
}
- fn sign_commitment(&self, id: &Id, commitment: &[u8]) -> Result> {
+ fn sign_commitment(&self, id: &P::Public, commitment: &[u8]) -> Result
> {
let sig = SyncCryptoStore::sign_with(&*self.key_store, KEY_TYPE, &id.to_public_crypto_pair(), &commitment)
- .map_err(|e| error::Error::CannotSign((*id).clone(), e.to_string()))?
- .ok_or_else(|| error::Error::CannotSign((*id).clone(), "No key in KeyStore found".into()))?;
+ .map_err(|e| error::Crypto::CannotSign((*id).clone(), e.to_string()))?
+ .ok_or_else(|| error::Crypto::CannotSign((*id).clone(), "No key in KeyStore found".into()))?;
let sig = sig
.clone()
.try_into()
- .map_err(|_| error::Error::InvalidSignature(sig.encode_hex(), (*id).clone()))?;
+ .map_err(|_| error::Crypto::InvalidSignature(sig.encode_hex(), (*id).clone()))?;
Ok(sig)
}
- fn handle_finality_notification(&mut self, notification: FinalityNotification) {
+ fn handle_finality_notification(&mut self, notification: FinalityNotification) {
debug!(target: "beefy", "🥩 Finality notification: {:?}", notification);
if self.should_vote_on(*notification.header.number()) {
let local_id = if let Some(id) = &self.local_id {
id
} else {
- warn!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", notification.header.hash());
+ error!(target: "beefy", "🥩 Missing validator id - can't vote for: {:?}", notification.header.hash());
return;
};
- let mmr_root = if let Some(hash) = find_mmr_root_digest::(¬ification.header) {
+ let mmr_root = if let Some(hash) = find_mmr_root_digest::(¬ification.header) {
hash
} else {
warn!(target: "beefy", "🥩 No MMR root digest found for: {:?}", notification.header.hash());
return;
};
- if let Some(new) = find_authorities_change::(¬ification.header) {
+ if let Some(new) = find_authorities_change::(¬ification.header) {
debug!(target: "beefy", "🥩 New validator set: {:?}", new);
self.validator_set_id = new.id;
};
@@ -194,7 +271,7 @@ where
self.gossip_engine
.lock()
- .gossip_message(topic::(), message.encode(), false);
+ .gossip_message(topic::(), message.encode(), false);
debug!(target: "beefy", "🥩 Sent vote message: {:?}", message);
@@ -207,7 +284,7 @@ where
self.best_finalized_block = *notification.header.number();
}
- fn handle_vote(&mut self, round: (MmrRootHash, NumberFor), vote: (Id, Signature)) {
+ fn handle_vote(&mut self, round: (MmrRootHash, NumberFor), vote: (P::Public, S)) {
// TODO: validate signature
let vote_added = self.rounds.add_vote(round, vote);
@@ -229,11 +306,11 @@ where
}
pub(crate) async fn run(mut self) {
- let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map(
+ let mut votes = Box::pin(self.gossip_engine.lock().messages_for(topic::()).filter_map(
|notification| async move {
debug!(target: "beefy", "🥩 Got vote message: {:?}", notification);
- VoteMessage::, Id, Signature>::decode(&mut ¬ification.message[..]).ok()
+ VoteMessage::, P::Public, S>::decode(&mut ¬ification.message[..]).ok()
},
));
@@ -244,12 +321,22 @@ where
futures::select! {
notification = self.finality_notifications.next().fuse() => {
if let Some(notification) = notification {
+ if self.state == State::New {
+ match self.init_validator_set() {
+ Ok(()) => (),
+ Err(err) => {
+ // we don't treat this as an error here because there really is
+ // nothing a node operator could do in order to remedy the error.
+ info!(target: "beefy", "🥩 Init validator set failed: {:?}", err);
+ }
+ }
+ }
self.handle_finality_notification(notification);
} else {
return;
}
},
- vote = votes.next() => {
+ vote = votes.next().fuse() => {
if let Some(vote) = vote {
self.handle_vote(
(vote.commitment.payload, vote.commitment.block_number),
@@ -269,8 +356,9 @@ where
}
/// Extract the MMR root hash from a digest in the given header, if it exists.
-fn find_mmr_root_digest(header: &Block::Header) -> Option
+fn find_mmr_root_digest(header: &B::Header) -> Option
where
+ B: Block,
Id: Codec,
{
header.digest().logs().iter().find_map(|log| {
@@ -285,7 +373,7 @@ where
/// validator set or `None` in case no validator set change has been signaled.
fn find_authorities_change(header: &B::Header) -> Option>
where
- B: BlockT,
+ B: Block,
Id: Codec,
{
let id = OpaqueDigestItemId::Consensus(&BEEFY_ENGINE_ID);