Skip to content

Commit

Permalink
Use GAT to define generic MPSC channel context (#2935)
Browse files Browse the repository at this point in the history
* Use explicit injection for runtime error

* Allow Src/Dst chain Error type to be different from Relay::Error

Only remaining fix needed is the same error constraint in BatchContext,
which can be fixed in #2816.

* Initial draft for GAT channels

* Add type alias for batch channel types

* Add new HasBatchChannelTypes trait

* Add HasBatchSender/Receiver traits

* Refactor SendMessagetoBatchWorker

* Refactor batch worker

* Remove old batch traits and types

* Separate OfaRuntimeWrapper impls into separate files

* Implement channel traits for OfaRuntimeWrapper

* Abstract batch components working in relayer framework

* Fix all compile errors

* Remove unused HasBatchChannelTypes trait

* Refactor batch type aliases to be based on Chain and Error

* Reorganize batch module definitions

* Separate Channel and ChannelOnce types

* Put Channel and ChannelOnce into separate modules

* Fix clippy

* Remove From<TokioError> which is no longer needed

* Resolve merge conflicts in soares/gat-batch-context

* Move OfaRuntimeWrapper to types module

* Adding documentation for GAT channel traits

* Fix merge errors
  • Loading branch information
soareschen authored Dec 15, 2022
1 parent 6c9ea21 commit 869278d
Show file tree
Hide file tree
Showing 64 changed files with 956 additions and 554 deletions.
4 changes: 2 additions & 2 deletions crates/relayer-cosmos/src/base/impls/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ibc_relayer::path::PathIdentifiers;
use ibc_relayer_framework::base::one_for_all::traits::chain::{
OfaBaseChain, OfaChainTypes, OfaIbcChain,
};
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;
use ibc_relayer_runtime::tokio::error::Error as TokioError;
use ibc_relayer_types::clients::ics07_tendermint::consensus_state::ConsensusState;
Expand Down Expand Up @@ -106,7 +106,7 @@ where
}
}

fn runtime(&self) -> &OfaRuntimeContext<TokioRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<TokioRuntimeContext> {
&self.runtime
}

Expand Down
4 changes: 2 additions & 2 deletions crates/relayer-cosmos/src/base/impls/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ibc_relayer_framework::base::one_for_all::traits::chain::OfaChainTypes;
use ibc_relayer_framework::base::one_for_all::traits::relay::{OfaBaseRelay, OfaRelayTypes};
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;

use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;

use crate::base::error::Error;

Expand Down Expand Up @@ -99,7 +99,7 @@ where
&packet.timeout_timestamp
}

fn runtime(&self) -> &OfaRuntimeContext<TokioRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<TokioRuntimeContext> {
&self.runtime
}

Expand Down
6 changes: 3 additions & 3 deletions crates/relayer-cosmos/src/base/types/chain.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use alloc::sync::Arc;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

#[derive(Clone)]
pub struct CosmosChainWrapper<Chain> {
pub chain: Arc<Chain>,
pub runtime: OfaRuntimeContext<TokioRuntimeContext>,
pub runtime: OfaRuntimeWrapper<TokioRuntimeContext>,
}

impl<Chain> CosmosChainWrapper<Chain> {
pub fn new(chain: Arc<Chain>, runtime: TokioRuntimeContext) -> Self {
Self {
chain,
runtime: OfaRuntimeContext::new(runtime),
runtime: OfaRuntimeWrapper::new(runtime),
}
}
}
6 changes: 3 additions & 3 deletions crates/relayer-cosmos/src/base/types/relay.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::sync::Arc;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

use crate::base::traits::relay::CosmosRelay;
Expand All @@ -11,7 +11,7 @@ pub struct CosmosRelayWrapper<Relay: CosmosRelay> {
pub relay: Arc<Relay>,
pub src_chain: OfaChainWrapper<CosmosChainWrapper<Relay::SrcChain>>,
pub dst_chain: OfaChainWrapper<CosmosChainWrapper<Relay::DstChain>>,
pub runtime: OfaRuntimeContext<TokioRuntimeContext>,
pub runtime: OfaRuntimeWrapper<TokioRuntimeContext>,
}

impl<Relay: CosmosRelay> CosmosRelayWrapper<Relay> {
Expand All @@ -26,7 +26,7 @@ impl<Relay: CosmosRelay> CosmosRelayWrapper<Relay> {
runtime.clone(),
));

let runtime = OfaRuntimeContext::new(runtime);
let runtime = OfaRuntimeWrapper::new(runtime);

