Skip to content

Commit

Permalink
Support for incremental packet clearing (#2167)
Browse files Browse the repository at this point in the history
* Added docs for unreceived_acknowledgements and unreceived_packets

* Flipped order of sentences

* First iteration for #2087; packet-recv works, packet-ack left

* Incremental processing of packet-ack relaying

* Added consensus_state_height_bounded with optimistic path

* Nits

* Simplified progress tracking

* Comment cleanup

* fmt and better logs

* Document the alternative path in consensus_state_height_bounded

* renamed method to make it more explicit h/t SeanC

* better var name

* Added workaround to tonic::code inconsistencies.

Ref:
#1971 (comment)

* refactored the packet query methods

* Refactored CLIs for incremental relaying

* Experiment

* Fix for packet send for non-zero delay

* Using app status instead of network status

* fmt

* Undoing unnecessary chenges on foreign_client.rs

* Removed outdated comments

* Apply suggestions from code review

Co-authored-by: Anca Zamfir <[email protected]>

* Small refactor in cli.rs

* Address review comments

* Remove duplicate code

* Undo one-chain change

* Fix documentation

* Changelog

Co-authored-by: Anca Zamfir <[email protected]>
Co-authored-by: Anca Zamfir <[email protected]>
  • Loading branch information
3 people authored May 13, 2022
1 parent 3aed2d9 commit 39606d2
Show file tree
Hide file tree
Showing 21 changed files with 475 additions and 328 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Added support for incremental processing of packet clearing commands.
([#2087](https://github.com/informalsystems/ibc-rs/issues/2087))
2 changes: 1 addition & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ enabled = true

# Parametrize the periodic packet clearing feature.
# Interval (in number of blocks) at which pending packets
# should be eagerly cleared. A value of '0' will disable
# should be periodically cleared. A value of '0' will disable
# periodic packet clearing. [Default: 100]
clear_interval = 100

Expand Down
8 changes: 7 additions & 1 deletion modules/src/core/ics02_client/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl core::fmt::Display for CreateClient {
}

/// UpdateClient event signals a recent update of an on-chain client (IBC Client).
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct UpdateClient {
pub common: Attributes,
pub header: Option<AnyHeader>,
Expand Down Expand Up @@ -289,6 +289,12 @@ impl core::fmt::Display for UpdateClient {
}
}

impl core::fmt::Debug for UpdateClient {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{}", self.common)
}
}

/// ClientMisbehaviour event signals the update of an on-chain client (IBC Client) with evidence of
/// misbehaviour.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
Expand Down
28 changes: 25 additions & 3 deletions modules/src/core/ics04_channel/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ impl_try_from_raw_obj_for_event!(
CloseConfirm
);

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct SendPacket {
pub height: Height,
pub packet: Packet,
Expand Down Expand Up @@ -811,6 +811,12 @@ impl core::fmt::Display for SendPacket {
}
}

impl core::fmt::Debug for SendPacket {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "SendPacket - h:{}, {}", self.height, self.packet)
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct ReceivePacket {
pub height: Height,
Expand Down Expand Up @@ -850,7 +856,7 @@ impl core::fmt::Display for ReceivePacket {
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct WriteAcknowledgement {
pub height: Height,
pub packet: Packet,
Expand Down Expand Up @@ -915,7 +921,17 @@ impl core::fmt::Display for WriteAcknowledgement {
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
impl core::fmt::Debug for WriteAcknowledgement {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(
f,
"WriteAcknowledgement - h:{}, {}",
self.height, self.packet
)
}
}

#[derive(Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct AcknowledgePacket {
pub height: Height,
pub packet: Packet,
Expand Down Expand Up @@ -960,6 +976,12 @@ impl core::fmt::Display for AcknowledgePacket {
}
}

impl core::fmt::Debug for AcknowledgePacket {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "AcknowledgePacket - h:{}, {}", self.height, self.packet)
}
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
pub struct TimeoutPacket {
pub height: Height,
Expand Down
8 changes: 8 additions & 0 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ pub mod cosmos {
..Default::default()
})
}

pub fn latest_limited(limit: u64) -> Option<PageRequest> {
Some(PageRequest {
limit,
reverse: true,
..Default::default()
})
}
}
}
pub mod reflection {
Expand Down
12 changes: 4 additions & 8 deletions relayer-cli/src/commands/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,15 @@ impl Runnable for ClearPacketsCmd {
// Schedule RecvPacket messages for pending packets in both directions.
// This may produce pending acks which will be processed in the next phase.
run_and_collect_events(&mut ev_list, || {
fwd_link.build_and_send_recv_packet_messages()
fwd_link.relay_recv_packet_and_timeout_messages()
});
run_and_collect_events(&mut ev_list, || {
rev_link.build_and_send_recv_packet_messages()
rev_link.relay_recv_packet_and_timeout_messages()
});

// Schedule AckPacket messages in both directions.
run_and_collect_events(&mut ev_list, || {
fwd_link.build_and_send_ack_packet_messages()
});
run_and_collect_events(&mut ev_list, || {
rev_link.build_and_send_ack_packet_messages()
});
run_and_collect_events(&mut ev_list, || fwd_link.relay_ack_packet_messages());
run_and_collect_events(&mut ev_list, || rev_link.relay_ack_packet_messages());

Output::success(ev_list).exit()
}
Expand Down
3 changes: 2 additions & 1 deletion relayer-cli/src/commands/query/packet/unreceived_acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ impl QueryUnreceivedAcknowledgementCmd {
self.chain_id, chan_conn_cli.channel,
);

unreceived_acknowledgements(&chains.src, &chains.dst, &chan_conn_cli.channel)
unreceived_acknowledgements(&chains.src, &chains.dst, &(&chan_conn_cli.channel).into())
.map(|(sns, _)| sns)
.map_err(Error::supervisor)
}
}
Expand Down
3 changes: 2 additions & 1 deletion relayer-cli/src/commands/query/packet/unreceived_packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ impl QueryUnreceivedPacketsCmd {
self.chain_id, chan_conn_cli.channel
);

