Skip to content

Commit

Permalink
Add timeouts to taker message sends/recv
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-belcher committed Feb 10, 2022
1 parent 0f75c1d commit af3e8a3
Showing 1 changed file with 175 additions and 111 deletions.
286 changes: 175 additions & 111 deletions src/taker_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::select;
use tokio::time::sleep;

use tokio_socks::tcp::Socks5Stream;
Expand Down Expand Up @@ -58,6 +59,7 @@ pub const REFUND_LOCKTIME_STEP: u16 = 3; //in blocks
// in people's closets, which are very important for decentralization
const FIRST_CONNECT_ATTEMPTS: u32 = 5;
const FIRST_CONNECT_SLEEP_DELAY_MSEC: u64 = 1000;
const FIRST_CONNECT_ATTEMPT_TIMEOUT: u64 = 20000;

//reconnect means when connecting to a maker again after having already gotten txes confirmed
// as it would be a waste of miner fees to give up, the taker is coded to be very persistent
Expand All @@ -69,6 +71,7 @@ const RECONNECT_ATTEMPTS: u32 = 3200;
const RECONNECT_SHORT_SLEEP_DELAY_MSEC: u64 = 10000;
const RECONNECT_LONG_SLEEP_DELAY_MSEC: u64 = 1000 * 60;
const SHORT_LONG_SLEEP_DELAY_TRANSITION: u32 = 60; //after this many attempts, switch to sleeping longer
const RECONNECT_ATTEMPT_TIMEOUT: u64 = 1000 * 60 * 5;

