Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GAT to define generic MPSC channel context #2935

Merged
merged 32 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2a26642
Use explicit injection for runtime error
soareschen Dec 8, 2022
6966741
Allow Src/Dst chain Error type to be different from Relay::Error
soareschen Dec 8, 2022
592c9bc
Initial draft for GAT channels
soareschen Dec 9, 2022
4bce1a9
Add type alias for batch channel types
soareschen Dec 9, 2022
4cdc9d3
Add new HasBatchChannelTypes trait
soareschen Dec 9, 2022
f4cb0c0
Add HasBatchSender/Receiver traits
soareschen Dec 9, 2022
a47fb8f
Refactor SendMessagetoBatchWorker
soareschen Dec 9, 2022
439337f
Refactor batch worker
soareschen Dec 9, 2022
7db906c
Remove old batch traits and types
soareschen Dec 12, 2022
36b50f0
Separate OfaRuntimeWrapper impls into separate files
soareschen Dec 12, 2022
efcb265
Implement channel traits for OfaRuntimeWrapper
soareschen Dec 12, 2022
79d6761
Abstract batch components working in relayer framework
soareschen Dec 12, 2022
dc48501
Fix all compile errors
soareschen Dec 12, 2022
01ceb3d
Merge branch 'soares/relayer-next' into soares/gat-batch-context
soareschen Dec 12, 2022
4f883ed
Merge branch 'soares/relayer-next' into soares/explicit-error-injection
soareschen Dec 12, 2022
b5e088b
Remove unused HasBatchChannelTypes trait
soareschen Dec 12, 2022
5520597
Refactor batch type aliases to be based on Chain and Error
soareschen Dec 12, 2022
ccc4255
Reorganize batch module definitions
soareschen Dec 12, 2022
ac52de6
Separate Channel and ChannelOnce types
soareschen Dec 12, 2022
2bbcfb2
Put Channel and ChannelOnce into separate modules
soareschen Dec 12, 2022
eccdc19
Fix clippy
soareschen Dec 12, 2022
30928a4
Remove From<TokioError> which is no longer needed
soareschen Dec 12, 2022
ea9ce93
Merge branch 'soares/explicit-error-injection' into soares/gat-batch-…
soareschen Dec 12, 2022
d9f8aba
Resolve merge conflicts in soares/gat-batch-context
soareschen Dec 12, 2022
69a5583
Merge branch 'soares/gat-batch-context-merge' into soares/gat-batch-c…
soareschen Dec 12, 2022
51fcdfb
Merge branch 'soares/relayer-next' into soares/gat-batch-context
soareschen Dec 15, 2022
85a774d
Move OfaRuntimeWrapper to types module
soareschen Dec 15, 2022
961b8a0
Adding documentation for GAT channel traits
soareschen Dec 15, 2022
3b5ec99
Merge branch 'soares/relayer-next' into soares/gat-batch-context
soareschen Dec 15, 2022
b6dd698
Merge branch 'soares/relayer-next' into soares/gat-batch-context
soareschen Dec 15, 2022
762e2f5
Merge branch 'soares/relayer-next' into soares/gat-batch-context
soareschen Dec 15, 2022
cc6fb86
Fix merge errors
soareschen Dec 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/relayer-cosmos/src/base/impls/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ibc_relayer::path::PathIdentifiers;
use ibc_relayer_framework::base::one_for_all::traits::chain::{
OfaBaseChain, OfaChainTypes, OfaIbcChain,
};
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;
use ibc_relayer_runtime::tokio::error::Error as TokioError;
use ibc_relayer_types::clients::ics07_tendermint::consensus_state::ConsensusState;
Expand Down Expand Up @@ -106,7 +106,7 @@ where
}
}

fn runtime(&self) -> &OfaRuntimeContext<TokioRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<TokioRuntimeContext> {
&self.runtime
}

Expand Down
4 changes: 2 additions & 2 deletions crates/relayer-cosmos/src/base/impls/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ibc_relayer_framework::base::one_for_all::traits::chain::OfaChainTypes;
use ibc_relayer_framework::base::one_for_all::traits::relay::{OfaBaseRelay, OfaRelayTypes};
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;

use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;

use crate::base::error::Error;