unreceived_packets(&chains.src, &chains.dst, &chan_conn_cli.channel)
unreceived_packets(&chains.src, &chains.dst, &(&chan_conn_cli.channel).into())
.map_err(Error::supervisor)
.map(|(seq, _)| seq)
}
}

Expand Down
7 changes: 3 additions & 4 deletions relayer-cli/src/commands/tx/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Runnable for TxRawPacketRecvCmd {
};

let res: Result<Vec<IbcEvent>, Error> = link
.build_and_send_recv_packet_messages()
.relay_recv_packet_and_timeout_messages()
.map_err(Error::link);

match res {
Expand Down Expand Up @@ -87,9 +87,8 @@ impl Runnable for TxRawPacketAckCmd {
Err(e) => Output::error(format!("{}", e)).exit(),
};

let res: Result<Vec<IbcEvent>, Error> = link
.build_and_send_ack_packet_messages()
.map_err(Error::link);
let res: Result<Vec<IbcEvent>, Error> =
link.relay_ack_packet_messages().map_err(Error::link);

match res {
Ok(ev) => Output::success(ev).exit(),
Expand Down
1 change: 0 additions & 1 deletion relayer-cli/src/conclude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ impl fmt::Display for Result {
/// which typically signals a success (UNIX process return code `0`) or an error (code `1`). An
/// optional `result` can be added to an output.
///
#[derive(Debug)]
pub struct Output {
/// The return status
pub status: Status,
Expand Down
106 changes: 48 additions & 58 deletions relayer/src/chain/counterparty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use tracing::{error, trace};

use crate::channel::ChannelError;
use crate::path::PathIdentifiers;
use crate::supervisor::Error;
use ibc::{
core::{
Expand Down Expand Up @@ -399,52 +400,45 @@ pub fn unreceived_acknowledgements_sequences(
.map_err(Error::relayer)
}

/// This method returns a vector of sequence numbers for all the packets
/// which the counterparty chain _sent_ on the given channel and which the
/// (target) chain did not yet _receive_.
/// Expects an [`IdentifiedChannelEnd`] plus a pair of
/// [`ChainHandle`]s representing the chains at the two ends of this
/// channel, called a (target) chain and a counterparty chain.
/// Given a channel, this method returns:
/// - The sequences of the packets _sent_ on the counterparty chain and not _received_ by
/// the (target) chain.
/// - The counterparty height at which the query was made.
///
/// Expects an [`IdentifiedChannelEnd`] and a pair of [`ChainHandle`]s representing the chains
/// at the two ends of this channel, called a (target) chain and a counterparty chain.
///
/// ### Implementation details
/// This method involves two separate queries:
///
/// 1. It performs a [`QueryPacketCommitmentsRequest`] on the counterparty chain.
/// This query returns the vector of all sequence numbers for those packets that have
/// commitments written on-chain in the counterparty chain's state.
/// This query returns the sequences for the packets with stored
/// commitments in the counterparty chain's state, and the height at which the query was made
///
/// This step relies on [`commitments_on_chain`], see that method for more details.
///
/// 2. It performs a [`QueryUnreceivedPacketsRequest`] on the (target) chain.
/// Given the sequence numbers of packet commitments on the counterparty (query #1),
/// this query returns a subset of these sequence numbers, all of which the target
/// Given the sequences of packet commitments on the counterparty (query #1),
/// this query returns the sequences of the packets which the target
/// chain has not yet _received_.
///
/// This step relies on [`unreceived_packets_sequences`], see that method for more details.
///
pub fn unreceived_packets(
chain: &impl ChainHandle,
counterparty_chain: &impl ChainHandle,
channel: &IdentifiedChannelEnd,
) -> Result<Vec<u64>, Error> {
let counterparty = channel.channel_end.counterparty();
let counterparty_channel_id = counterparty
.channel_id
.as_ref()
.ok_or_else(Error::missing_counterparty_channel_id)?;

let (commit_sequences, _) = commitments_on_chain(
path: &PathIdentifiers,
) -> Result<(Vec<u64>, Height), Error> {
let (commit_sequences, h) = commitments_on_chain(
counterparty_chain,
&counterparty.port_id,
counterparty_channel_id,
&path.counterparty_port_id,
&path.counterparty_channel_id,
)?;

unreceived_packets_sequences(
chain,
&channel.port_id,
&channel.channel_id,
commit_sequences,
)
let packet_seq_nrs =
unreceived_packets_sequences(chain, &path.port_id, &path.channel_id, commit_sequences)?;

Ok((packet_seq_nrs, h))
}

pub fn acknowledgements_on_chain(
Expand Down Expand Up @@ -474,61 +468,57 @@ pub fn acknowledgements_on_chain(
Ok((sequences, height))
}

/// This method returns a vector of sequence numbers for those packets
/// which the counterparty chain _received_ on the given channel and which the
/// (target) chain did not yet _acknowledge_.
/// Expects an [`IdentifiedChannelEnd`] plus a pair of
/// [`ChainHandle`]s representing the chains at the two ends of this
/// channel, called a (target) chain and a counterparty chain.
/// Given a channel, this method returns:
/// - The sequences of all packets _received on the counterparty chain and not _acknowledged_ by
/// the (target) chain.
/// - The counterparty height at which the query was made.
///
/// Expects an [`IdentifiedChannelEnd`] and a pair of [`ChainHandle`]s representing the chains
/// at the two ends of this channel, called a (target) chain and a counterparty chain.
///
/// ### Implementation details
/// This method involves two separate queries:
///
/// 1. It performs a [`QueryPacketCommitmentsRequest`] on the target chain.
/// This query returns the vector of all sequence numbers for those packets that have
/// commitments written on-chain in the (target) chain's state.
/// This query returns the sequences for the packets with stored
/// commitments in the target chain's state, and the height at which the query was made
///
/// This step relies on [`commitments_on_chain`], see that method for more details.
///
/// 2. It performs a [`QueryPacketAcknowledgementsRequest`] on the counterparty chain.
/// Given the sequence numbers of packet commitments on the target chain (from step #1),
/// this query returns a subset of these sequence numbers, all of which the counterparty
/// chain has _received_ (i.e., it stores acknowledgments for these sequence numbers).
/// Given the sequences of packet commitments on the target chain (query #1),
/// this query returns the sequences of the packets which the counterparty chain has
/// _acknowledged_.
///
/// This step relies on [`packet_acknowledgements`], see that method for more details.
///
/// 3. It performs a [`QueryUnreceivedAcksRequest`] on the target chain.
/// Given the sequence numbers of packet acknowledgements on the counterparty (step #3),
/// this query fetches the subset among these acknowledgements which have not been
/// relayed yet to the target chain.
/// Given the sequences of packet acknowledgements on the counterparty (step #2),
/// this query fetches the subset for which acknowledgements have not been
/// received by the target chain.
/// This step relies on [`unreceived_acknowledgements_sequences`].
pub fn unreceived_acknowledgements(
chain: &impl ChainHandle,
counterparty_chain: &impl ChainHandle,
channel: &IdentifiedChannelEnd,
) -> Result<Vec<u64>, Error> {
let counterparty = channel.channel_end.counterparty();
let counterparty_channel_id = counterparty
.channel_id
.as_ref()
.ok_or_else(Error::missing_counterparty_channel_id)?;

let (commitments_on_src, _) =
commitments_on_chain(chain, &channel.port_id, &channel.channel_id)?;
path: &PathIdentifiers,
) -> Result<(Vec<u64>, Height), Error> {
let (commitments_on_src, _) = commitments_on_chain(chain, &path.port_id, &path.channel_id)?;

let (acks_on_counterparty, _) = packet_acknowledgements(
let (acks_on_counterparty, src_response_height) = packet_acknowledgements(
counterparty_chain,
&counterparty.port_id,
counterparty_channel_id,
&path.counterparty_port_id,
&path.counterparty_channel_id,
commitments_on_src,
)?;

unreceived_acknowledgements_sequences(
let sns = unreceived_acknowledgements_sequences(
chain,
&channel.port_id,
&channel.channel_id,
&path.port_id,
&path.channel_id,
acks_on_counterparty,
)
)?;

Ok((sns, src_response_height))
}

/// A structure to display pending packet commitment IDs
Expand Down
1 change: 1 addition & 0 deletions relayer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod light_client;
pub mod link;
pub mod macros;
pub mod object;
pub mod path;
pub mod registry;
pub mod rest;
pub mod sdk_error;
Expand Down
2 changes: 2 additions & 0 deletions relayer/src/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ pub mod cli;
pub mod error;
pub mod operational_data;

mod packet_events;
mod pending;
mod relay_path;
mod relay_sender;
mod relay_summary;
mod tx_hashes;

use tx_hashes::TxHashes;

// Re-export the telemetries summary
Expand Down
Loading

0 comments on commit 39606d2

Please sign in to comment.