Skip to content

Commit

Permalink
Merge pull request lightningnetwork#7186 from yyforyongyu/fix-missing…
Browse files Browse the repository at this point in the history
…-channel-updates

Fix potential channel announcements missing
  • Loading branch information
guggero authored Dec 12, 2022
2 parents 72d97e9 + 1206174 commit d9febbb
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 184 deletions.
211 changes: 119 additions & 92 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ func (d *AuthenticatedGossiper) Stop() error {

func (d *AuthenticatedGossiper) stop() {
log.Info("Authenticated Gossiper is stopping")
defer log.Info("Authenticated Gossiper stopped")

d.blockEpochs.Cancel()

Expand Down Expand Up @@ -1187,12 +1188,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcement.msg.MsgType(),
announcement.isRemote)

// We should only broadcast this message forward if it
// originated from us or it wasn't received as part of
// our initial historical sync.
shouldBroadcast := !announcement.isRemote ||
d.syncMgr.IsGraphSynced()

switch announcement.msg.(type) {
// Channel announcement signatures are amongst the only
// messages that we'll process serially.
Expand Down Expand Up @@ -1231,70 +1226,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
validationBarrier.InitJobDependencies(announcement.msg)

d.wg.Add(1)
go func() {
defer d.wg.Done()
defer validationBarrier.CompleteJob()

// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
err := validationBarrier.WaitForDependants(
announcement.msg,
)
if err != nil {
if !routing.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
) {

log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
announcement.err <- err
return
}

// Process the network announcement to
// determine if this is either a new
// announcement from our PoV or an edges to a
// prior vertex/edge we previously proceeded.
emittedAnnouncements, allowDependents := d.processNetworkAnnouncement(
announcement,
)

log.Tracef("Processed network message %s, "+
"returned len(announcements)=%v, "+
"allowDependents=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements),
allowDependents)

// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(
announcement.msg, allowDependents,
)

// If the announcement was accepted, then add
// the emitted announcements to our announce
// batch to be broadcast once the trickle timer
// ticks gain.
if emittedAnnouncements != nil && shouldBroadcast {
// TODO(roasbeef): exclude peer that
// sent.
announcements.AddMsgs(
emittedAnnouncements...,
)
} else if emittedAnnouncements != nil {
log.Trace("Skipping broadcast of " +
"announcements received " +
"during initial graph sync")
}

}()
go d.handleNetworkMessages(
announcement, &announcements, validationBarrier,
)

// The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements
Expand Down Expand Up @@ -1359,6 +1293,67 @@ func (d *AuthenticatedGossiper) networkHandler() {
}
}

// handleNetworkMessages is responsible for waiting for dependencies for a
// given network message and processing the message. Once processed, it will
// signal its dependants and add the new announcements to the announce batch.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier) {

defer d.wg.Done()
defer vb.CompleteJob()

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)

if !routing.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
) {

log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
}
nMsg.err <- err

return
}

// Process the network announcement to determine if this is either a
// new announcement from our PoV or an edges to a prior vertex/edge we
// previously proceeded.
newAnns, allow := d.processNetworkAnnouncement(nMsg)

log.Tracef("Processed network message %s, returned "+
"len(announcements)=%v, allowDependents=%v",
nMsg.msg.MsgType(), len(newAnns), allow)

// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)

// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
// gain.
if newAnns != nil && shouldBroadcast {
// TODO(roasbeef): exclude peer that sent.
deDuped.AddMsgs(newAnns...)
} else if newAnns != nil {
log.Trace("Skipping broadcast of announcements received " +
"during initial graph sync")
}
}

// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
Expand Down Expand Up @@ -1824,10 +1819,6 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) ([]networkMsg, bool) {

log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+
"is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(),
nMsg.msg.MsgType(), nMsg.isRemote)

// If this is a remote update, we set the scheduler option to lazily
// add it to the graph.
var schedulerOp []batch.SchedulerOption
Expand Down Expand Up @@ -1947,7 +1938,7 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
}
if err != nil {
log.Debugf("Unable to retrieve channel=%v from graph: "+
"%v", err)
"%v", chanInfo.ChannelID, err)
return false
}

Expand Down Expand Up @@ -2145,6 +2136,9 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,

timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)

// We'll quickly ask the router if it already has a newer update for
// this node so we can skip validating signatures if not required.
if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) {
Expand Down Expand Up @@ -2199,6 +2193,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,

nMsg.err <- nil
// TODO(roasbeef): get rid of the above

log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)

return announcements, true
}

Expand All @@ -2207,6 +2205,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
ann *lnwire.ChannelAnnouncement,
ops []batch.SchedulerOption) ([]networkMsg, bool) {

log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v",
nMsg.peer, ann.ShortChannelID.ToUint64())

// We'll ignore any channel announcements that target any chain other
// than the set of chains we know of.
if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
Expand Down Expand Up @@ -2327,6 +2328,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
}
}

log.Debugf("Adding edge for short_chan_id: %v",
ann.ShortChannelID.ToUint64())

