diff --git a/relays/messages/src/message_race_delivery.rs b/relays/messages/src/message_race_delivery.rs index d25a2413b0db..b50e6c0841ea 100644 --- a/relays/messages/src/message_race_delivery.rs +++ b/relays/messages/src/message_race_delivery.rs @@ -255,7 +255,7 @@ struct MessageDeliveryStrategy { /// Latest confirmed nonces at the source client + the header id where we have first met this /// nonce. latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf

, MessageNonce)>, - /// Target nonces from the source client. + /// Target nonces available at the **best** block of the target chain. target_nonces: Option>, /// Basic delivery strategy. strategy: MessageDeliveryStrategyBase

, @@ -387,13 +387,11 @@ where race_state: &mut RaceState, TargetHeaderIdOf

, P::MessagesProof>, ) { // best target nonces must always be ge than finalized target nonces - let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone()); - target_nonces.nonces_data = nonces.nonces_data.clone(); - target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce); - self.target_nonces = Some(target_nonces); + let latest_nonce = nonces.latest_nonce; + self.target_nonces = Some(nonces); self.strategy.best_target_nonces_updated( - TargetClientNonces { latest_nonce: nonces.latest_nonce, nonces_data: () }, + TargetClientNonces { latest_nonce, nonces_data: () }, race_state, ) } @@ -432,6 +430,7 @@ where &self, race_state: RaceState, TargetHeaderIdOf

, P::MessagesProof>, ) -> Option<(RangeInclusive, Self::ProofParameters)> { + let best_target_nonce = self.strategy.best_at_target()?; let best_finalized_source_header_id_at_best_target = race_state.best_finalized_source_header_id_at_best_target.clone()?; let latest_confirmed_nonce_at_source = self @@ -439,7 +438,8 @@ where .iter() .take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0) .last() - .map(|(_, nonce)| *nonce)?; + .map(|(_, nonce)| *nonce) + .unwrap_or(best_target_nonce); let target_nonces = self.target_nonces.as_ref()?; // There's additional condition in the message delivery race: target would reject messages @@ -529,8 +529,8 @@ where let lane_source_client = self.lane_source_client.clone(); let lane_target_client = self.lane_target_client.clone(); - let maximal_source_queue_index = - self.strategy.maximal_available_source_queue_index(race_state)?; + let available_source_queue_indices = + self.strategy.available_source_queue_indices(race_state)?; let source_queue = self.strategy.source_queue(); let reference = RelayMessagesBatchReference { @@ -539,15 +539,13 @@ where max_messages_size_in_single_batch, lane_source_client: lane_source_client.clone(), lane_target_client: lane_target_client.clone(), + best_target_nonce, nonces_queue: source_queue.clone(), - nonces_queue_range: 0..maximal_source_queue_index + 1, + nonces_queue_range: available_source_queue_indices, metrics: self.metrics_msg.clone(), }; - let range_end = MessageRaceLimits::decide(reference).await?; - - let range_begin = source_queue[0].1.begin(); - let selected_nonces = range_begin..=range_end; + let selected_nonces = MessageRaceLimits::decide(reference).await?; let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces); Some(( @@ -1057,4 +1055,88 @@ mod tests { ); assert_eq!(strategy.required_source_header_at_target(&source_header_id), None); } + + #[async_std::test] + async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() { + // this is the copy of the similar test in the `mesage_race_strategy.rs`, but it also tests + // that the `MessageDeliveryStrategy` acts properly in the similar scenario + + // tune parameters to allow 5 nonces per delivery transaction + let (mut state, mut strategy) = prepare_strategy(); + strategy.max_unrewarded_relayer_entries_at_target = 5; + strategy.max_unconfirmed_nonces_at_target = 5; + strategy.max_messages_in_single_batch = 5; + strategy.max_messages_weight_in_single_batch = Weight::from_parts(5, 0); + strategy.max_messages_size_in_single_batch = 5; + + // in this state we have 4 available nonces for delivery + assert_eq!( + strategy.select_nonces_to_deliver(state.clone()).await, + Some(( + 20..=23, + MessageProofParameters { + outbound_state_proof_required: false, + dispatch_weight: Weight::from_parts(4, 0), + } + )), + ); + + // let's say we have submitted 20..=23 + state.nonces_submitted = Some(20..=23); + + // then new nonce 24 appear at the source block 2 + let new_nonce_24 = vec![( + 24, + MessageDetails { dispatch_weight: Weight::from_parts(1, 0), size: 0, reward: 0 }, + )] + .into_iter() + .collect(); + let source_header_2 = header_id(2); + state.best_finalized_source_header_id_at_source = Some(source_header_2); + strategy.source_nonces_updated( + source_header_2, + SourceClientNonces { new_nonces: new_nonce_24, confirmed_nonce: None }, + ); + // and nonce 23 appear at the best block of the target node (best finalized still has 0 + // nonces) + let target_nonces_data = DeliveryRaceTargetNoncesData { + confirmed_nonce: 19, + unrewarded_relayers: UnrewardedRelayersState::default(), + }; + let target_header_2 = header_id(2); + state.best_target_header_id = Some(target_header_2); + strategy.best_target_nonces_updated( + TargetClientNonces { latest_nonce: 23, nonces_data: target_nonces_data.clone() }, + &mut state, + ); + + // then best target header is retracted + strategy.best_target_nonces_updated( + TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() }, + &mut state, + ); + + // ... and some fork with 19 delivered nonces is finalized + let target_header_2_fork = header_id(2_1); + state.best_finalized_source_header_id_at_source = Some(source_header_2); + state.best_finalized_source_header_id_at_best_target = Some(source_header_2); + state.best_target_header_id = Some(target_header_2_fork); + state.best_finalized_target_header_id = Some(target_header_2_fork); + strategy.finalized_target_nonces_updated( + TargetClientNonces { latest_nonce: 19, nonces_data: target_nonces_data.clone() }, + &mut state, + ); + + // now we have to select nonces 20..=23 for delivery again + assert_eq!( + strategy.select_nonces_to_deliver(state.clone()).await, + Some(( + 20..=24, + MessageProofParameters { + outbound_state_proof_required: false, + dispatch_weight: Weight::from_parts(5, 0), + } + )), + ); + } } diff --git a/relays/messages/src/message_race_limits.rs b/relays/messages/src/message_race_limits.rs index a28d9ba63da9..873bb6aad042 100644 --- a/relays/messages/src/message_race_limits.rs +++ b/relays/messages/src/message_race_limits.rs @@ -17,7 +17,7 @@ //! enforcement strategy use num_traits::Zero; -use std::ops::Range; +use std::ops::RangeInclusive; use bp_messages::{MessageNonce, Weight}; @@ -76,14 +76,17 @@ pub struct RelayMessagesBatchReference< pub lane_target_client: TargetClient, /// Metrics reference. pub metrics: Option, + /// Best available nonce at the **best** target block. We do not want to deliver nonces + /// less than this nonce, even though the block may be retracted. + pub best_target_nonce: MessageNonce, /// Source queue. pub nonces_queue: SourceRangesQueue< P::SourceHeaderHash, P::SourceHeaderNumber, MessageDetailsMap, >, - /// Source queue range - pub nonces_queue_range: Range, + /// Range of indices within the `nonces_queue` that are available for selection. + pub nonces_queue_range: RangeInclusive, } /// Limits of the message race transactions. @@ -97,14 +100,16 @@ impl MessageRaceLimits { TargetClient: MessageLaneTargetClient

