diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs index 8c7dc00aa67bb..54c9ad4f3b68c 100644 --- a/bridges/relays/client-substrate/src/chain.rs +++ b/bridges/relays/client-substrate/src/chain.rs @@ -55,7 +55,7 @@ pub trait Chain: ChainBase + Clone { /// Block type. type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification; /// The aggregated `Call` type. - type Call: Clone + Codec + Debug + Send; + type Call: Clone + Codec + Debug + Send + Sync; } /// Substrate-based relay chain that supports parachains. diff --git a/bridges/relays/lib-substrate-relay/src/lib.rs b/bridges/relays/lib-substrate-relay/src/lib.rs index 37a4d602e598d..f9bd80d50793c 100644 --- a/bridges/relays/lib-substrate-relay/src/lib.rs +++ b/bridges/relays/lib-substrate-relay/src/lib.rs @@ -91,7 +91,7 @@ impl TaggedAccount { } /// Batch call builder. -pub trait BatchCallBuilder: Clone + Send { +pub trait BatchCallBuilder: Clone + Send + Sync { /// Create batch call from given calls vector. fn build_batch_call(&self, _calls: Vec) -> Call; } diff --git a/bridges/relays/messages/Cargo.toml b/bridges/relays/messages/Cargo.toml index 8c4b8257d5aa6..a45b272810537 100644 --- a/bridges/relays/messages/Cargo.toml +++ b/bridges/relays/messages/Cargo.toml @@ -8,6 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] async-std = { version = "1.6.5", features = ["attributes"] } async-trait = "0.1" +env_logger = "0.10" futures = "0.3.28" hex = "0.4" log = "0.4.17" diff --git a/bridges/relays/messages/src/message_lane_loop.rs b/bridges/relays/messages/src/message_lane_loop.rs index ba86f05ffd32a..b681d86d2ae8f 100644 --- a/bridges/relays/messages/src/message_lane_loop.rs +++ b/bridges/relays/messages/src/message_lane_loop.rs @@ -111,7 +111,7 @@ pub struct NoncesSubmitArtifacts { /// Batch transaction that already submit some headers and needs to be extended with /// messages/delivery proof before sending. -pub trait BatchTransaction: Debug + Send { +pub trait BatchTransaction: Debug + Send + Sync { /// Header that was required in the original call and which is bundled within this /// batch transaction. fn required_header_id(&self) -> HeaderId; @@ -622,11 +622,19 @@ pub(crate) mod tests { } impl TestClientData { - fn receive_messages(&mut self, proof: TestMessagesProof) { + fn receive_messages( + &mut self, + maybe_batch_tx: Option, + proof: TestMessagesProof, + ) { self.target_state.best_self = HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1); self.target_state.best_finalized_self = self.target_state.best_self; self.target_latest_received_nonce = *proof.0.end(); + if let Some(maybe_batch_tx) = maybe_batch_tx { + self.target_state.best_finalized_peer_at_best_self = + Some(maybe_batch_tx.required_header_id()); + } if let Some(target_latest_confirmed_received_nonce) = proof.1 { self.target_latest_confirmed_received_nonce = target_latest_confirmed_received_nonce; @@ -634,10 +642,18 @@ pub(crate) mod tests { self.submitted_messages_proofs.push(proof); } - fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) { + fn receive_messages_delivery_proof( + &mut self, + maybe_batch_tx: Option, + proof: TestMessagesReceivingProof, + ) { self.source_state.best_self = HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1); self.source_state.best_finalized_self = self.source_state.best_self; + if let Some(maybe_batch_tx) = maybe_batch_tx { + self.source_state.best_finalized_peer_at_best_self = + Some(maybe_batch_tx.required_header_id()); + } self.submitted_messages_receiving_proofs.push(proof); self.source_latest_confirmed_received_nonce = proof; } @@ -760,13 +776,13 @@ pub(crate) mod tests { async fn submit_messages_receiving_proof( &self, - _maybe_batch_tx: Option, + maybe_batch_tx: Option, _generated_at_block: TargetHeaderIdOf, proof: TestMessagesReceivingProof, ) -> Result { let mut data = self.data.lock(); (self.tick)(&mut data); - data.receive_messages_delivery_proof(proof); + data.receive_messages_delivery_proof(maybe_batch_tx, proof); (self.post_tick)(&mut data); Ok(TestTransactionTracker(data.source_tracked_transaction_status)) } @@ -885,7 +901,7 @@ pub(crate) mod tests { async fn submit_messages_proof( &self, - _maybe_batch_tx: Option, + maybe_batch_tx: Option, _generated_at_header: SourceHeaderIdOf, nonces: RangeInclusive, proof: TestMessagesProof, @@ -895,7 +911,7 @@ pub(crate) mod tests { if data.is_target_fails { return Err(TestError) } - data.receive_messages(proof); + data.receive_messages(maybe_batch_tx, proof); (self.post_tick)(&mut data); Ok(NoncesSubmitArtifacts { nonces, diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs index 7a245858b32d8..4af02ba2b568a 100644 --- a/bridges/relays/messages/src/message_race_delivery.rs +++ b/bridges/relays/messages/src/message_race_delivery.rs @@ -290,7 +290,185 @@ impl std::fmt::Debug for MessageDeliveryStrategy MessageDeliveryStrategy { +impl MessageDeliveryStrategy +where + P: MessageLane, + SC: MessageLaneSourceClient

, + TC: MessageLaneTargetClient

, +{ + /// Returns true if some race action can be selected (with `select_race_action`) at given + /// `best_finalized_source_header_id_at_best_target` source header at target. + async fn can_submit_transaction_with< + RS: RaceState, TargetHeaderIdOf

>, + >( + &self, + mut race_state: RS, + maybe_best_finalized_source_header_id_at_best_target: Option>, + ) -> bool { + if let Some(best_finalized_source_header_id_at_best_target) = + maybe_best_finalized_source_header_id_at_best_target + { + race_state.set_best_finalized_source_header_id_at_best_target( + best_finalized_source_header_id_at_best_target, + ); + + return self.select_race_action(race_state).await.is_some() + } + + false + } + + async fn select_race_action, TargetHeaderIdOf

>>( + &self, + race_state: RS, + ) -> Option<(RangeInclusive, MessageProofParameters)> { + let best_target_nonce = self.strategy.best_at_target()?; + let best_finalized_source_header_id_at_best_target = + race_state.best_finalized_source_header_id_at_best_target()?; + let latest_confirmed_nonce_at_source = self + .latest_confirmed_nonce_at_source(&best_finalized_source_header_id_at_best_target) + .unwrap_or(best_target_nonce); + let target_nonces = self.target_nonces.as_ref()?; + + // There's additional condition in the message delivery race: target would reject messages + // if there are too much unconfirmed messages at the inbound lane. + + // The receiving race is responsible to deliver confirmations back to the source chain. So + // if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job. + let latest_received_nonce_at_target = target_nonces.latest_nonce; + let confirmations_missing = + latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source); + match confirmations_missing { + Some(confirmations_missing) + if confirmations_missing >= self.max_unconfirmed_nonces_at_target => + { + log::debug!( + target: "bridge", + "Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \ + at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}", + MessageDeliveryRace::

::source_name(), + MessageDeliveryRace::

::target_name(), + latest_received_nonce_at_target, + latest_confirmed_nonce_at_source, + self.max_unconfirmed_nonces_at_target, + ); + + return None + }, + _ => (), + } + + // Ok - we may have new nonces to deliver. But target may still reject new messages, because + // we haven't notified it that (some) messages have been confirmed. So we may want to + // include updated `source.latest_confirmed` in the proof. + // + // Important note: we're including outbound state lane proof whenever there are unconfirmed + // nonces on the target chain. Other strategy is to include it only if it's absolutely + // necessary. + let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce; + let outbound_state_proof_required = + latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source; + + // The target node would also reject messages if there are too many entries in the + // "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then + // we should wait for confirmations race. + let unrewarded_limit_reached = + target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >= + self.max_unrewarded_relayer_entries_at_target || + target_nonces.nonces_data.unrewarded_relayers.total_messages >= + self.max_unconfirmed_nonces_at_target; + if unrewarded_limit_reached { + // so there are already too many unrewarded relayer entries in the set + // + // => check if we can prove enough rewards. If not, we should wait for more rewards to + // be paid + let number_of_rewards_being_proved = + latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target); + let enough_rewards_being_proved = number_of_rewards_being_proved >= + target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry; + if !enough_rewards_being_proved { + return None + } + } + + // If we're here, then the confirmations race did its job && sending side now knows that + // messages have been delivered. Now let's select nonces that we want to deliver. + // + // We may deliver at most: + // + // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - + // latest_confirmed_nonce_at_target) + // + // messages in the batch. But since we're including outbound state proof in the batch, then + // it may be increased to: + // + // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - + // latest_confirmed_nonce_at_source) + let future_confirmed_nonce_at_target = if outbound_state_proof_required { + latest_confirmed_nonce_at_source + } else { + latest_confirmed_nonce_at_target + }; + let max_nonces = latest_received_nonce_at_target + .checked_sub(future_confirmed_nonce_at_target) + .and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff)) + .unwrap_or_default(); + let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch); + let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch; + let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch; + let lane_source_client = self.lane_source_client.clone(); + let lane_target_client = self.lane_target_client.clone(); + + // select nonces from nonces, available for delivery + let selected_nonces = match self.strategy.available_source_queue_indices(race_state) { + Some(available_source_queue_indices) => { + let source_queue = self.strategy.source_queue(); + let reference = RelayMessagesBatchReference { + max_messages_in_this_batch: max_nonces, + max_messages_weight_in_single_batch, + max_messages_size_in_single_batch, + lane_source_client: lane_source_client.clone(), + lane_target_client: lane_target_client.clone(), + best_target_nonce, + nonces_queue: source_queue.clone(), + nonces_queue_range: available_source_queue_indices, + metrics: self.metrics_msg.clone(), + }; + + MessageRaceLimits::decide(reference).await + }, + None => { + // we still may need to submit delivery transaction with zero messages to + // unblock the lane. But it'll only be accepted if the lane is blocked + // (i.e. when `unrewarded_limit_reached` is `true`) + None + }, + }; + + // check if we need unblocking transaction and we may submit it + #[allow(clippy::reversed_empty_ranges)] + let selected_nonces = match selected_nonces { + Some(selected_nonces) => selected_nonces, + None if unrewarded_limit_reached && outbound_state_proof_required => 1..=0, + _ => return None, + }; + + let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces); + Some(( + selected_nonces, + MessageProofParameters { outbound_state_proof_required, dispatch_weight }, + )) + } + + /// Returns lastest confirmed message at source chain, given source block. + fn latest_confirmed_nonce_at_source(&self, at: &SourceHeaderIdOf

) -> Option { + self.latest_confirmed_nonces_at_source + .iter() + .take_while(|(id, _)| id.0 <= at.0) + .last() + .map(|(_, nonce)| *nonce) + } + /// Returns total weight of all undelivered messages. fn dispatch_weight_for_range(&self, range: &RangeInclusive) -> Weight { self.strategy @@ -322,9 +500,10 @@ where self.strategy.is_empty() } - fn required_source_header_at_target, TargetHeaderIdOf