Expand Down Expand Up @@ -99,7 +99,7 @@ where
&packet.timeout_timestamp
}

fn runtime(&self) -> &OfaRuntimeContext<TokioRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<TokioRuntimeContext> {
&self.runtime
}

Expand Down
6 changes: 3 additions & 3 deletions crates/relayer-cosmos/src/base/types/chain.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use alloc::sync::Arc;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

#[derive(Clone)]
pub struct CosmosChainWrapper<Chain> {
pub chain: Arc<Chain>,
pub runtime: OfaRuntimeContext<TokioRuntimeContext>,
pub runtime: OfaRuntimeWrapper<TokioRuntimeContext>,
}

impl<Chain> CosmosChainWrapper<Chain> {
pub fn new(chain: Arc<Chain>, runtime: TokioRuntimeContext) -> Self {
Self {
chain,
runtime: OfaRuntimeContext::new(runtime),
runtime: OfaRuntimeWrapper::new(runtime),
}
}
}
6 changes: 3 additions & 3 deletions crates/relayer-cosmos/src/base/types/relay.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::sync::Arc;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

use crate::base::traits::relay::CosmosRelay;
Expand All @@ -11,7 +11,7 @@ pub struct CosmosRelayWrapper<Relay: CosmosRelay> {
pub relay: Arc<Relay>,
pub src_chain: OfaChainWrapper<CosmosChainWrapper<Relay::SrcChain>>,
pub dst_chain: OfaChainWrapper<CosmosChainWrapper<Relay::DstChain>>,
pub runtime: OfaRuntimeContext<TokioRuntimeContext>,
pub runtime: OfaRuntimeWrapper<TokioRuntimeContext>,
}

impl<Relay: CosmosRelay> CosmosRelayWrapper<Relay> {
Expand All @@ -26,7 +26,7 @@ impl<Relay: CosmosRelay> CosmosRelayWrapper<Relay> {
runtime.clone(),
));

let runtime = OfaRuntimeContext::new(runtime);
let runtime = OfaRuntimeWrapper::new(runtime);