Self {
relay,
Expand Down
12 changes: 0 additions & 12 deletions crates/relayer-cosmos/src/contexts/full/chain.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use ibc_relayer::chain::cosmos::types::config::TxConfig;
use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::keyring::Secp256k1KeyPair;
use ibc_relayer_framework::full::batch::context::new_batch_channel;
use ibc_relayer_framework::full::one_for_all::presets::full::FullPreset;
use ibc_relayer_framework::full::one_for_all::traits::batch::OfaBatchWrapper;
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;
use ibc_relayer_types::signer::Signer;

use crate::base::traits::chain::CosmosChain;
use crate::base::types::chain::CosmosChainWrapper;
use crate::full::traits::chain::CosmosFullChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

#[derive(Clone)]
Expand All @@ -19,7 +15,6 @@ pub struct FullCosmosChainContext<Handle: ChainHandle> {
pub signer: Signer,
pub tx_config: TxConfig,
pub key_entry: Secp256k1KeyPair,
pub batch_channel: CosmosBatchChannel,
pub telemetry: OfaTelemetryWrapper<CosmosTelemetry>,
}

Expand All @@ -31,14 +26,11 @@ impl<Handle: ChainHandle> FullCosmosChainContext<Handle> {
key_entry: Secp256k1KeyPair,
telemetry: CosmosTelemetry,
) -> Self {
let batch_channel = new_batch_channel::<OfaBatchWrapper<CosmosChainWrapper<Self>>>();

let chain = Self {
handle,
signer,
tx_config,
key_entry,
batch_channel,
telemetry: OfaTelemetryWrapper::new(telemetry),
};

Expand Down Expand Up @@ -75,10 +67,6 @@ impl<Handle> CosmosFullChain for FullCosmosChainContext<Handle>
where
Handle: ChainHandle,
{
fn batch_channel(&self) -> &CosmosBatchChannel {
&self.batch_channel
}

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry> {
&self.telemetry
}
Expand Down
44 changes: 40 additions & 4 deletions crates/relayer-cosmos/src/contexts/full/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ use ibc_relayer::config::filter::PacketFilter;
use ibc_relayer::foreign_client::ForeignClient;
use ibc_relayer_framework::base::one_for_all::types::relay::OfaRelayWrapper;
use ibc_relayer_framework::base::relay::traits::target::{DestinationTarget, SourceTarget};
use ibc_relayer_framework::full::batch::config::BatchConfig;
use ibc_relayer_framework::full::batch::spawn::{
BatchMessageWorkerSpawner, CanSpawnBatchMessageWorker,
};
use ibc_relayer_framework::full::batch::impls::spawn::BatchMessageWorkerSpawner;
use ibc_relayer_framework::full::batch::traits::spawn::CanSpawnBatchMessageWorker;
use ibc_relayer_framework::full::batch::types::config::BatchConfig;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;

use crate::base::traits::chain::CosmosChain;
use crate::base::traits::relay::CosmosRelay;
use crate::base::types::relay::CosmosRelayWrapper;
use crate::contexts::full::chain::FullCosmosChainContext;
use crate::full::traits::relay::CosmosFullRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

#[derive(Clone)]
pub struct FullCosmosRelay<SrcChain, DstChain>
Expand All @@ -33,6 +35,10 @@ where
<DstChain as CosmosChain>::ChainHandle,
>,
pub packet_filter: PacketFilter,
pub src_chain_message_batch_sender: CosmosBatchSender,
pub src_chain_message_batch_receiver: CosmosBatchReceiver,
pub dst_chain_message_batch_sender: CosmosBatchSender,
pub dst_chain_message_batch_receiver: CosmosBatchReceiver,
}

impl<SrcChain, DstChain> FullCosmosRelay<SrcChain, DstChain>
Expand All @@ -53,12 +59,26 @@ where
>,
packet_filter: PacketFilter,
) -> Self {
let (src_chain_message_batch_sender, src_chain_message_batch_receiver) =
unbounded_channel();

let (dst_chain_message_batch_sender, dst_chain_message_batch_receiver) =
unbounded_channel();

let relay = Self {
src_chain,
dst_chain,
src_to_dst_client,
dst_to_src_client,
packet_filter,
src_chain_message_batch_sender,
src_chain_message_batch_receiver: Arc::new(Mutex::new(
src_chain_message_batch_receiver,
)),
dst_chain_message_batch_sender,
dst_chain_message_batch_receiver: Arc::new(Mutex::new(
dst_chain_message_batch_receiver,
)),
};

relay
Expand Down Expand Up @@ -152,4 +172,20 @@ where
fn packet_filter(&self) -> &PacketFilter {
&self.packet_filter
}

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender {
&self.src_chain_message_batch_sender
}

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
&self.src_chain_message_batch_receiver
}

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender {
&self.dst_chain_message_batch_sender
}

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
&self.dst_chain_message_batch_receiver
}
}
8 changes: 0 additions & 8 deletions crates/relayer-cosmos/src/full/impls/chain.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
use ibc_relayer_framework::full::one_for_all::traits::chain::OfaFullChain;
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

