Skip to content

Commit

Permalink
emit event for peer disconnectionsa and act upon them in the blocksyn…
Browse files Browse the repository at this point in the history
…c peer tracker
  • Loading branch information
vyzo committed Nov 6, 2020
1 parent 426c2e8 commit cd71644
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
21 changes: 18 additions & 3 deletions chain/exchange/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"go.uber.org/multierr"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/peermgr"
Expand Down Expand Up @@ -38,20 +39,34 @@ func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeer
pmgr: pmgr,
}

sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
addSub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
if err != nil {
panic(err)
}

go func() {
for newPeer := range sub.Out() {
for newPeer := range addSub.Out() {
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
}
}()

rmSub, err := h.EventBus().Subscribe(new(peermgr.RemoveFilPeer))
if err != nil {
panic(err)
}

go func() {
for rmPeer := range rmSub.Out() {
bsPt.removePeer(rmPeer.(peermgr.RemoveFilPeer).Id)
}
}()

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sub.Close()
return multierr.Combine(
addSub.Close(),
rmSub.Close(),
)
},
})

Expand Down
33 changes: 27 additions & 6 deletions lib/peermgr/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type PeerMgr struct {
dht *dht.IpfsDHT

notifee *net.NotifyBundle
filPeerEmitter event.Emitter
addPeerEmitter event.Emitter
rmPeerEmitter event.Emitter

done chan struct{}
}
Expand All @@ -63,6 +64,10 @@ type NewFilPeer struct {
Id peer.ID
}

type RemoveFilPeer struct {
Id peer.ID
}

func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
pm := &PeerMgr{
h: h,
Expand All @@ -81,12 +86,19 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
}
pm.filPeerEmitter = emitter
pm.addPeerEmitter = emitter

emitter, err = h.EventBus().Emitter(new(RemoveFilPeer))
if err != nil {
return nil, xerrors.Errorf("creating RemoveFilPeer emitter: %w", err)
}
pm.rmPeerEmitter = emitter

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
pm.filPeerEmitter.Close(),
pm.addPeerEmitter.Close(),
pm.rmPeerEmitter.Close(),
pm.Stop(ctx),
)
},
Expand All @@ -104,7 +116,7 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
}

func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
_ = pmgr.addPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0)
Expand All @@ -127,10 +139,19 @@ func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
}

func (pmgr *PeerMgr) Disconnect(p peer.ID) {
disconnected := false

if pmgr.h.Network().Connectedness(p) == net.NotConnected {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
delete(pmgr.peers, p)
_, disconnected = pmgr.peers[p]
if disconnected {
delete(pmgr.peers, p)
}
pmgr.peersLk.Unlock()
}

if disconnected {
_ = pmgr.rmPeerEmitter.Emit(RemoveFilPeer{Id: p}) //nolint:errcheck
}
}

Expand Down

0 comments on commit cd71644

Please sign in to comment.