Self {
relay,
Expand Down
12 changes: 0 additions & 12 deletions crates/relayer-cosmos/src/contexts/full/chain.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use ibc_relayer::chain::cosmos::types::config::TxConfig;
use ibc_relayer::chain::handle::ChainHandle;
use ibc_relayer::keyring::Secp256k1KeyPair;
use ibc_relayer_framework::full::batch::context::new_batch_channel;
use ibc_relayer_framework::full::one_for_all::presets::full::FullPreset;
use ibc_relayer_framework::full::one_for_all::traits::batch::OfaBatchWrapper;
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;
use ibc_relayer_types::signer::Signer;

use crate::base::traits::chain::CosmosChain;
use crate::base::types::chain::CosmosChainWrapper;
use crate::full::traits::chain::CosmosFullChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

#[derive(Clone)]
Expand All @@ -19,7 +15,6 @@ pub struct FullCosmosChainContext<Handle: ChainHandle> {
pub signer: Signer,
pub tx_config: TxConfig,
pub key_entry: Secp256k1KeyPair,
pub batch_channel: CosmosBatchChannel,
pub telemetry: OfaTelemetryWrapper<CosmosTelemetry>,
}

Expand All @@ -31,14 +26,11 @@ impl<Handle: ChainHandle> FullCosmosChainContext<Handle> {
key_entry: Secp256k1KeyPair,
telemetry: CosmosTelemetry,
) -> Self {
let batch_channel = new_batch_channel::<OfaBatchWrapper<CosmosChainWrapper<Self>>>();

let chain = Self {
handle,
signer,
tx_config,
key_entry,
batch_channel,
telemetry: OfaTelemetryWrapper::new(telemetry),
};

Expand Down Expand Up @@ -75,10 +67,6 @@ impl<Handle> CosmosFullChain for FullCosmosChainContext<Handle>
where
Handle: ChainHandle,
{
fn batch_channel(&self) -> &CosmosBatchChannel {
&self.batch_channel
}

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry> {
&self.telemetry
}
Expand Down
44 changes: 40 additions & 4 deletions crates/relayer-cosmos/src/contexts/full/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ use ibc_relayer::config::filter::PacketFilter;
use ibc_relayer::foreign_client::ForeignClient;
use ibc_relayer_framework::base::one_for_all::types::relay::OfaRelayWrapper;
use ibc_relayer_framework::base::relay::traits::target::{DestinationTarget, SourceTarget};
use ibc_relayer_framework::full::batch::config::BatchConfig;
use ibc_relayer_framework::full::batch::spawn::{
BatchMessageWorkerSpawner, CanSpawnBatchMessageWorker,
};
use ibc_relayer_framework::full::batch::impls::spawn::BatchMessageWorkerSpawner;
use ibc_relayer_framework::full::batch::traits::spawn::CanSpawnBatchMessageWorker;
use ibc_relayer_framework::full::batch::types::config::BatchConfig;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;

use crate::base::traits::chain::CosmosChain;
use crate::base::traits::relay::CosmosRelay;
use crate::base::types::relay::CosmosRelayWrapper;
use crate::contexts::full::chain::FullCosmosChainContext;
use crate::full::traits::relay::CosmosFullRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

#[derive(Clone)]
pub struct FullCosmosRelay<SrcChain, DstChain>
Expand All @@ -33,6 +35,10 @@ where
<DstChain as CosmosChain>::ChainHandle,
>,
pub packet_filter: PacketFilter,
pub src_chain_message_batch_sender: CosmosBatchSender,
pub src_chain_message_batch_receiver: CosmosBatchReceiver,
pub dst_chain_message_batch_sender: CosmosBatchSender,
pub dst_chain_message_batch_receiver: CosmosBatchReceiver,
}

impl<SrcChain, DstChain> FullCosmosRelay<SrcChain, DstChain>
Expand All @@ -53,12 +59,26 @@ where
>,
packet_filter: PacketFilter,
) -> Self {
let (src_chain_message_batch_sender, src_chain_message_batch_receiver) =
unbounded_channel();

let (dst_chain_message_batch_sender, dst_chain_message_batch_receiver) =
unbounded_channel();

let relay = Self {
src_chain,
dst_chain,
src_to_dst_client,
dst_to_src_client,
packet_filter,
src_chain_message_batch_sender,
src_chain_message_batch_receiver: Arc::new(Mutex::new(
src_chain_message_batch_receiver,
)),
dst_chain_message_batch_sender,
dst_chain_message_batch_receiver: Arc::new(Mutex::new(
dst_chain_message_batch_receiver,
)),
};

relay
Expand Down Expand Up @@ -152,4 +172,20 @@ where
fn packet_filter(&self) -> &PacketFilter {
&self.packet_filter
}

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender {
&self.src_chain_message_batch_sender
}

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
&self.src_chain_message_batch_receiver
}

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender {
&self.dst_chain_message_batch_sender
}

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
&self.dst_chain_message_batch_receiver
}
}
8 changes: 0 additions & 8 deletions crates/relayer-cosmos/src/full/impls/chain.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
use ibc_relayer_framework::full::one_for_all::traits::chain::OfaFullChain;
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;
use ibc_relayer_runtime::tokio::context::TokioRuntimeContext;

use crate::base::types::chain::CosmosChainWrapper;
use crate::full::traits::chain::CosmosFullChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

impl<Chain> OfaFullChain for CosmosChainWrapper<Chain>
where
Chain: CosmosFullChain,
{
type BatchContext = TokioRuntimeContext;

type Telemetry = CosmosTelemetry;

fn batch_channel(&self) -> &CosmosBatchChannel {
self.chain.batch_channel()
}

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry> {
self.chain.telemetry()
}
Expand Down
17 changes: 17 additions & 0 deletions crates/relayer-cosmos/src/full/impls/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ibc_relayer_framework::full::one_for_all::traits::relay::OfaFullRelay;

use crate::base::types::relay::CosmosRelayWrapper;
use crate::full::traits::relay::CosmosFullRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

#[async_trait]
impl<Relay> OfaFullRelay for CosmosRelayWrapper<Relay>
Expand All @@ -23,4 +24,20 @@ where
.packet_filter()
.is_allowed(&packet.source_port, &packet.source_channel))
}

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender {
self.relay.src_chain_message_batch_sender()
}

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
self.relay.src_chain_message_batch_receiver()
}

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender {
self.relay.dst_chain_message_batch_sender()
}

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver {
self.relay.dst_chain_message_batch_receiver()
}
}
3 changes: 0 additions & 3 deletions crates/relayer-cosmos/src/full/traits/chain.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use ibc_relayer_framework::full::one_for_all::traits::telemetry::OfaTelemetryWrapper;