>>( + async fn required_source_header_at_target< + RS: RaceState, TargetHeaderIdOf

>, + >( &self, - current_best: &SourceHeaderIdOf

, race_state: RS, ) -> Option> { // we have already submitted something - let's wait until it is mined @@ -332,32 +511,41 @@ where return None } - let has_nonces_to_deliver = !self.strategy.is_empty(); - let header_required_for_messages_delivery = - self.strategy.required_source_header_at_target(current_best, race_state); - let header_required_for_reward_confirmations_delivery = self - .latest_confirmed_nonces_at_source - .back() - .filter(|(id, nonce)| *nonce != 0 && id.0 > current_best.0) - .map(|(id, _)| id.clone()); - match ( - has_nonces_to_deliver, - header_required_for_messages_delivery, - header_required_for_reward_confirmations_delivery, - ) { - // if we need to delver messages and proof-of-delivery-confirmations, then we need to - // select the most recent header to avoid extra roundtrips - (true, Some(id1), Some(id2)) => Some(if id1.0 > id2.0 { id1 } else { id2 }), - // if we only need to deliver messages - fine, let's require some source header - // - // if we need new header for proof-of-delivery-confirmations - let's also ask for that. - // Even though it may require additional header, we'll be sure that we won't block the - // lane (sometimes we can't deliver messages without proof-of-delivery-confirmations) - (true, a, b) => a.or(b), - // we never submit delivery transaction without messages, so if `has_nonces_to_deliver` - // if `false`, we don't need any source headers at target - (false, _, _) => None, + // if we can deliver something using current race state, go on + let selected_nonces = self.select_race_action(race_state.clone()).await; + if selected_nonces.is_some() { + return None + } + + // check if we may deliver some messages if we'll relay require source header + // to target first + let maybe_source_header_for_delivery = + self.strategy.source_queue().back().map(|(id, _)| id.clone()); + if self + .can_submit_transaction_with( + race_state.clone(), + maybe_source_header_for_delivery.clone(), + ) + .await + { + return maybe_source_header_for_delivery + } + + // ok, we can't delivery anything even if we relay some source blocks first. But maybe + // the lane is blocked and we need to submit unblock transaction? + let maybe_source_header_for_reward_confirmation = + self.latest_confirmed_nonces_at_source.back().map(|(id, _)| id.clone()); + if self + .can_submit_transaction_with( + race_state.clone(), + maybe_source_header_for_reward_confirmation.clone(), + ) + .await + { + return maybe_source_header_for_reward_confirmation } + + None } fn best_at_source(&self) -> Option { @@ -436,128 +624,7 @@ where &self, race_state: RS, ) -> Option<(RangeInclusive, Self::ProofParameters)> { - let best_target_nonce = self.strategy.best_at_target()?; - let best_finalized_source_header_id_at_best_target = - race_state.best_finalized_source_header_id_at_best_target()?; - let latest_confirmed_nonce_at_source = self - .latest_confirmed_nonces_at_source - .iter() - .take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0) - .last() - .map(|(_, nonce)| *nonce) - .unwrap_or(best_target_nonce); - let target_nonces = self.target_nonces.as_ref()?; - - // There's additional condition in the message delivery race: target would reject messages - // if there are too much unconfirmed messages at the inbound lane. - - // The receiving race is responsible to deliver confirmations back to the source chain. So - // if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job. - let latest_received_nonce_at_target = target_nonces.latest_nonce; - let confirmations_missing = - latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source); - match confirmations_missing { - Some(confirmations_missing) - if confirmations_missing >= self.max_unconfirmed_nonces_at_target => - { - log::debug!( - target: "bridge", - "Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \ - at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}", - MessageDeliveryRace::

::source_name(), - MessageDeliveryRace::

::target_name(), - latest_received_nonce_at_target, - latest_confirmed_nonce_at_source, - self.max_unconfirmed_nonces_at_target, - ); - - return None - }, - _ => (), - } - - // Ok - we may have new nonces to deliver. But target may still reject new messages, because - // we haven't notified it that (some) messages have been confirmed. So we may want to - // include updated `source.latest_confirmed` in the proof. - // - // Important note: we're including outbound state lane proof whenever there are unconfirmed - // nonces on the target chain. Other strategy is to include it only if it's absolutely - // necessary. - let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce; - let outbound_state_proof_required = - latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source; - - // The target node would also reject messages if there are too many entries in the - // "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then - // we should wait for confirmations race. - let unrewarded_relayer_entries_limit_reached = - target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >= - self.max_unrewarded_relayer_entries_at_target; - if unrewarded_relayer_entries_limit_reached { - // so there are already too many unrewarded relayer entries in the set - // - // => check if we can prove enough rewards. If not, we should wait for more rewards to - // be paid - let number_of_rewards_being_proved = - latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target); - let enough_rewards_being_proved = number_of_rewards_being_proved >= - target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry; - if !enough_rewards_being_proved { - return None - } - } - - // If we're here, then the confirmations race did its job && sending side now knows that - // messages have been delivered. Now let's select nonces that we want to deliver. - // - // We may deliver at most: - // - // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - - // latest_confirmed_nonce_at_target) - // - // messages in the batch. But since we're including outbound state proof in the batch, then - // it may be increased to: - // - // max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - - // latest_confirmed_nonce_at_source) - let future_confirmed_nonce_at_target = if outbound_state_proof_required { - latest_confirmed_nonce_at_source - } else { - latest_confirmed_nonce_at_target - }; - let max_nonces = latest_received_nonce_at_target - .checked_sub(future_confirmed_nonce_at_target) - .and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff)) - .unwrap_or_default(); - let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch); - let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch; - let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch; - let lane_source_client = self.lane_source_client.clone(); - let lane_target_client = self.lane_target_client.clone(); - - let available_source_queue_indices = - self.strategy.available_source_queue_indices(race_state)?; - let source_queue = self.strategy.source_queue(); - - let reference = RelayMessagesBatchReference { - max_messages_in_this_batch: max_nonces, - max_messages_weight_in_single_batch, - max_messages_size_in_single_batch, - lane_source_client: lane_source_client.clone(), - lane_target_client: lane_target_client.clone(), - best_target_nonce, - nonces_queue: source_queue.clone(), - nonces_queue_range: available_source_queue_indices, - metrics: self.metrics_msg.clone(), - }; - - let selected_nonces = MessageRaceLimits::decide(reference).await?; - let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces); - - Some(( - selected_nonces, - MessageProofParameters { outbound_state_proof_required, dispatch_weight }, - )) + self.select_race_action(race_state).await } } @@ -980,31 +1047,41 @@ mod tests { ); // nothing needs to be delivered now and we don't need any new headers assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!(strategy.required_source_header_at_target(&header_id(1), state.clone()), None); - - // now let's generate two more nonces [24; 25] at the soruce; - strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0)); - // - // - so now we'll need to relay source block#2 to be able to accept messages [24; 25]. - assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!( - strategy.required_source_header_at_target(&header_id(1), state.clone()), - Some(header_id(2)) - ); + assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None); - // let's relay source block#2 + // block#2 is generated state.best_finalized_source_header_id_at_source = Some(header_id(2)); state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); state.best_target_header_id = Some(header_id(2)); state.best_finalized_target_header_id = Some(header_id(2)); + // now let's generate two more nonces [24; 25] at the source; + strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0)); + // + // we don't need to relay more headers to target, because messages [20; 23] have + // not confirmed to source yet + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); + assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None); + + // let's relay source block#3 + state.best_finalized_source_header_id_at_source = Some(header_id(3)); + state.best_finalized_source_header_id_at_best_target = Some(header_id(3)); + state.best_target_header_id = Some(header_id(3)); + state.best_finalized_target_header_id = Some(header_id(3)); + // and ask strategy again => still nothing to deliver, because parallel confirmations // race need to be pushed further assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None); - assert_eq!(strategy.required_source_header_at_target(&header_id(2), state.clone()), None); + assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None); + + // let's relay source block#3 + state.best_finalized_source_header_id_at_source = Some(header_id(4)); + state.best_finalized_source_header_id_at_best_target = Some(header_id(4)); + state.best_target_header_id = Some(header_id(4)); + state.best_finalized_target_header_id = Some(header_id(4)); // let's confirm messages [20; 23] - strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 23, 0)); + strategy.source_nonces_updated(header_id(4), source_nonces(24..=25, 23, 0)); // and ask strategy again => now we have everything required to deliver remaining // [24; 25] nonces and proof of [20; 23] confirmation @@ -1012,7 +1089,7 @@ mod tests { strategy.select_nonces_to_deliver(state.clone()).await, Some(((24..=25), proof_parameters(true, 2))), ); - assert_eq!(strategy.required_source_header_at_target(&header_id(2), state), None); + assert_eq!(strategy.required_source_header_at_target(state).await, None); } #[async_std::test] @@ -1041,9 +1118,9 @@ mod tests { ); } - #[test] + #[async_std::test] #[allow(clippy::reversed_empty_ranges)] - fn no_source_headers_required_at_target_if_lanes_are_empty() { + async fn no_source_headers_required_at_target_if_lanes_are_empty() { let (state, _) = prepare_strategy(); let mut strategy = TestStrategy { max_unrewarded_relayer_entries_at_target: 4, @@ -1073,7 +1150,7 @@ mod tests { strategy.latest_confirmed_nonces_at_source, VecDeque::from([(source_header_id, 0)]) ); - assert_eq!(strategy.required_source_header_at_target(&source_header_id, state), None); + assert_eq!(strategy.required_source_header_at_target(state).await, None); } #[async_std::test] @@ -1159,4 +1236,138 @@ mod tests { )), ); } + + #[async_std::test] + #[allow(clippy::reversed_empty_ranges)] + async fn delivery_race_is_able_to_unblock_lane() { + // step 1: messages 20..=23 are delivered from source to target at target block 2 + fn at_target_block_2_deliver_messages( + strategy: &mut TestStrategy, + state: &mut TestRaceState, + occupied_relayer_slots: MessageNonce, + occupied_message_slots: MessageNonce, + ) { + let nonces_at_target = TargetClientNonces { + latest_nonce: 23, + nonces_data: DeliveryRaceTargetNoncesData { + confirmed_nonce: 19, + unrewarded_relayers: UnrewardedRelayersState { + unrewarded_relayer_entries: occupied_relayer_slots, + total_messages: occupied_message_slots, + ..Default::default() + }, + }, + }; + + state.best_target_header_id = Some(header_id(2)); + state.best_finalized_target_header_id = Some(header_id(2)); + + strategy.best_target_nonces_updated(nonces_at_target.clone(), state); + strategy.finalized_target_nonces_updated(nonces_at_target, state); + } + + // step 2: delivery of messages 20..=23 is confirmed to the source node at source block 2 + fn at_source_block_2_deliver_confirmations( + strategy: &mut TestStrategy, + state: &mut TestRaceState, + ) { + state.best_finalized_source_header_id_at_source = Some(header_id(2)); + + strategy.source_nonces_updated( + header_id(2), + SourceClientNonces { new_nonces: Default::default(), confirmed_nonce: Some(23) }, + ); + } + + // step 3: finalize source block 2 at target block 3 and select nonces to deliver + async fn at_target_block_3_select_nonces_to_deliver( + strategy: &TestStrategy, + mut state: TestRaceState, + ) -> Option<(RangeInclusive, MessageProofParameters)> { + state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); + state.best_target_header_id = Some(header_id(3)); + state.best_finalized_target_header_id = Some(header_id(3)); + + strategy.select_nonces_to_deliver(state).await + } + + let max_unrewarded_relayer_entries_at_target = 4; + let max_unconfirmed_nonces_at_target = 4; + let expected_rewards_proof = Some(( + 1..=0, + MessageProofParameters { + outbound_state_proof_required: true, + dispatch_weight: Weight::zero(), + }, + )); + + // TODO: also fix + test `required_source_header_at_target` + + // when lane is NOT blocked + let (mut state, mut strategy) = prepare_strategy(); + at_target_block_2_deliver_messages( + &mut strategy, + &mut state, + max_unrewarded_relayer_entries_at_target - 1, + max_unconfirmed_nonces_at_target - 1, + ); + at_source_block_2_deliver_confirmations(&mut strategy, &mut state); + assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None); + assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None); + + // when lane is blocked by no-relayer-slots in unrewarded relayers vector + let (mut state, mut strategy) = prepare_strategy(); + at_target_block_2_deliver_messages( + &mut strategy, + &mut state, + max_unrewarded_relayer_entries_at_target, + max_unconfirmed_nonces_at_target - 1, + ); + at_source_block_2_deliver_confirmations(&mut strategy, &mut state); + assert_eq!( + strategy.required_source_header_at_target(state.clone()).await, + Some(header_id(2)) + ); + assert_eq!( + at_target_block_3_select_nonces_to_deliver(&strategy, state).await, + expected_rewards_proof + ); + + // when lane is blocked by no-message-slots in unrewarded relayers vector + let (mut state, mut strategy) = prepare_strategy(); + at_target_block_2_deliver_messages( + &mut strategy, + &mut state, + max_unrewarded_relayer_entries_at_target - 1, + max_unconfirmed_nonces_at_target, + ); + at_source_block_2_deliver_confirmations(&mut strategy, &mut state); + assert_eq!( + strategy.required_source_header_at_target(state.clone()).await, + Some(header_id(2)) + ); + assert_eq!( + at_target_block_3_select_nonces_to_deliver(&strategy, state).await, + expected_rewards_proof + ); + + // when lane is blocked by no-message-slots and no-message-slots in unrewarded relayers + // vector + let (mut state, mut strategy) = prepare_strategy(); + at_target_block_2_deliver_messages( + &mut strategy, + &mut state, + max_unrewarded_relayer_entries_at_target - 1, + max_unconfirmed_nonces_at_target, + ); + at_source_block_2_deliver_confirmations(&mut strategy, &mut state); + assert_eq!( + strategy.required_source_header_at_target(state.clone()).await, + Some(header_id(2)) + ); + assert_eq!( + at_target_block_3_select_nonces_to_deliver(&strategy, state).await, + expected_rewards_proof + ); + } } diff --git a/bridges/relays/messages/src/message_race_loop.rs b/bridges/relays/messages/src/message_race_loop.rs index 7e3f84dd5d119..be7d5b4675659 100644 --- a/bridges/relays/messages/src/message_race_loop.rs +++ b/bridges/relays/messages/src/message_race_loop.rs @@ -41,14 +41,14 @@ use std::{ /// One of races within lane. pub trait MessageRace { /// Header id of the race source. - type SourceHeaderId: Debug + Clone + PartialEq + Send; + type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync; /// Header id of the race source. - type TargetHeaderId: Debug + Clone + PartialEq + Send; + type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync; /// Message nonce used in the race. type MessageNonce: Debug + Clone; /// Proof that is generated and delivered in this race. - type Proof: Debug + Clone + Send; + type Proof: Debug + Clone + Send + Sync; /// Name of the race source. fn source_name() -> String; @@ -175,9 +175,8 @@ pub trait RaceStrategy: Debug { /// Should return true if nothing has to be synced. fn is_empty(&self) -> bool; /// Return id of source header that is required to be on target to continue synchronization. - fn required_source_header_at_target>( + async fn required_source_header_at_target>( &self, - current_best: &SourceHeaderId, race_state: RS, ) -> Option; /// Return the best nonce at source node. @@ -218,7 +217,11 @@ pub trait RaceStrategy: Debug { } /// State of the race. -pub trait RaceState: Send { +pub trait RaceState: Clone + Send + Sync { + /// Set best finalized source header id at the best block on the target + /// client (at the `best_finalized_source_header_id_at_best_target`). + fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId); + /// Best finalized source header id at the source client. fn best_finalized_source_header_id_at_source(&self) -> Option; /// Best finalized source header id at the best block on the target @@ -281,11 +284,15 @@ impl Default impl RaceState for RaceStateImpl where - SourceHeaderId: Clone + Send, - TargetHeaderId: Clone + Send, - Proof: Clone + Send, - BatchTx: Clone + Send, + SourceHeaderId: Clone + Send + Sync, + TargetHeaderId: Clone + Send + Sync, + Proof: Clone + Send + Sync, + BatchTx: Clone + Send + Sync, { + fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) { + self.best_finalized_source_header_id_at_best_target = Some(id); + } + fn best_finalized_source_header_id_at_source(&self) -> Option { self.best_finalized_source_header_id_at_source.clone() } @@ -430,10 +437,9 @@ pub async fn run, TC: TargetClient

>( ).fail_if_connection_error(FailedClient::Source)?; // ask for more headers if we have nonces to deliver and required headers are missing - source_required_header = race_state - .best_finalized_source_header_id_at_best_target - .as_ref() - .and_then(|best| strategy.required_source_header_at_target(best, race_state.clone())); + source_required_header = strategy + .required_source_header_at_target(race_state.clone()) + .await; }, nonces = target_best_nonces => { target_best_nonces_required = false; diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs index e6016448c95cc..718c296391c5c 100644 --- a/bridges/relays/messages/src/message_race_strategy.rs +++ b/bridges/relays/messages/src/message_race_strategy.rs @@ -205,16 +205,16 @@ impl< self.source_queue.is_empty() } - fn required_source_header_at_target< + async fn required_source_header_at_target< RS: RaceState< HeaderId, HeaderId, >, >( &self, - current_best: &HeaderId, - _race_state: RS, + race_state: RS, ) -> Option> { + let current_best = race_state.best_finalized_source_header_id_at_best_target()?; self.source_queue .back() .and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })