Skip to content

Commit

Permalink
wip(libp2p): Use new API for swap/src/network/swap_setup/bob.rs (See l…
Browse files Browse the repository at this point in the history
  • Loading branch information
binarybaron committed Oct 18, 2024
1 parent df64e76 commit 0b8f578
Showing 1 changed file with 117 additions and 118 deletions.
235 changes: 117 additions & 118 deletions swap/src/network/swap_setup/bob.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use crate::network::swap_setup::{
protocol, read_cbor_message, write_cbor_message, BlockchainNetwork, SpotPriceError,
SpotPriceRequest, SpotPriceResponse,
};
use crate::network::swap_setup::{protocol, BlockchainNetwork, SpotPriceError, SpotPriceResponse};
use crate::protocol::bob::{State0, State2};
use crate::protocol::{Message1, Message3};
use crate::{bitcoin, cli, env, monero};
use anyhow::Result;
use futures::future::{BoxFuture, OptionFuture};
use futures::{AsyncWriteExt, FutureExt};
use libp2p::core::connection::ConnectionId;
use futures::FutureExt;
use libp2p::core::upgrade;
use libp2p::swarm::{
ConnectionDenied, ConnectionHandler, FromSwarm, KeepAlive, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm
ConnectionDenied, ConnectionHandler, ConnectionHandlerEvent, ConnectionId, FromSwarm,
NetworkBehaviour, SubstreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use uuid::Uuid;
use void::Void;

use super::{read_cbor_message, write_cbor_message, SpotPriceRequest};

#[allow(missing_debug_implementations)]
pub struct Behaviour {
Expand Down Expand Up @@ -52,7 +50,7 @@ impl From<Completed> for cli::OutEvent {

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type ToSwarm = ToSwarm<Completed, Self::ConnectionHandler>;
type ToSwarm = Completed;

fn handle_established_inbound_connection(
&mut self,
Expand All @@ -76,26 +74,17 @@ impl NetworkBehaviour for Behaviour {

fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(_) => {},
FromSwarm::ConnectionClosed(_) => {},
_ => {},
FromSwarm::ConnectionEstablished(_) => {}
FromSwarm::ConnectionClosed(_) => {}
_ => {}
}
}

/*fn on_connection_handler_event(
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
_connection_id: ConnectionId,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
self.completed_swaps.push_back((peer_id, event));
}*/

fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
_connection_id: libp2p::swarm::ConnectionId,
event: THandlerOutEvent<Self>,
_connection_id: libp2p::swarm::ConnectionId,
event: THandlerOutEvent<Self>,
) {
self.completed_swaps.push_back((peer_id, event));
}
Expand All @@ -119,7 +108,7 @@ pub struct Handler {
timeout: Duration,
new_swaps: VecDeque<NewSwap>,
bitcoin_wallet: Arc<bitcoin::Wallet>,
keep_alive: KeepAlive,
keep_alive: bool,
}

impl Handler {
Expand All @@ -130,7 +119,7 @@ impl Handler {
timeout: Duration::from_secs(120),
new_swaps: VecDeque::default(),
bitcoin_wallet,
keep_alive: KeepAlive::Yes,
keep_alive: true,
}
}
}
Expand All @@ -147,10 +136,9 @@ pub struct NewSwap {
#[derive(Debug)]
pub struct Completed(Result<State2>);

impl ProtocolsHandler for Handler {
type InEvent = NewSwap;
type OutEvent = Completed;
type Error = Void;
impl ConnectionHandler for Handler {
type FromBehaviour = NewSwap;
type ToBehaviour = Completed;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = protocol::SwapSetup;
type InboundOpenInfo = ();
Expand All @@ -160,114 +148,125 @@ impl ProtocolsHandler for Handler {
SubstreamProtocol::new(upgrade::DeniedUpgrade, ())
}

fn inject_fully_negotiated_inbound(&mut self, _: Void, _: Self::InboundOpenInfo) {
unreachable!("Bob does not support inbound substreams")
}

fn inject_fully_negotiated_outbound(
&mut self,
mut substream: NegotiatedSubstream,
info: Self::OutboundOpenInfo,
) {
let bitcoin_wallet = self.bitcoin_wallet.clone();
let env_config = self.env_config;

let protocol = tokio::time::timeout(self.timeout, async move {
write_cbor_message(
&mut substream,
SpotPriceRequest {
btc: info.btc,
blockchain_network: BlockchainNetwork {
bitcoin: env_config.bitcoin_network,
monero: env_config.monero_network,
},
},
)
.await?;

let xmr = Result::from(read_cbor_message::<SpotPriceResponse>(&mut substream).await?)?;

let state0 = State0::new(
info.swap_id,
&mut rand::thread_rng(),
info.btc,
xmr,
env_config.bitcoin_cancel_timelock,
env_config.bitcoin_punish_timelock,
info.bitcoin_refund_address,
env_config.monero_finality_confirmations,
info.tx_refund_fee,
info.tx_cancel_fee,
);

write_cbor_message(&mut substream, state0.next_message()).await?;
let message1 = read_cbor_message::<Message1>(&mut substream).await?;
let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?;

write_cbor_message(&mut substream, state1.next_message()).await?;
let message3 = read_cbor_message::<Message3>(&mut substream).await?;
let state2 = state1.receive(message3)?;

write_cbor_message(&mut substream, state2.next_message()).await?;

substream.flush().await?;
substream.close().await?;

Ok(state2)
});

let max_seconds = self.timeout.as_secs();
self.outbound_stream = OptionFuture::from(Some(
async move {
protocol.await.map_err(|_| Error::Timeout {
seconds: max_seconds,
})?
}
.boxed(),
));
}

fn inject_event(&mut self, new_swap: Self::InEvent) {
fn on_behaviour_event(&mut self, new_swap: Self::FromBehaviour) {
self.new_swaps.push_back(new_swap);
}

fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<Void>,
) {
}

fn connection_keep_alive(&self) -> KeepAlive {
fn connection_keep_alive(&self) -> bool {
self.keep_alive
}

#[allow(clippy::type_complexity)]
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
if let Some(new_swap) = self.new_swaps.pop_front() {
self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.keep_alive = true;
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(protocol::new(), new_swap),
});
}

if let Some(result) = futures::ready!(self.outbound_stream.poll_unpin(cx)) {
self.outbound_stream = OptionFuture::from(None);
return Poll::Ready(ProtocolsHandlerEvent::Custom(Completed(result)));
if let Some(outbound_stream) = self.outbound_stream.as_mut() {
if let Poll::Ready(result) = outbound_stream.poll_unpin(cx) {
self.outbound_stream = None;
self.keep_alive = false; // Set to false after completing the stream
return Poll::Ready(ConnectionHandlerEvent::Custom(Completed(result)));
}
}

Poll::Pending
}

fn on_connection_event(
&mut self,
event: libp2p::swarm::handler::ConnectionEvent<
Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) {
match event {
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedInbound(_, _) => {
unreachable!("Bob does not support inbound substreams")
}
libp2p::swarm::handler::ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
let mut substream = outbound.protocol;
let info = outbound.info;

let bitcoin_wallet = self.bitcoin_wallet.clone();
let env_config = self.env_config;

let bitcoin_wallet = self.bitcoin_wallet.clone();
let env_config = self.env_config;

let protocol = tokio::time::timeout(self.timeout, async move {
write_cbor_message(
&mut substream,
SpotPriceRequest {
btc: info.btc,
blockchain_network: BlockchainNetwork {
bitcoin: env_config.bitcoin_network,
monero: env_config.monero_network,
},
},
)
.await?;

let xmr = Result::from(
read_cbor_message::<SpotPriceResponse>(&mut substream).await?,
)?;

let state0 = State0::new(
info.swap_id,
&mut rand::thread_rng(),
info.btc,
xmr,
env_config.bitcoin_cancel_timelock,
env_config.bitcoin_punish_timelock,
info.bitcoin_refund_address,
env_config.monero_finality_confirmations,
info.tx_refund_fee,
info.tx_cancel_fee,
);

write_cbor_message(&mut substream, state0.next_message()).await?;
let message1 = read_cbor_message::<Message1>(&mut substream).await?;
let state1 = state0.receive(bitcoin_wallet.as_ref(), message1).await?;

write_cbor_message(&mut substream, state1.next_message()).await?;
let message3 = read_cbor_message::<Message3>(&mut substream).await?;
let state2 = state1.receive(message3)?;

write_cbor_message(&mut substream, state2.next_message()).await?;

substream.flush().await?;
substream.close().await?;

Ok(state2)
});

let max_seconds = self.timeout.as_secs();
self.outbound_stream = Some(Box::pin(
async move {
protocol.await.map_err(|_| Error::Timeout {
seconds: max_seconds,
})?
}
.boxed(),
));
self.keep_alive = true; // Ensure the connection stays alive while processing
}
libp2p::swarm::handler::ConnectionEvent::DialUpgradeError(dial_upgrade_err) => {
// Handle dial upgrade error if needed
self.keep_alive = false; // Consider setting to false on error
}
_ => {}
}
}
}

impl From<SpotPriceResponse> for Result<monero::Amount, Error> {
Expand Down

0 comments on commit 0b8f578

Please sign in to comment.