// We will add the edge to the channel router. If the nodes present in
// this channel are not present in the database, a partial node will be
// added to represent each node while we wait for a node announcement.
Expand All @@ -2338,6 +2342,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
err := d.cfg.Router.AddEdge(edge, ops...)
if err != nil {
log.Debugf("Router rejected edge for short_chan_id(%v): %v",
ann.ShortChannelID.ToUint64(), err)

defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())

// If the edge was rejected due to already being known, then it
Expand All @@ -2359,19 +2366,20 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
return nil, false
}

log.Debugf("Extracted %v announcements from rejected "+
"msgs", len(anns))

// If while processing this rejected edge, we realized
// there's a set of announcements we could extract,
// then we'll return those directly.
if len(anns) != 0 {
nMsg.err <- nil
return anns, true
}
//
// NOTE: since this is an ErrIgnored, we can return
// true here to signal "allow" to its dependants.
nMsg.err <- nil

// Otherwise, this is just a regular rejected edge.
log.Debugf("Router rejected channel edge: %v", err)
return anns, true
} else {
log.Debugf("Router rejected channel edge: %v", err)

// Otherwise, this is just a regular rejected edge.
key := newRejectCacheKey(
ann.ShortChannelID.ToUint64(),
sourceToPub(nMsg.source),
Expand All @@ -2386,6 +2394,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// If err is nil, release the lock immediately.
d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())

log.Debugf("Finish adding edge for short_chan_id: %v",
ann.ShortChannelID.ToUint64())

// If we earlier received any ChannelUpdates for this channel, we can
// now process them, as the channel is added to the graph.
shortChanID := ann.ShortChannelID.ToUint64()
Expand Down Expand Up @@ -2456,6 +2467,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
}

nMsg.err <- nil

log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
nMsg.peer, ann.ShortChannelID.ToUint64())

return announcements, true
}

Expand All @@ -2464,6 +2479,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
upd *lnwire.ChannelUpdate,
ops []batch.SchedulerOption) ([]networkMsg, bool) {

log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
nMsg.peer, upd.ShortChannelID.ToUint64())

// We'll ignore any channel updates that target any chain other than
// the set of chains we know of.
if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
Expand Down Expand Up @@ -2523,10 +2541,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
graphScid, timestamp, upd.ChannelFlags,
) {

log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+
"msg=%s, is_remote=%v", nMsg.peer,
nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(),
nMsg.isRemote,
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
"peer=%v, source=%x, msg=%s, is_remote=%v", shortChanID,
nMsg.peer, nMsg.source.SerializeCompressed(),
nMsg.msg.MsgType(), nMsg.isRemote,
)

nMsg.err <- nil
Expand Down Expand Up @@ -2649,6 +2667,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
edgeToUpdate = e2
}

log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
edgeToUpdate != nil)

// Validate the channel announcement with the expected public key and
// channel capacity. In the case of an invalid channel update, we'll
// return an error to the caller and exit early.
Expand Down Expand Up @@ -2743,7 +2765,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
routing.ErrVBarrierShuttingDown,
) {

log.Debug(err)
log.Debugf("Update edge for short_chan_id(%v) got: %v",
shortChanID, err)
} else {
// Since we know the stored SCID in the graph, we'll
// cache that SCID.
Expand All @@ -2753,7 +2776,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
)
_, _ = d.recentRejects.Put(key, &cachedReject{})

log.Error(err)
log.Errorf("Update edge for short_chan_id(%v) got: %v",
shortChanID, err)
}

nMsg.err <- err
Expand Down Expand Up @@ -2801,8 +2825,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
)

log.Debugf("The message %v has no AuthProof, sending the "+
"update to remote peer %x", upd.MsgType(),
remotePubKey)
"update to remote peer %x", upd.MsgType(), remotePubKey)

// Now we'll attempt to send the channel update message
// reliably to the remote peer in the background, so that we
Expand Down Expand Up @@ -2832,6 +2855,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
}

nMsg.err <- nil

log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
"timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
timestamp)
return announcements, true
}

Expand All @@ -2848,7 +2875,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
prefix = "remote"
}

log.Infof("Received new %v channel announcement for %v", prefix,
log.Infof("Received new %v announcement signature for %v", prefix,
ann.ShortChannelID)

// By the specification, channel announcement proofs should be sent
Expand Down
3 changes: 3 additions & 0 deletions discovery/reliable_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (s *reliableSender) Start() error {
// Stop halts the reliable sender from sending messages to peers.
func (s *reliableSender) Stop() {
s.stop.Do(func() {
log.Debugf("reliableSender is stopping")
defer log.Debugf("reliableSender stopped")

close(s.quit)
s.wg.Wait()
})
Expand Down
3 changes: 3 additions & 0 deletions discovery/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ func (m *SyncManager) Start() {
// Stop stops the SyncManager from performing its duties.
func (m *SyncManager) Stop() {
m.stop.Do(func() {
log.Debugf("SyncManager is stopping")
defer log.Debugf("SyncManager stopped")

close(m.quit)
m.wg.Wait()

Expand Down
3 changes: 3 additions & 0 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ func (g *GossipSyncer) Start() {
// exited.
func (g *GossipSyncer) Stop() {
g.stopped.Do(func() {
log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:])
defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:])

close(g.quit)
g.wg.Wait()
})
Expand Down
Loading

0 comments on commit d9febbb

Please sign in to comment.