#[derive(Debug, Clone, Copy)]
pub struct TakerConfig {
Expand Down Expand Up @@ -513,30 +516,44 @@ async fn request_senders_contract_tx_signatures<S: SwapCoin>(
let mut ii = 0;
loop {
ii += 1;
let ret = request_senders_contract_tx_signatures_attempt_once(
maker_address,
outgoing_swapcoins,
maker_multisig_nonces,
maker_hashlock_nonces,
locktime,
)
.await;
match ret {
Ok(sigs) => return Ok(sigs),
Err(e) => {
select! {
ret = request_senders_contract_tx_signatures_attempt_once(
maker_address,
outgoing_swapcoins,
maker_multisig_nonces,
maker_hashlock_nonces,
locktime,
) => {
match ret {
Ok(sigs) => return Ok(sigs),
Err(e) => {
log::warn!(
"Failed to request senders contract tx sigs from maker {}, \
reattempting... error={:?}",
maker_address,
e
);
if ii <= FIRST_CONNECT_ATTEMPTS {
sleep(Duration::from_millis(FIRST_CONNECT_SLEEP_DELAY_MSEC)).await;
continue;
} else {
return Err(e);
}
}
}
},
_ = sleep(Duration::from_millis(FIRST_CONNECT_ATTEMPT_TIMEOUT)) => {
log::warn!(
"Failed to request senders contract tx sigs from maker {}, \
reattempting... error={:?}",
maker_address,
e
"Timeout for request senders contract tx sig from maker {}, reattempting...",
maker_address
);
if ii <= FIRST_CONNECT_ATTEMPTS {
sleep(Duration::from_millis(FIRST_CONNECT_SLEEP_DELAY_MSEC)).await;
continue;
} else {
return Err(e);
return Err(Error::Protocol(
"Timed out of request_senders_contract_tx_signatures attempt"));
}
}
},
}
}
}
Expand All @@ -548,6 +565,7 @@ async fn request_senders_contract_tx_signatures_attempt_once<S: SwapCoin>(
maker_hashlock_nonces: &[SecretKey],
locktime: u16,
) -> Result<Vec<Signature>, Error> {
log::info!("Connecting to {}", maker_address);
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;
Expand Down Expand Up @@ -608,35 +626,49 @@ async fn request_receivers_contract_tx_signatures<S: SwapCoin>(
let mut ii = 0;
loop {
ii += 1;
let ret = request_receivers_contract_tx_signatures_attempt_once(
maker_address,
incoming_swapcoins,
receivers_contract_txes,
)
.await;
match ret {
Ok(sigs) => return Ok(sigs),
Err(e) => {
select! {
ret = request_receivers_contract_tx_signatures_attempt_once(
maker_address,
incoming_swapcoins,
receivers_contract_txes,
) => {
match ret {
Ok(sigs) => return Ok(sigs),
Err(e) => {
log::warn!(
"Failed to request receivers contract tx sigs from maker {}, \
reattempting... error={:?}",
maker_address,
e
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
return Err(e);
}
}
}
},
_ = sleep(Duration::from_millis(RECONNECT_ATTEMPT_TIMEOUT)) => {
log::warn!(
"Failed to request receivers contract tx sigs from maker {}, \
reattempting... error={:?}",
maker_address,
e
"Timeout for request receivers contract tx sig from maker {}, reattempting...",
maker_address
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
return Err(e);
return Err(Error::Protocol(
"Timed out of request_receivers_contract_tx_signatures attempt"));
}
}
},
}
}
}
Expand All @@ -646,6 +678,7 @@ async fn request_receivers_contract_tx_signatures_attempt_once<S: SwapCoin>(
incoming_swapcoins: &[S],
receivers_contract_txes: &[Transaction],
) -> Result<Vec<Signature>, Error> {
log::info!("Connecting to {}", maker_address);
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;
Expand Down Expand Up @@ -838,50 +871,64 @@ async fn exchange_signatures_and_find_next_maker<'a>(
let mut ii = 0;
loop {
ii += 1;
let ret = exchange_signatures_and_find_next_maker_attempt_once(
rpc,
config,
maker_offers_addresses,
this_maker,
previous_maker,
is_taker_previous_peer,
is_taker_next_peer,
funding_txes,
funding_tx_merkleproofs,
this_maker_multisig_redeemscripts,
this_maker_multisig_privkeys,
this_maker_contract_redeemscripts,
this_maker_hashlock_privkeys,
this_maker_contract_txes,
maker_refund_locktime,
hashvalue,
outgoing_swapcoins,
watchonly_swapcoins,
)
.await;
match ret {
Ok(return_value) => return Ok(return_value),
Err(e) => {
select! {
ret = exchange_signatures_and_find_next_maker_attempt_once(
rpc,
config,
maker_offers_addresses,
this_maker,
previous_maker,
is_taker_previous_peer,
is_taker_next_peer,
funding_txes,
funding_tx_merkleproofs,
this_maker_multisig_redeemscripts,
this_maker_multisig_privkeys,
this_maker_contract_redeemscripts,
this_maker_hashlock_privkeys,
this_maker_contract_txes,
maker_refund_locktime,
hashvalue,
outgoing_swapcoins,
watchonly_swapcoins,
) => {
match ret {
Ok(return_value) => return Ok(return_value),
Err(e) => {
log::warn!(
"Failed to exchange signatures with maker {}, \
reattempting... error={:?}",
this_maker.address,
e
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
return Err(e);
}
}
}
},
_ = sleep(Duration::from_millis(RECONNECT_ATTEMPT_TIMEOUT)) => {
log::warn!(
"Failed to exchange signatures with maker {}, \
reattempting... error={:?}",
this_maker.address,
e
"Timeout for exchange signatures with maker {}, reattempting...",
this_maker.address
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
return Err(e);
return Err(Error::Protocol(
"Timed out of exchange_signatures_and_find_next_maker attempt"));
}
}
},
}
}
}
Expand Down Expand Up @@ -919,6 +966,7 @@ async fn exchange_signatures_and_find_next_maker_attempt_once<'a>(
//return next_peer_multisig_pubkeys, next_peer_multisig_keys_or_nonces,
// next_peer_hashlock_keys_or_nonces, (), next_swap_contract_redeemscripts, found_next_maker

log::info!("Connecting to {}", this_maker.address);
let mut socket = TcpStream::connect(this_maker.address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, &this_maker.address).await?;
Expand Down Expand Up @@ -1445,41 +1493,56 @@ async fn settle_all_coinswaps_send_hash_preimage_and_privkeys(
let mut ii = 0;
loop {
ii += 1;
let ret = settle_one_coinswap(
maker_address,
index,
is_taker_previous_peer,
is_taker_next_peer,
&mut outgoing_privkeys,
outgoing_swapcoins,
watchonly_swapcoins,
incoming_swapcoins,
&senders_multisig_redeemscripts,
&receivers_multisig_redeemscripts,
preimage,
)
.await;
if let Err(e) = ret {
log::warn!(
"Failed to connect to maker {} to settle coinswap, reattempting... error={:?}",
select! {
ret = settle_one_coinswap(
maker_address,
e
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
index,
is_taker_previous_peer,
is_taker_next_peer,
&mut outgoing_privkeys,
outgoing_swapcoins,
watchonly_swapcoins,
incoming_swapcoins,
&senders_multisig_redeemscripts,
&receivers_multisig_redeemscripts,
preimage,
) => {
if let Err(e) = ret {
log::warn!(
"Failed to connect to maker {} to settle coinswap, \
reattempting... error={:?}",
maker_address,
e
);
if ii <= RECONNECT_ATTEMPTS {
sleep(Duration::from_millis(
if ii <= SHORT_LONG_SLEEP_DELAY_TRANSITION {
RECONNECT_SHORT_SLEEP_DELAY_MSEC
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
RECONNECT_LONG_SLEEP_DELAY_MSEC
},
))
.await;
continue;
} else {
return Err(e);
}
return Err(e);
}
}
break;
},
_ = sleep(Duration::from_millis(RECONNECT_ATTEMPT_TIMEOUT)) => {
log::warn!(
"Timeout for settling coinswap with maker {}, reattempting...",
maker_address
);
if ii <= RECONNECT_ATTEMPTS {
continue;
} else {
return Err(Error::Protocol(
"Timed out of settle_one_coinswap attempt"));
}
},
}
break;
}
}
Ok(())
Expand All @@ -1498,6 +1561,7 @@ async fn settle_one_coinswap(
receivers_multisig_redeemscripts: &Vec<Script>,
preimage: Preimage,
) -> Result<(), Error> {
log::info!("Connecting to {}", maker_address);
let mut socket = TcpStream::connect(maker_address.get_tcpstream_address()).await?;
let (mut socket_reader, mut socket_writer) =
handshake_maker(&mut socket, maker_address).await?;
Expand Down

0 comments on commit af3e8a3

Please sign in to comment.