Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chanbackup: update on-disk backup file with unconfirmed channels #3993

Merged
merged 5 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions channel_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
)

Expand Down Expand Up @@ -51,6 +52,35 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
quit := make(chan struct{})
chanUpdates := make(chan chanbackup.ChannelEvent, 1)

// sendChanOpenUpdate is a closure that sends a ChannelEvent to the
// chanUpdates channel to inform subscribers about new pending or
// confirmed channels.
sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) {
nodeAddrs, err := c.addrs.AddrsForNode(
newOrPendingChan.IdentityPub,
)
if err != nil {
pub := newOrPendingChan.IdentityPub
ltndLog.Errorf("unable to fetch addrs for %x: %v",
pub.SerializeCompressed(), err)
}

chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: newOrPendingChan,
Addrs: nodeAddrs,
},
},
}

select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
}

// In order to adhere to the interface, we'll proxy the events from the
// channel notifier to the sub-swapper in a format it understands.
go func() {
Expand All @@ -74,37 +104,18 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
// TODO(roasbeef): batch dispatch ntnfs

switch event := e.(type) {

// A new channel has been opened, we'll obtain
// the node address, then send to the
// A new channel has been opened and is still
// pending. We can still create a backup, even
// if the final channel ID is not yet available.
case channelnotifier.PendingOpenChannelEvent:
pendingChan := event.PendingChannel
sendChanOpenUpdate(pendingChan)

// A new channel has been confirmed, we'll
// obtain the node address, then send to the
// sub-swapper.
case channelnotifier.OpenChannelEvent:
nodeAddrs, err := c.addrs.AddrsForNode(
event.Channel.IdentityPub,
)
if err != nil {
pub := event.Channel.IdentityPub
ltndLog.Errorf("unable to "+
"fetch addrs for %x: %v",
pub.SerializeCompressed(),
err)
}

channel := event.Channel
chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: channel,
Addrs: nodeAddrs,
},
},
}

select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
sendChanOpenUpdate(event.Channel)

// An existing channel has been closed, we'll
// send only the chanPoint of the closed
Expand Down
24 changes: 19 additions & 5 deletions channelnotifier/channelnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ type ChannelNotifier struct {
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channelpoint for the new channel.
// ChannelPoint is the channel outpoint for the new channel.
ChannelPoint *wire.OutPoint

// PendingChannel is the channel configuration for the newly created
// channel. This might not have been persisted to the channel DB yet
// because we are still waiting for the final message from the remote
// peer.
PendingChannel *channeldb.OpenChannel
}

// OpenChannelEvent represents a new event where a channel goes from pending
Expand Down Expand Up @@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}

// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a
// new channel is pending.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) {
event := PendingOpenChannelEvent{ChannelPoint: &chanPoint}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine
// that a new channel is pending. The pending channel is passed as a parameter
// instead of read from the database because it might not yet have been
// persisted to the DB because we still wait for the final message from the
// remote peer.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
pendingChan *channeldb.OpenChannel) {

event := PendingOpenChannelEvent{
ChannelPoint: &chanPoint,
PendingChannel: pendingChan,
}

if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions fundingmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ type fundingConfig struct {

// NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels
// enter a pending state.
NotifyPendingOpenChannelEvent func(wire.OutPoint)
NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel)
}

// fundingManager acts as an orchestrator/bridge between the wallet's
Expand Down Expand Up @@ -1697,7 +1697,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {

// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(fundingOut)
f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan)
guggero marked this conversation as resolved.
Show resolved Hide resolved

// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
Expand Down Expand Up @@ -1845,7 +1845,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
case resCtx.updates <- upd:
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint)
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan)
case <-f.quit:
return
}
Expand Down
66 changes: 62 additions & 4 deletions fundingmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
Expand All @@ -48,6 +49,10 @@ const (
// testPollSleepMs is the number of milliseconds to sleep between
// each attempt to access the database to check its state.
testPollSleepMs = 500

// maxPending is the maximum number of channels we allow opening to the
// same peer in the max pending channels test.
maxPending = 4
)

var (
Expand Down Expand Up @@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
}, nil
}

type mockChanEvent struct {
openEvent chan wire.OutPoint
pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent
}

func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) {
m.openEvent <- outpoint
}

func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint,
pendingChannel *channeldb.OpenChannel) {

m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{
ChannelPoint: &outpoint,
PendingChannel: pendingChannel,
}
}

type testNode struct {
privKey *btcec.PrivateKey
addr *lnwire.NetAddress
Expand All @@ -147,6 +170,7 @@ type testNode struct {
fundingMgr *fundingManager
newChannels chan *newChannelMsg
mockNotifier *mockNotifier
mockChanEvent *mockChanEvent
testDir string
shutdownChannel chan struct{}
remoteFeatures []lnwire.FeatureBit
Expand Down Expand Up @@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
bestHeight: fundingBroadcastHeight,
}

// The mock channel event notifier will receive events for each pending
guggero marked this conversation as resolved.
Show resolved Hide resolved
// open and open channel. Because some tests will create multiple
// channels in a row before advancing to the next step, these channels
// need to be buffered.
evt := &mockChanEvent{
openEvent: make(chan wire.OutPoint, maxPending),
pendingOpenEvent: make(
chan channelnotifier.PendingOpenChannelEvent, maxPending,
),
}

dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
if err != nil {
Expand Down Expand Up @@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent,
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: func(wire.OutPoint) {},
NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent,
}

for _, op := range options {
Expand All @@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan: publTxChan,
fundingMgr: f,
mockNotifier: chainNotifier,
mockChanEvent: evt,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
addr: addr,
Expand Down Expand Up @@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
t.Fatalf("alice did not publish funding tx")
}

// Make sure the notification about the pending channel was sent out.
select {
case <-alice.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send pending channel event")
}
select {
case <-bob.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send pending channel event")
}

// Finally, make sure neither have active reservation for the channel
// now pending open in the database.
assertNumPendingReservations(t, alice, bobPubKey, 0)
Expand Down Expand Up @@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()

// Make sure the notification about the pending channel was sent out.
select {
case <-alice.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send open channel event")
}
select {
case <-bob.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send open channel event")
}

assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
}
Expand Down Expand Up @@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
func TestFundingManagerMaxPendingChannels(t *testing.T) {
t.Parallel()

const maxPending = 4

alice, bob := setupFundingManagers(
t, func(cfg *fundingConfig) {
cfg.MaxPendingChannels = maxPending
Expand Down
Loading