use crate::base::types::chain::CosmosChainWrapper;
use crate::full::traits::chain::CosmosFullChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

impl<Chain> OfaFullChain for CosmosChainWrapper<Chain>
where
Chain: CosmosFullChain,
{
type BatchContext = TokioRuntimeContext;

type Telemetry = CosmosTelemetry;

fn batch_channel(&self) -> &CosmosBatchChannel {
self.chain.batch_channel()
}

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry> {
self.chain.telemetry()
}
Expand Down
17 changes: 17 additions & 0 deletions crates/relayer-cosmos/src/full/impls/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ibc_relayer_framework::full::one_for_all::traits::relay::OfaFullRelay;

use crate::base::types::relay::CosmosRelayWrapper;
use crate::full::traits::relay::CosmosFullRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

#[async_trait]
impl<Relay> OfaFullRelay for CosmosRelayWrapper<Relay>
Expand All @@ -23,4 +24,20 @@ where
.packet_filter()
.is_allowed(&packet.source_port, &packet.source_channel))
}

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender {
self.relay.src_chain_message_batch_sender()
}

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
self.relay.src_chain_message_batch_receiver()
}

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender {
self.relay.dst_chain_message_batch_sender()
}

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
self.relay.dst_chain_message_batch_receiver()
}
}
3 changes: 0 additions & 3 deletions crates/relayer-cosmos/src/full/traits/chain.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;

use crate::base::traits::chain::CosmosChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

pub trait CosmosFullChain: CosmosChain {
fn batch_channel(&self) -> &CosmosBatchChannel;

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry>;
}
9 changes: 9 additions & 0 deletions crates/relayer-cosmos/src/full/traits/relay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
use ibc_relayer::config::filter::PacketFilter;

use crate::base::traits::relay::CosmosRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

pub trait CosmosFullRelay: CosmosRelay {
fn packet_filter(&self) -> &PacketFilter;

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender;

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver;

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender;

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver;
}
15 changes: 7 additions & 8 deletions crates/relayer-cosmos/src/full/types/batch.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use ibc_relayer_framework::full::batch::context::BatchChannel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tendermint::abci::Event;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender as SenderOnce;
use tokio::sync::Mutex;

use crate::base::error::Error;
use crate::base::types::message::CosmosIbcMessage;

pub type CosmosBatchPayload = (
Vec<CosmosIbcMessage>,
oneshot::Sender<Result<Vec<Vec<Event>>, Error>>,
SenderOnce<Result<Vec<Vec<Event>>, Error>>,
);

pub type CosmosBatchSender = mpsc::UnboundedSender<CosmosBatchPayload>;
pub type CosmosBatchSender = UnboundedSender<CosmosBatchPayload>;

pub type CosmosBatchReceiver = Arc<Mutex<mpsc::UnboundedReceiver<CosmosBatchPayload>>>;

pub type CosmosBatchChannel = BatchChannel<CosmosBatchSender, CosmosBatchReceiver>;
pub type CosmosBatchReceiver = Arc<Mutex<UnboundedReceiver<CosmosBatchPayload>>>;
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ibc_relayer_framework::base::one_for_all::traits::chain::{
};

use ibc_relayer_framework::base::one_for_all::presets::min::MinimalPreset;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::error::Error as TokioError;

impl OfaChainTypes for MockChainContext {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl OfaChainTypes for MockChainContext {

#[async_trait]
impl OfaBaseChain for MockChainContext {
fn runtime(&self) -> &OfaRuntimeContext<MockRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<MockRuntimeContext> {
self.runtime()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::relayer_mock::contexts::relay::MockRelayContext;
use ibc_relayer_framework::base::one_for_all::presets::min::MinimalPreset;
use ibc_relayer_framework::base::one_for_all::traits::chain::OfaChainTypes;
use ibc_relayer_framework::base::one_for_all::traits::relay::{OfaBaseRelay, OfaRelayTypes};
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::error::Error as TokioError;

impl OfaRelayTypes for MockRelayContext {
Expand Down Expand Up @@ -82,7 +82,7 @@ impl OfaBaseRelay for MockRelayContext {
&packet.timeout_timestamp
}

fn runtime(&self) -> &OfaRuntimeContext<Self::Runtime> {
fn runtime(&self) -> &OfaRuntimeWrapper<Self::Runtime> {
&self.runtime
}

Expand Down
Loading

0 comments on commit 869278d

Please sign in to comment.