diff --git a/p2p/net/connmgr/connmgr.go b/p2p/net/connmgr/connmgr.go index c3587ad4e3..3a1acf0db0 100644 --- a/p2p/net/connmgr/connmgr.go +++ b/p2p/net/connmgr/connmgr.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -27,6 +28,8 @@ var log = logging.Logger("connmgr") type BasicConnMgr struct { *decayer + clock clock.Clock + cfg *config segments segments @@ -74,7 +77,7 @@ func (ss *segments) countPeers() (count int) { return count } -func (s *segment) tagInfoFor(p peer.ID) *peerInfo { +func (s *segment) tagInfoFor(p peer.ID, now time.Time) *peerInfo { pi, ok := s.peers[p] if ok { return pi @@ -82,7 +85,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo { // create a temporary peer to buffer early tags before the Connected notification arrives. pi = &peerInfo{ id: p, - firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives. + firstSeen: now, // this timestamp will be updated when the first Connected notification arrives. temp: true, tags: make(map[string]int), decaying: make(map[*decayingTag]*connmgr.DecayingValue), @@ -102,6 +105,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) { lowWater: low, gracePeriod: time.Minute, silencePeriod: 10 * time.Second, + clock: clock.New(), } for _, o := range opts { if err := o(cfg); err != nil { @@ -116,6 +120,7 @@ func NewConnManager(low, hi int, opts ...Option) (*BasicConnMgr, error) { cm := &BasicConnMgr{ cfg: cfg, + clock: cfg.clock, protected: make(map[peer.ID]map[string]struct{}, 16), segments: func() (ret segments) { for i := range ret { @@ -167,7 +172,7 @@ func (cm *BasicConnMgr) memoryEmergency() { // finally, update the last trim time. cm.lastTrimMu.Lock() - cm.lastTrim = time.Now() + cm.lastTrim = cm.clock.Now() cm.lastTrimMu.Unlock() } @@ -311,7 +316,7 @@ func (cm *BasicConnMgr) background() { interval = cm.cfg.silencePeriod } - ticker := time.NewTicker(interval) + ticker := cm.clock.Ticker(interval) defer ticker.Stop() for { @@ -336,7 +341,7 @@ func (cm *BasicConnMgr) doTrim() { if count == atomic.LoadUint64(&cm.trimCount) { cm.trim() cm.lastTrimMu.Lock() - cm.lastTrim = time.Now() + cm.lastTrim = cm.clock.Now() cm.lastTrimMu.Unlock() atomic.AddUint64(&cm.trimCount, 1) } @@ -427,7 +432,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn { candidates := make(peerInfos, 0, cm.segments.countPeers()) var ncandidates int - gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod) + gracePeriodStart := cm.clock.Now().Add(-cm.cfg.gracePeriod) cm.plk.RLock() for _, s := range cm.segments { @@ -529,7 +534,7 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) { s.Lock() defer s.Unlock() - pi := s.tagInfoFor(p) + pi := s.tagInfoFor(p, cm.clock.Now()) // Update the total value of the peer. pi.value += val - pi.tags[tag] @@ -559,7 +564,7 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) { s.Lock() defer s.Unlock() - pi := s.tagInfoFor(p) + pi := s.tagInfoFor(p, cm.clock.Now()) oldval := pi.tags[tag] newval := upsert(oldval) @@ -629,7 +634,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) { if !ok { pinfo = &peerInfo{ id: id, - firstSeen: time.Now(), + firstSeen: cm.clock.Now(), tags: make(map[string]int), decaying: make(map[*decayingTag]*connmgr.DecayingValue), conns: make(map[network.Conn]time.Time), @@ -640,7 +645,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) { // Connected notification arrived: flip the temporary flag, and update the firstSeen // timestamp to the real one. pinfo.temp = false - pinfo.firstSeen = time.Now() + pinfo.firstSeen = cm.clock.Now() } _, ok = pinfo.conns[c] @@ -649,7 +654,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) { return } - pinfo.conns[c] = time.Now() + pinfo.conns[c] = cm.clock.Now() atomic.AddInt32(&cm.connCount, 1) } diff --git a/p2p/net/connmgr/connmgr_test.go b/p2p/net/connmgr/connmgr_test.go index 92c273c5f0..312bdc1f3b 100644 --- a/p2p/net/connmgr/connmgr_test.go +++ b/p2p/net/connmgr/connmgr_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -411,7 +412,8 @@ func TestDisconnected(t *testing.T) { func TestGracePeriod(t *testing.T) { const gp = 100 * time.Millisecond - cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour)) + mockClock := clock.NewMock() + cm, err := NewConnManager(10, 20, WithGracePeriod(gp), WithSilencePeriod(time.Hour), WithClock(mockClock)) require.NoError(t, err) defer cm.Close() @@ -425,7 +427,7 @@ func TestGracePeriod(t *testing.T) { conns = append(conns, rc) not.Connected(nil, rc) - time.Sleep(2 * gp) + mockClock.Add(2 * gp) if rc.(*tconn).isClosed() { t.Fatal("expected conn to remain open") @@ -447,7 +449,7 @@ func TestGracePeriod(t *testing.T) { } } - time.Sleep(200 * time.Millisecond) + mockClock.Add(200 * time.Millisecond) cm.TrimOpenConns(context.Background()) @@ -465,7 +467,8 @@ func TestGracePeriod(t *testing.T) { // see https://github.com/libp2p/go-libp2p-connmgr/issues/23 func TestQuickBurstRespectsSilencePeriod(t *testing.T) { - cm, err := NewConnManager(10, 20, WithGracePeriod(0)) + mockClock := clock.NewMock() + cm, err := NewConnManager(10, 20, WithGracePeriod(0), WithClock(mockClock)) require.NoError(t, err) defer cm.Close() not := cm.Notifee() @@ -480,7 +483,7 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) { } // wait for a few seconds - time.Sleep(time.Second * 3) + mockClock.Add(3 * time.Second) // only the first trim is allowed in; make sure we close at most 20 connections, not all of them. var closed int diff --git a/p2p/net/connmgr/decay.go b/p2p/net/connmgr/decay.go index 9841c2f1fa..0819bd2136 100644 --- a/p2p/net/connmgr/decay.go +++ b/p2p/net/connmgr/decay.go @@ -221,7 +221,7 @@ func (d *decayer) process() { s := d.mgr.segments.get(peer) s.Lock() - p := s.tagInfoFor(peer) + p := s.tagInfoFor(peer, d.clock.Now()) v, ok := p.decaying[tag] if !ok { v = &connmgr.DecayingValue{ @@ -244,7 +244,7 @@ func (d *decayer) process() { s := d.mgr.segments.get(rm.peer) s.Lock() - p := s.tagInfoFor(rm.peer) + p := s.tagInfoFor(rm.peer, d.clock.Now()) v, ok := p.decaying[rm.tag] if !ok { s.Unlock() diff --git a/p2p/net/connmgr/options.go b/p2p/net/connmgr/options.go index 76b4ef386d..cde1fd792a 100644 --- a/p2p/net/connmgr/options.go +++ b/p2p/net/connmgr/options.go @@ -3,6 +3,8 @@ package connmgr import ( "errors" "time" + + "github.com/benbjohnson/clock" ) // config is the configuration struct for the basic connection manager. @@ -13,6 +15,7 @@ type config struct { silencePeriod time.Duration decayer *DecayerCfg emergencyTrim bool + clock clock.Clock } // Option represents an option for the basic connection manager. @@ -26,6 +29,14 @@ func DecayerConfig(opts *DecayerCfg) Option { } } +// WithClock sets the internal clock impl +func WithClock(c clock.Clock) Option { + return func(cfg *config) error { + cfg.clock = c + return nil + } +} + // WithGracePeriod sets the grace period. // The grace period is the time a newly opened connection is given before it becomes // subject to pruning.