use crate::base::traits::chain::CosmosChain;
use crate::full::types::batch::CosmosBatchChannel;
use crate::full::types::telemetry::CosmosTelemetry;

pub trait CosmosFullChain: CosmosChain {
fn batch_channel(&self) -> &CosmosBatchChannel;

fn telemetry(&self) -> &OfaTelemetryWrapper<CosmosTelemetry>;
}
9 changes: 9 additions & 0 deletions crates/relayer-cosmos/src/full/traits/relay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
use ibc_relayer::config::filter::PacketFilter;

use crate::base::traits::relay::CosmosRelay;
use crate::full::types::batch::{CosmosBatchReceiver, CosmosBatchSender};

pub trait CosmosFullRelay: CosmosRelay {
fn packet_filter(&self) -> &PacketFilter;

fn src_chain_message_batch_sender(&self) -> &CosmosBatchSender;

fn src_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver;

fn dst_chain_message_batch_sender(&self) -> &CosmosBatchSender;

fn dst_chain_message_batch_receiver(&self) -> &CosmosBatchReceiver;
}
15 changes: 7 additions & 8 deletions crates/relayer-cosmos/src/full/types/batch.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use ibc_relayer_framework::full::batch::context::BatchChannel;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tendermint::abci::Event;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender as SenderOnce;
use tokio::sync::Mutex;

use crate::base::error::Error;
use crate::base::types::message::CosmosIbcMessage;

pub type CosmosBatchPayload = (
Vec<CosmosIbcMessage>,
oneshot::Sender<Result<Vec<Vec<Event>>, Error>>,
SenderOnce<Result<Vec<Vec<Event>>, Error>>,
);

pub type CosmosBatchSender = mpsc::UnboundedSender<CosmosBatchPayload>;
pub type CosmosBatchSender = UnboundedSender<CosmosBatchPayload>;

pub type CosmosBatchReceiver = Arc<Mutex<mpsc::UnboundedReceiver<CosmosBatchPayload>>>;

pub type CosmosBatchChannel = BatchChannel<CosmosBatchSender, CosmosBatchReceiver>;
pub type CosmosBatchReceiver = Arc<Mutex<UnboundedReceiver<CosmosBatchPayload>>>;
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use ibc_relayer_framework::base::one_for_all::traits::chain::{
};

use ibc_relayer_framework::base::one_for_all::presets::min::MinimalPreset;
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::error::Error as TokioError;

impl OfaChainTypes for MockChainContext {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl OfaChainTypes for MockChainContext {

#[async_trait]
impl OfaBaseChain for MockChainContext {
fn runtime(&self) -> &OfaRuntimeContext<MockRuntimeContext> {
fn runtime(&self) -> &OfaRuntimeWrapper<MockRuntimeContext> {
self.runtime()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use crate::relayer_mock::contexts::relay::MockRelayContext;
use ibc_relayer_framework::base::one_for_all::presets::min::MinimalPreset;
use ibc_relayer_framework::base::one_for_all::traits::chain::OfaChainTypes;
use ibc_relayer_framework::base::one_for_all::traits::relay::{OfaBaseRelay, OfaRelayTypes};
use ibc_relayer_framework::base::one_for_all::traits::runtime::OfaRuntimeContext;
use ibc_relayer_framework::base::one_for_all::types::chain::OfaChainWrapper;
use ibc_relayer_framework::base::one_for_all::types::runtime::OfaRuntimeWrapper;
use ibc_relayer_runtime::tokio::error::Error as TokioError;

impl OfaRelayTypes for MockRelayContext {
Expand Down Expand Up @@ -82,7 +82,7 @@ impl OfaBaseRelay for MockRelayContext {
&packet.timeout_timestamp
}

fn runtime(&self) -> &OfaRuntimeContext<Self::Runtime> {
fn runtime(&self) -> &OfaRuntimeWrapper<Self::Runtime> {
&self.runtime
}

Expand Down
Loading