Skip to content

Commit

Permalink
Merge pull request #4018 from breez/intercept-forward-htlc
Browse files Browse the repository at this point in the history
Intercept forward htlc
  • Loading branch information
joostjager authored Jun 22, 2020
2 parents acc698a + 7b56268 commit 8f2a2fc
Show file tree
Hide file tree
Showing 20 changed files with 1,747 additions and 281 deletions.
170 changes: 170 additions & 0 deletions htlcswitch/interceptable_switch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package htlcswitch

import (
"fmt"
"sync"

"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
)

var (
// ErrFwdNotExists is an error returned when the caller tries to resolve
// a forward that doesn't exist anymore.
ErrFwdNotExists = errors.New("forward does not exist")
)

// InterceptableSwitch is an implementation of ForwardingSwitch interface.
// This implementation is used like a proxy that wraps the switch and
// intercepts forward requests. A reference to the Switch is held in order
// to communicate back the interception result where the options are:
// Resume - forwards the original request to the switch as is.
// Settle - routes UpdateFulfillHTLC to the originating link.
// Fail - routes UpdateFailHTLC to the originating link.
type InterceptableSwitch struct {
sync.RWMutex

// htlcSwitch is the underline switch
htlcSwitch *Switch

// fwdInterceptor is the callback that is called for each forward of
// an incoming htlc. It should return true if it is interested in handling
// it.
fwdInterceptor ForwardInterceptor
}

// NewInterceptableSwitch returns an instance of InterceptableSwitch.
func NewInterceptableSwitch(s *Switch) *InterceptableSwitch {
return &InterceptableSwitch{htlcSwitch: s}
}

// SetInterceptor sets the ForwardInterceptor to be used.
func (s *InterceptableSwitch) SetInterceptor(
interceptor ForwardInterceptor) {

s.Lock()
defer s.Unlock()
s.fwdInterceptor = interceptor
}

// ForwardPackets attempts to forward the batch of htlcs through the
// switch, any failed packets will be returned to the provided
// ChannelLink. The link's quit signal should be provided to allow
// cancellation of forwarding during link shutdown.
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{},
packets ...*htlcPacket) error {

var interceptor ForwardInterceptor
s.Lock()
interceptor = s.fwdInterceptor
s.Unlock()

// Optimize for the case we don't have an interceptor.
if interceptor == nil {
return s.htlcSwitch.ForwardPackets(linkQuit, packets...)
}

var notIntercepted []*htlcPacket
for _, p := range packets {
if !s.interceptForward(p, interceptor, linkQuit) {
notIntercepted = append(notIntercepted, p)
}
}
return s.htlcSwitch.ForwardPackets(linkQuit, notIntercepted...)
}

// interceptForward checks if there is any external interceptor interested in
// this packet. Currently only htlc type of UpdateAddHTLC that are forwarded
// are being checked for interception. It can be extended in the future given
// the right use case.
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
interceptor ForwardInterceptor, linkQuit chan struct{}) bool {

switch htlc := packet.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// We are not interested in intercepting initated payments.
if packet.incomingChanID == hop.Source {
return false
}

intercepted := &interceptedForward{
linkQuit: linkQuit,
htlc: htlc,
packet: packet,
htlcSwitch: s.htlcSwitch,
}

// If this htlc was intercepted, don't handle the forward.
return interceptor(intercepted)
default:
return false
}
}

// interceptedForward implements the InterceptedForward interface.
// It is passed from the switch to external interceptors that are interested
// in holding forwards and resolve them manually.
type interceptedForward struct {
linkQuit chan struct{}
htlc *lnwire.UpdateAddHTLC
packet *htlcPacket
htlcSwitch *Switch
}

// Packet returns the intercepted htlc packet.
func (f *interceptedForward) Packet() lnwire.UpdateAddHTLC {
return *f.htlc
}

// CircuitKey returns the circuit key for the intercepted htlc.
func (f *interceptedForward) CircuitKey() channeldb.CircuitKey {
return channeldb.CircuitKey{
ChanID: f.packet.incomingChanID,
HtlcID: f.packet.incomingHTLCID,
}
}

// Resume resumes the default behavior as if the packet was not intercepted.
func (f *interceptedForward) Resume() error {
return f.htlcSwitch.ForwardPackets(f.linkQuit, f.packet)
}

// Fail forward a failed packet to the switch.
func (f *interceptedForward) Fail() error {
reason, err := f.packet.obfuscator.EncryptFirstHop(lnwire.NewTemporaryChannelFailure(nil))
if err != nil {
return fmt.Errorf("failed to encrypt failure reason %v", err)
}
return f.resolve(&lnwire.UpdateFailHTLC{
Reason: reason,
})
}

// Settle forwards a settled packet to the switch.
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
if !preimage.Matches(f.htlc.PaymentHash) {
return errors.New("preimage does not match hash")
}
return f.resolve(&lnwire.UpdateFulfillHTLC{
PaymentPreimage: preimage,
})
}