, >( reference: RelayMessagesBatchReference, - ) -> Option { + ) -> Option> { let mut hard_selected_count = 0; let mut selected_weight = Weight::zero(); let mut selected_count: MessageNonce = 0; - let hard_selected_begin_nonce = - reference.nonces_queue[reference.nonces_queue_range.start].1.begin(); + let hard_selected_begin_nonce = std::cmp::max( + reference.best_target_nonce + 1, + reference.nonces_queue[*reference.nonces_queue_range.start()].1.begin(), + ); // relay reference let mut relay_reference = RelayReference { @@ -129,6 +134,7 @@ impl MessageRaceLimits { .nonces_queue .range(reference.nonces_queue_range.clone()) .flat_map(|(_, ready_nonces)| ready_nonces.iter()) + .filter(|(nonce, _)| **nonce >= hard_selected_begin_nonce) .enumerate(); for (index, (nonce, details)) in all_ready_nonces { relay_reference.index = index; @@ -192,7 +198,7 @@ impl MessageRaceLimits { if hard_selected_count != 0 { let selected_max_nonce = hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1; - Some(selected_max_nonce) + Some(hard_selected_begin_nonce..=selected_max_nonce) } else { None } diff --git a/relays/messages/src/message_race_strategy.rs b/relays/messages/src/message_race_strategy.rs index 9a53a487d94f..479ffe513290 100644 --- a/relays/messages/src/message_race_strategy.rs +++ b/relays/messages/src/message_race_strategy.rs @@ -41,8 +41,11 @@ pub struct BasicStrategy< Proof, > { /// All queued nonces. + /// + /// The queue may contain already delivered nonces. We only remove entries from this + /// queue after corresponding nonces are finalized by the target chain. source_queue: SourceRangesQueue, - /// The best nonce known to target node (at its best block). `None` if it has not been received + /// The best nonce known to target node at its best block. `None` if it has not been received /// yet. best_target_nonce: Option, /// Unused generic types dump. @@ -93,21 +96,26 @@ impl< &mut self.source_queue } - /// Returns index of the latest source queue entry, that may be delivered to the target node. + /// Returns indices of source queue entries, which may be delivered to the target node. + /// + /// The function may skip some nonces from the queue front if nonces from this entry are + /// already available at the **best** target block. After this block is finalized, the entry + /// will be removed from the queue. /// - /// Returns `None` if no entries may be delivered. All entries before and including the - /// `Some(_)` index are guaranteed to be witnessed at source blocks that are known to be - /// finalized at the target node. - pub fn maximal_available_source_queue_index( + /// All entries before and including the range end index, are guaranteed to be witnessed + /// at source blocks that are known to be finalized at the target node. + /// + /// Returns `None` if no entries may be delivered. + pub fn available_source_queue_indices( &self, race_state: RaceState< HeaderId, HeaderId, Proof, >, - ) -> Option { + ) -> Option> { // if we do not know best nonce at target node, we can't select anything - let _ = self.best_target_nonce?; + let best_target_nonce = self.best_target_nonce?; // if we have already selected nonces that we want to submit, do nothing if race_state.nonces_to_submit.is_some() { @@ -119,6 +127,15 @@ impl< return None } + // find first entry that may be delivered to the target node + let begin_index = self + .source_queue + .iter() + .enumerate() + .skip_while(|(_, (_, nonces))| nonces.end() <= best_target_nonce) + .map(|(index, _)| index) + .next()?; + // 1) we want to deliver all nonces, starting from `target_nonce + 1` // 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized // by target client @@ -127,12 +144,16 @@ impl< // => let's first select range of entries inside deque that are already finalized at // the target client and pass this range to the selector let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target?; - self.source_queue + let end_index = self + .source_queue .iter() .enumerate() + .skip(begin_index) .take_while(|(_, (queued_at, _))| queued_at.0 <= best_header_at_target.0) .map(|(index, _)| index) - .last() + .last()?; + + Some(begin_index..=end_index) } /// Remove all nonces that are less than or equal to given nonce from the source queue. @@ -237,22 +258,6 @@ impl< ) { let nonce = nonces.latest_nonce; - if let Some(best_target_nonce) = self.best_target_nonce { - if nonce < best_target_nonce { - return - } - } - - while let Some(true) = self.source_queue.front().map(|(_, range)| range.begin() <= nonce) { - let maybe_subrange = self.source_queue.pop_front().and_then(|(at_block, range)| { - range.greater_than(nonce).map(|subrange| (at_block, subrange)) - }); - if let Some((at_block, subrange)) = maybe_subrange { - self.source_queue.push_front((at_block, subrange)); - break - } - } - let need_to_select_new_nonces = race_state .nonces_to_submit .as_ref() @@ -271,8 +276,7 @@ impl< race_state.nonces_submitted = None; } - self.best_target_nonce = - Some(std::cmp::max(self.best_target_nonce.unwrap_or(nonces.latest_nonce), nonce)); + self.best_target_nonce = Some(nonce); } fn finalized_target_nonces_updated( @@ -284,7 +288,7 @@ impl< Proof, >, ) { - self.remove_le_nonces_from_source_queue(nonces.latest_nonce); // TODO: does it means that we'll try to submit old nonces in next tx??? + self.remove_le_nonces_from_source_queue(nonces.latest_nonce); self.best_target_nonce = Some(std::cmp::max( self.best_target_nonce.unwrap_or(nonces.latest_nonce), nonces.latest_nonce, @@ -299,9 +303,12 @@ impl< Proof, >, ) -> Option<(RangeInclusive, Self::ProofParameters)> { - let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?; - let range_begin = self.source_queue[0].1.begin(); - let range_end = self.source_queue[maximal_source_queue_index].1.end(); + let available_indices = self.available_source_queue_indices(race_state)?; + let range_begin = std::cmp::max( + self.best_target_nonce? + 1, + self.source_queue[*available_indices.start()].1.begin(), + ); + let range_end = self.source_queue[*available_indices.end()].1.end(); Some((range_begin..=range_end, ())) } } @@ -351,7 +358,7 @@ mod tests { strategy.source_nonces_updated(header_id(1), source_nonces(1..=5)); assert_eq!(strategy.best_at_source(), None); strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default()); - assert_eq!(strategy.source_queue, vec![]); + assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]); assert_eq!(strategy.best_at_source(), Some(10)); } @@ -372,16 +379,6 @@ mod tests { assert_eq!(strategy.source_queue, vec![(header_id(1), 1..=5)]); } - #[test] - fn target_nonce_is_never_lower_than_latest_known_target_nonce() { - let mut strategy = BasicStrategy::::new(); - assert_eq!(strategy.best_target_nonce, None); - strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default()); - assert_eq!(strategy.best_target_nonce, Some(10)); - strategy.best_target_nonces_updated(target_nonces(5), &mut Default::default()); - assert_eq!(strategy.best_target_nonce, Some(10)); - } - #[test] fn updated_target_nonce_removes_queued_entries() { let mut strategy = BasicStrategy::::new(); @@ -389,9 +386,9 @@ mod tests { strategy.source_nonces_updated(header_id(2), source_nonces(6..=10)); strategy.source_nonces_updated(header_id(3), source_nonces(11..=15)); strategy.source_nonces_updated(header_id(4), source_nonces(16..=20)); - strategy.best_target_nonces_updated(target_nonces(15), &mut Default::default()); + strategy.finalized_target_nonces_updated(target_nonces(15), &mut Default::default()); assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]); - strategy.best_target_nonces_updated(target_nonces(17), &mut Default::default()); + strategy.finalized_target_nonces_updated(target_nonces(17), &mut Default::default()); assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]); } @@ -459,7 +456,7 @@ mod tests { } #[test] - fn maximal_available_source_queue_index_works() { + fn available_source_queue_indices_works() { let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut strategy = BasicStrategy::::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); @@ -468,19 +465,19 @@ mod tests { strategy.source_nonces_updated(header_id(3), source_nonces(7..=9)); state.best_finalized_source_header_id_at_best_target = Some(header_id(0)); - assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), None); + assert_eq!(strategy.available_source_queue_indices(state.clone()), None); state.best_finalized_source_header_id_at_best_target = Some(header_id(1)); - assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(0)); + assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=0)); state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); - assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(1)); + assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=1)); state.best_finalized_source_header_id_at_best_target = Some(header_id(3)); - assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(2)); + assert_eq!(strategy.available_source_queue_indices(state.clone()), Some(0..=2)); state.best_finalized_source_header_id_at_best_target = Some(header_id(4)); - assert_eq!(strategy.maximal_available_source_queue_index(state), Some(2)); + assert_eq!(strategy.available_source_queue_indices(state), Some(0..=2)); } #[test] @@ -514,4 +511,67 @@ mod tests { strategy.remove_le_nonces_from_source_queue(100); assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::::new(),); } + + #[async_std::test] + async fn previous_nonces_are_selected_if_reorg_happens_at_target_chain() { + let source_header_1 = header_id(1); + let target_header_1 = header_id(1); + + // we start in perfec sync state - all headers are synced and finalized on both ends + let mut state = RaceState::<_, _, TestMessagesProof> { + best_finalized_source_header_id_at_source: Some(source_header_1), + best_finalized_source_header_id_at_best_target: Some(source_header_1), + best_target_header_id: Some(target_header_1), + best_finalized_target_header_id: Some(target_header_1), + nonces_to_submit: None, + nonces_submitted: None, + }; + + // in this state we have 1 available nonce for delivery + let mut strategy = BasicStrategy:: { + source_queue: vec![(header_id(1), 1..=1)].into_iter().collect(), + best_target_nonce: Some(0), + _phantom: PhantomData, + }; + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=1, ())),); + + // let's say we have submitted 1..=1 + state.nonces_submitted = Some(1..=1); + + // then new nonce 2 appear at the source block 2 + let source_header_2 = header_id(2); + state.best_finalized_source_header_id_at_source = Some(source_header_2); + strategy.source_nonces_updated( + source_header_2, + SourceClientNonces { new_nonces: 2..=2, confirmed_nonce: None }, + ); + // and nonce 1 appear at the best block of the target node (best finalized still has 0 + // nonces) + let target_header_2 = header_id(2); + state.best_target_header_id = Some(target_header_2); + strategy.best_target_nonces_updated( + TargetClientNonces { latest_nonce: 1, nonces_data: () }, + &mut state, + ); + + // then best target header is retracted + strategy.best_target_nonces_updated( + TargetClientNonces { latest_nonce: 0, nonces_data: () }, + &mut state, + ); + + // ... and some fork with zero delivered nonces is finalized + let target_header_2_fork = header_id(2_1); + state.best_finalized_source_header_id_at_source = Some(source_header_2); + state.best_finalized_source_header_id_at_best_target = Some(source_header_2); + state.best_target_header_id = Some(target_header_2_fork); + state.best_finalized_target_header_id = Some(target_header_2_fork); + strategy.finalized_target_nonces_updated( + TargetClientNonces { latest_nonce: 0, nonces_data: () }, + &mut state, + ); + + // now we have to select nonce 1 for delivery again + assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, Some((1..=2, ())),); + } }