Forward over substitute channels #1578

Closed
21 commits
9864a26
Add `counterparty_node_id` to `short_to_id` map
ViktorTigerstrom May 17, 2022
0780fb9
Rename `short_to_id` map to `short_to_chan_info`
ViktorTigerstrom Jun 9, 2022
7833589
Add id_to_peer map
ViktorTigerstrom Jun 3, 2022
b0975eb
Add `ChannelManager:id_to_peer` map coverage test
ViktorTigerstrom Jun 6, 2022
d3667d9
Add `counterparty_node` to test macros
ViktorTigerstrom May 27, 2022
6c62533
Store channels per-peer
ViktorTigerstrom May 25, 2022
519fcb4
f - Store per peer: acquire locks inside get_channel_ref
ViktorTigerstrom Jun 9, 2022
c37b984
f - Make per_peer_state a FairRwLock
ViktorTigerstrom Jun 6, 2022
f7b51d2
f - Remove unreachable branches that can be reached
ViktorTigerstrom Jun 6, 2022
2c112d7
f - Store channels per-peer: Update per_peer_state docs
ViktorTigerstrom Jun 7, 2022
7b594ee
Remove unnecessary `per_peer_state` branch
ViktorTigerstrom Jun 10, 2022
7e9ac7b
Avoid retaking locks
ViktorTigerstrom Jun 6, 2022
6c4b80d
Update failure to query `Channel` error messages
ViktorTigerstrom Jun 6, 2022
4991456
Add duplicate temporary_channel_id for 2 peers test
ViktorTigerstrom May 31, 2022
ae665ce
Add handle unknown peer test
ViktorTigerstrom Jun 13, 2022
ef4107a
Enable htlc forwarding over substitute channels
ViktorTigerstrom Jun 20, 2022
1d44945
Test htlc forwarding over substitute channels
ViktorTigerstrom Jun 20, 2022
1dec1f9
f - Move creation of CommitmentUpdate structs
ViktorTigerstrom Jun 27, 2022
6b9a261
f - DRY up htlc_msg macros
ViktorTigerstrom Jun 27, 2022
13dbfe3
f - cleanup channel selection order
ViktorTigerstrom Jun 29, 2022
92a067c
f - cleanup tests
ViktorTigerstrom Jun 29, 2022
184 changes: 118 additions & 66 deletions lightning/src/ln/channelmanager.rs
@@ -3081,9 +3081,36 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(forward_chan_id) {
let mut add_htlc_msgs = Vec::new();
let mut fail_htlc_msgs = Vec::new();
if peer_state.channel_by_id.contains_key(&forward_chan_id) {
let mut htlcs_msgs_by_id: HashMap<[u8; 32], (Vec<msgs::UpdateAddHTLC>, Vec<msgs::UpdateFailHTLC>)> = HashMap::new();

macro_rules! add_channel_key {
($channel_id: expr) => {{
if !htlcs_msgs_by_id.contains_key(&$channel_id) {
htlcs_msgs_by_id.insert($channel_id, (Vec::new(), Vec::new()));
}
}}
}

macro_rules! add_update_add_htlc {
($add_htlc_msg: expr, $channel_id: expr) => {{
add_channel_key!($channel_id);
if let hash_map::Entry::Occupied(mut entry) = htlcs_msgs_by_id.entry($channel_id) {
let msgs_entry = entry.get_mut();
msgs_entry.0.push($add_htlc_msg);
}
}}
}

macro_rules! add_update_fail_htlc {
($fail_htlc_msg: expr, $channel_id: expr) => {{
add_channel_key!($channel_id);
if let hash_map::Entry::Occupied(mut entry) = htlcs_msgs_by_id.entry($channel_id) {
let msgs_entry = entry.get_mut();
msgs_entry.1.push($fail_htlc_msg);
}
}}
}
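// Editorial aside (not part of the PR diff): the three macros above
// exist to ensure a (Vec, Vec) entry is present before pushing into
// it. A hypothetical macro-free equivalent could use the entry API:
//
//     htlcs_msgs_by_id.entry($channel_id)
//         .or_insert_with(|| (Vec::new(), Vec::new()))
//         .0.push($add_htlc_msg);
//
// The later fixup commit 6b9a261 ("f - DRY up htlc_msg macros")
// DRYs these macros up.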
for forward_info in pending_forwards.drain(..) {
match forward_info {
HTLCForwardInfo::AddHTLC { prev_short_channel_id, prev_htlc_id, forward_info: PendingHTLCInfo {
@@ -3100,41 +3127,64 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager
// Phantom payments are only PendingHTLCRouting::Receive.
phantom_shared_secret: None,
});
match chan.get_mut().send_htlc(amt_to_forward, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet, &self.logger) {
Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {}: {}", log_bytes!(payment_hash.0), msg);
} else {
panic!("Stated return value requirements in send_htlc() were not met");
}
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan.get());
failed_forwards.push((htlc_source, payment_hash,
HTLCFailReason::Reason { failure_code, data }
));
continue;
},
Ok(update_add) => {
match update_add {
Some(msg) => { add_htlc_msgs.push(msg); },
None => {
// Nothing to do here...we're waiting on a remote
// revoke_and_ack before we can add any more HTLCs. The Channel
// will automatically handle building the update_add_htlc and
// commitment_signed messages when we can.
// TODO: Do some kind of timer to set the channel as !is_live()
// as we don't really want others relying on us relaying through
// this channel currently :/.

// Attempt to forward the HTLC over all available channels
// to the peer, but attempt to forward the HTLC over the
// channel specified in the onion payload first.
let mut counterparty_channel_ids = peer_state.channel_by_id.keys()
ViktorTigerstrom (author) commented:

If you can think of a more efficient way of doing this, please let me know :).

A reviewer (Contributor) replied:

I think using iter::once() and iter::chain() should allow you to get rid of the insert. E.g.:

let mut counterparty_channel_ids = core::iter::once(forward_chan_id)
    .chain(peer_state.channel_by_id.keys()
                .filter(|&chan_id| *chan_id != forward_chan_id)
                .map(|&chan_id| chan_id))
    .collect::<Vec<_>>();

ViktorTigerstrom (author) replied:

Nice thanks! 13dbfe3
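
For reference, a self-contained sketch of the suggested ordering (editorial; uses hypothetical channel-id values and is not part of the PR):

use std::collections::HashMap;

fn ordered_channel_ids(channel_by_id: &HashMap<[u8; 32], ()>, forward_chan_id: [u8; 32]) -> Vec<[u8; 32]> {
    // Put the channel named in the onion first, then every other
    // channel to the same peer, with no element-shifting insert().
    core::iter::once(forward_chan_id)
        .chain(channel_by_id.keys()
            .filter(|&chan_id| *chan_id != forward_chan_id)
            .map(|&chan_id| chan_id))
        .collect()
}

fn main() {
    let mut channel_by_id: HashMap<[u8; 32], ()> = HashMap::new();
    channel_by_id.insert([1; 32], ());
    channel_by_id.insert([2; 32], ());
    let ordered = ordered_channel_ids(&channel_by_id, [2; 32]);
    assert_eq!(ordered[0], [2; 32]); // preferred channel comes first
    assert_eq!(ordered.len(), 2);
}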

.filter(|chan_id| **chan_id != forward_chan_id)
.map(|chan_id| *chan_id).collect::<Vec<_>>();
counterparty_channel_ids.insert(0, forward_chan_id);
let mut send_succeeded = false;
for chan_id in counterparty_channel_ids {
match peer_state.channel_by_id.get_mut(&chan_id).unwrap().send_htlc(amt_to_forward, payment_hash, outgoing_cltv_value, htlc_source.clone(), onion_packet.clone(), &self.logger) {
ViktorTigerstrom (author) commented:

This PR assumes that we want to attempt to forward the HTLC over another channel if forwarding over the specified channel in the onion fails for any reason, as I figured that it's probably useful to attempt forwarding over substitute channels for more reasons than just insufficient liquidity as specified in the issue. Is that correct?

If so, would we like to avoid attempting to forward over substitute channels for some failure reasons, for example when the peer is disconnected?
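
In outline, the fallback behavior under discussion looks like the following (an editorial simplification; try_send is a hypothetical stand-in for Channel::send_htlc, not part of the PR):

fn forward_with_fallback(ordered_chan_ids: &[[u8; 32]], mut try_send: impl FnMut(&[u8; 32]) -> Result<(), ()>) -> bool {
    for chan_id in ordered_chan_ids {
        // An Ignore-level failure just means "try the next channel to
        // the same counterparty"; other ChannelError variants panic in
        // the real code and are out of scope here.
        if try_send(chan_id).is_ok() {
            return true; // forwarded (or queued pending a revoke_and_ack)
        }
    }
    // Every channel refused: the caller fails the HTLC backwards with
    // temporary_channel_failure (0x1000|7).
    false
}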

Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(
ViktorTigerstrom (author) commented:

I'm a bit unsure of our logging guidelines. Are this line and the log_info + log_trace calls I've added below how we would like to handle logging?

self.logger,
"Could not forward HTLC with payment_hash {}, over channel {} to peer {}. Will attempt to forward the HTLC over a substitute channel instead if possible. Reason: {}",
log_bytes!(payment_hash.0), log_bytes!(chan_id), counterparty_node_id, msg
);
} else {
panic!("Stated return value requirements in send_htlc() were not met");
}
},
Ok(update_add) => {
match update_add {
Some(msg) => {
log_info!(self.logger, "Will forward HTLC with payment_hash {}, over channel {}", log_bytes!(payment_hash.0), log_bytes!(chan_id));
add_update_add_htlc!(msg, chan_id);
},
None => {
// Nothing to do here...we're waiting on a remote
// revoke_and_ack before we can add any more HTLCs. The Channel
// will automatically handle building the update_add_htlc and
// commitment_signed messages when we can.
// TODO: Do some kind of timer to set the channel as !is_live()
// as we don't really want others relying on us relaying through
// this channel currently :/.
}
}
send_succeeded = true;
break;
}
}
}
if !send_succeeded {
log_trace!(self.logger, "Failed to forward HTLC with payment_hash {} over all of the available channels to the peer", log_bytes!(payment_hash.0));
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, peer_state.channel_by_id.get(&forward_chan_id).unwrap());
failed_forwards.push((htlc_source, payment_hash,
HTLCFailReason::Reason { failure_code, data }
));
continue;
}
},
HTLCForwardInfo::AddHTLC { .. } => {
panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
},
HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
log_trace!(self.logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
match chan.get_mut().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
match peer_state.channel_by_id.get_mut(&forward_chan_id).unwrap().get_update_fail_htlc(htlc_id, err_packet, &self.logger) {
Err(e) => {
if let ChannelError::Ignore(msg) = e {
log_trace!(self.logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
Expand All @@ -3146,7 +3196,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
// the chain and sending the HTLC-Timeout is their problem.
continue;
},
Ok(Some(msg)) => { fail_htlc_msgs.push(msg); },
Ok(Some(msg)) => { add_update_fail_htlc!(msg, forward_chan_id); },
Ok(None) => {
// Nothing to do here...we're waiting on a remote
// revoke_and_ack before we can update the commitment
@@ -3162,46 +3212,48 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelManager
}
}

if !add_htlc_msgs.is_empty() || !fail_htlc_msgs.is_empty() {
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
Ok(res) => res,
Err(e) => {
// We surely failed send_commitment due to bad keys, in that case
// close channel and then send error message to peer.
let counterparty_node_id = chan.get().get_counterparty_node_id();
let err: Result<(), _> = match e {
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
panic!("Stated return value requirements in send_commitment() were not met");
}
ChannelError::Close(msg) => {
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
let mut channel = remove_channel!(self, channel_state, chan);
// ChannelClosed event is generated by handle_error for us.
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
},
ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
};
handle_errors.push((counterparty_node_id, err));
for (chan_id, (add_htlc_msgs, fail_htlc_msgs)) in htlcs_msgs_by_id.into_iter() {
if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(chan_id) {
let (commitment_msg, monitor_update) = match chan.get_mut().send_commitment(&self.logger) {
Ok(res) => res,
Err(e) => {
// We surely failed send_commitment due to bad keys, in that case
// close channel and then send error message to peer.
let counterparty_node_id = chan.get().get_counterparty_node_id();
let err: Result<(), _> = match e {
ChannelError::Ignore(_) | ChannelError::Warn(_) => {
panic!("Stated return value requirements in send_commitment() were not met");
}
ChannelError::Close(msg) => {
log_trace!(self.logger, "Closing channel {} due to Close-required error: {}", log_bytes!(chan.key()[..]), msg);
let mut channel = remove_channel!(self, channel_state, chan);
// ChannelClosed event is generated by handle_error for us.
Err(MsgHandleErrInternal::from_finish_shutdown(msg, channel.channel_id(), channel.get_user_id(), channel.force_shutdown(true), self.get_channel_update_for_broadcast(&channel).ok()))
},
ChannelError::CloseDelayBroadcast(_) => { panic!("Wait is only generated on receipt of channel_reestablish, which is handled by try_chan_entry, we don't bother to support it here"); }
};
handle_errors.push((counterparty_node_id, err));
continue;
}
};
if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
continue;
}
};
if let Err(e) = self.chain_monitor.update_channel(chan.get().get_funding_txo().unwrap(), monitor_update) {
handle_errors.push((chan.get().get_counterparty_node_id(), handle_monitor_err!(self, e, channel_state, chan, RAACommitmentOrder::CommitmentFirst, false, true)));
continue;
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get().get_counterparty_node_id(),
updates: msgs::CommitmentUpdate {
update_add_htlcs: add_htlc_msgs,
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs: fail_htlc_msgs,
update_fail_malformed_htlcs: Vec::new(),
update_fee: None,
commitment_signed: commitment_msg,
},
});
}
log_debug!(self.logger, "Forwarding HTLCs resulted in a commitment update with {} HTLCs added and {} HTLCs failed for channel {}",
add_htlc_msgs.len(), fail_htlc_msgs.len(), log_bytes!(chan.get().channel_id()));
channel_state.pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: chan.get().get_counterparty_node_id(),
updates: msgs::CommitmentUpdate {
update_add_htlcs: add_htlc_msgs,
update_fulfill_htlcs: Vec::new(),
update_fail_htlcs: fail_htlc_msgs,
update_fail_malformed_htlcs: Vec::new(),
update_fee: None,
commitment_signed: commitment_msg,
},
});
}
} else {
let err = Err(MsgHandleErrInternal::send_err_msg_no_close(format!("No such channel for the counterparty_node_id {}, as indicated by the short_to_id map", counterparty_node_id), forward_chan_id));