// resolve is used for both Settle and Fail and forwards the message to the
// switch.
func (f *interceptedForward) resolve(message lnwire.Message) error {
pkt := &htlcPacket{
incomingChanID: f.packet.incomingChanID,
incomingHTLCID: f.packet.incomingHTLCID,
outgoingChanID: f.packet.outgoingChanID,
outgoingHTLCID: f.packet.outgoingHTLCID,
isResolution: true,
circuit: f.packet.circuit,
htlc: message,
obfuscator: f.packet.obfuscator,
}
return f.htlcSwitch.mailOrchestrator.Deliver(pkt.incomingChanID, pkt)
}
40 changes: 40 additions & 0 deletions htlcswitch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,46 @@ type TowerClient interface {
BackupState(*lnwire.ChannelID, *lnwallet.BreachRetribution, bool) error
}

// InterceptableHtlcForwarder is the interface to set the interceptor
// implementation that intercepts htlc forwards.
type InterceptableHtlcForwarder interface {
// SetInterceptor sets a ForwardInterceptor.
SetInterceptor(interceptor ForwardInterceptor)
}

// ForwardInterceptor is a function that is invoked from the switch for every
// incoming htlc that is intended to be forwarded. It is passed with the
// InterceptedForward that contains the information about the packet and a way
// to resolve it manually later in case it is held.
// The return value indicates if this handler will take control of this forward
// and resolve it later or let the switch execute its default behavior.
type ForwardInterceptor func(InterceptedForward) bool

// InterceptedForward is passed to the ForwardInterceptor for every forwarded
// htlc. It contains all the information about the packet which accordingly
// the interceptor decides if to hold or not.
// In addition this interface allows a later resolution by calling either
// Resume, Settle or Fail.
type InterceptedForward interface {
// CircuitKey returns the intercepted packet.
CircuitKey() channeldb.CircuitKey

// Packet returns the intercepted packet.
Packet() lnwire.UpdateAddHTLC

// Resume notifies the intention to resume an existing hold forward. This
// basically means the caller wants to resume with the default behavior for
// this htlc which usually means forward it.
Resume() error

// Settle notifies the intention to settle an existing hold
// forward with a given preimage.
Settle(lntypes.Preimage) error

// Fails notifies the intention to fail an existing hold forward
Fail() error
}

// htlcNotifier is an interface which represents the input side of the
// HtlcNotifier which htlc events are piped through. This interface is intended
// to allow for mocking of the htlcNotifier in tests, so is unexported because
Expand Down
12 changes: 7 additions & 5 deletions htlcswitch/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ type ChannelLinkConfig struct {
Switch *Switch

// ForwardPackets attempts to forward the batch of htlcs through the
// switch, any failed packets will be returned to the provided
// ChannelLink. The link's quit signal should be provided to allow
// switch. The function returns and error in case it fails to send one or
// more packets. The link's quit signal should be provided to allow
// cancellation of forwarding during link shutdown.
ForwardPackets func(chan struct{}, ...*htlcPacket) chan error
ForwardPackets func(chan struct{}, ...*htlcPacket) error

// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
// blobs, which are then used to inform how to forward an HTLC.
Expand Down Expand Up @@ -2971,8 +2971,10 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
filteredPkts = append(filteredPkts, pkt)
}

errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...)
go handleBatchFwdErrs(errChan, l.log)
if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil {
log.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
}
}

// sendHTLCError functions cancels HTLC and send cancel message back to the
Expand Down
10 changes: 6 additions & 4 deletions htlcswitch/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type mailBoxConfig struct {
// forwardPackets send a varidic number of htlcPackets to the switch to
// be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
forwardPackets func(chan struct{}, ...*htlcPacket) error

// clock is a time source for the mailbox.
clock clock.Clock
Expand Down Expand Up @@ -680,8 +680,10 @@ func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
},
}

errChan := m.cfg.forwardPackets(m.quit, failPkt)
go handleBatchFwdErrs(errChan, log)
if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
log.Errorf("Unhandled error while reforwarding packets "+
"settle/fail over htlcswitch: %v", err)
}
}

// MessageOutBox returns a channel that any new messages ready for delivery
Expand Down Expand Up @@ -734,7 +736,7 @@ type mailOrchConfig struct {
// forwardPackets send a varidic number of htlcPackets to the switch to
// be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
forwardPackets func(chan struct{}, ...*htlcPacket) error

// fetchUpdate retreives the most recent channel update for the channel
// this mailbox belongs to.
Expand Down
15 changes: 4 additions & 11 deletions htlcswitch/mailbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,13 @@ func newMailboxContext(t *testing.T, startTime time.Time,
}

func (c *mailboxContext) forward(_ chan struct{},
pkts ...*htlcPacket) chan error {
pkts ...*htlcPacket) error {

for _, pkt := range pkts {
c.forwards <- pkt
}

errChan := make(chan error)
close(errChan)

return errChan
return nil
}

func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket {
Expand Down Expand Up @@ -555,12 +552,8 @@ func TestMailOrchestrator(t *testing.T) {
}, nil
},
forwardPackets: func(_ chan struct{},
pkts ...*htlcPacket) chan error {
// Close the channel immediately so the goroutine
// logging errors can exit.
errChan := make(chan error)
close(errChan)
return errChan
pkts ...*htlcPacket) error {
return nil
},
clock: clock.NewTestClock(time.Now()),
expiry: testExpiry,
Expand Down
Loading

0 comments on commit 8f2a2fc

Please sign in to comment.