Skip to content

Commit

Permalink
don't use a context for closing the ObservedAddressManager
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Sep 5, 2021
1 parent 07dbdf2 commit 60b5098
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 34 deletions.
8 changes: 3 additions & 5 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,23 @@ func NewIDService(h host.Host, opts ...Option) (*IDService, error) {
userAgent = cfg.userAgent
}

hostCtx, cancel := context.WithCancel(context.Background())
s := &IDService{
Host: h,
UserAgent: userAgent,

ctx: hostCtx,
ctxCancel: cancel,
conns: make(map[network.Conn]chan struct{}),
conns: make(map[network.Conn]chan struct{}),

disableSignedPeerRecord: cfg.disableSignedPeerRecord,

addPeerHandlerCh: make(chan addPeerHandlerReq),
rmPeerHandlerCh: make(chan rmPeerHandlerReq),
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

// handle local protocol handler updates, and push deltas to peers.
var err error

observedAddrs, err := NewObservedAddrManager(hostCtx, h)
observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
}
Expand Down
38 changes: 19 additions & 19 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type newObservation struct {
type ObservedAddrManager struct {
host host.Host

refCount sync.WaitGroup
ctx context.Context // the context is canceled when Close is called
ctxCancel context.CancelFunc

// latest observation from active connections
// we'll "re-observe" these when we gc
activeConnsMu sync.Mutex
Expand All @@ -123,7 +127,7 @@ type ObservedAddrManager struct {

// NewObservedAddrManager returns a new address manager using
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrManager, error) {
func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) {
oas := &ObservedAddrManager{
addrs: make(map[string][]*observedAddr),
ttl: peerstore.OwnObservedAddrTTL,
Expand All @@ -133,6 +137,7 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM
// refresh every ttl/2 so we don't forget observations from connected peers
refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2),
}
oas.ctx, oas.ctxCancel = context.WithCancel(context.Background())

reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
if err != nil {
Expand All @@ -147,7 +152,8 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM
oas.emitNATDeviceTypeChanged = emitter

oas.host.Network().Notify((*obsAddrNotifiee)(oas))
go oas.worker(ctx)
oas.refCount.Add(1)
go oas.worker()
return oas, nil
}

Expand Down Expand Up @@ -239,22 +245,12 @@ func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr)
}
}

func (oas *ObservedAddrManager) teardown() {
oas.host.Network().StopNotify((*obsAddrNotifiee)(oas))
oas.reachabilitySub.Close()

oas.mu.Lock()
oas.refreshTimer.Stop()
oas.mu.Unlock()
}

func (oas *ObservedAddrManager) worker(ctx context.Context) {
defer oas.teardown()
func (oas *ObservedAddrManager) worker() {
defer oas.refCount.Done()

ticker := time.NewTicker(GCInterval)
defer ticker.Stop()

hostClosing := oas.host.Network().Process().Closing()
subChan := oas.reachabilitySub.Out()
for {
select {
Expand All @@ -265,17 +261,13 @@ func (oas *ObservedAddrManager) worker(ctx context.Context) {
}
ev := evt.(event.EvtLocalReachabilityChanged)
oas.reachability = ev.Reachability

case obs := <-oas.wch:
oas.maybeRecordObservation(obs.conn, obs.observed)

case <-ticker.C:
oas.gc()
case <-oas.refreshTimer.C:
oas.refresh()
case <-hostClosing:
return
case <-ctx.Done():
case <-oas.ctx.Done():
return
}
}
Expand Down Expand Up @@ -534,6 +526,14 @@ func (oas *ObservedAddrManager) emitSpecificNATType(addrs []*observedAddr, proto
return false, 0
}

func (oas *ObservedAddrManager) Close() error {
oas.ctxCancel()
oas.refCount.Wait()
oas.reachabilitySub.Close()
oas.refreshTimer.Stop()
return nil
}

// observerGroup is a function that determines what part of
// a multiaddr counts as a different observer. for example,
// two ipfs nodes at the same IP/TCP transport would get
Expand Down
17 changes: 7 additions & 10 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,11 @@ func (h *harness) observeInbound(observed ma.Multiaddr, observer peer.ID) networ
func newHarness(ctx context.Context, t *testing.T) harness {
mn := mocknet.New(ctx)
sk, err := p2putil.RandTestBogusPrivateKey()
if err != nil {
t.Fatal(err)
}

require.NoError(t, err)
h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086"))
if err != nil {
t.Fatal(err)
}

oas, err := identify.NewObservedAddrManager(ctx, h)
require.NoError(t, err)

oas, err := identify.NewObservedAddrManager(h)
require.NoError(t, err)
return harness{
oas: oas,
mocknet: mn,
Expand Down Expand Up @@ -142,6 +135,7 @@ func TestObsAddrSet(t *testing.T) {
defer cancel()

harness := newHarness(ctx, t)
defer harness.oas.Close()

if !addrsMatch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
Expand Down Expand Up @@ -243,6 +237,7 @@ func TestObservedAddrFiltering(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())

// IP4/TCP
Expand Down Expand Up @@ -344,6 +339,7 @@ func TestEmitNATDeviceTypeSymmetric(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())
emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
Expand Down Expand Up @@ -390,6 +386,7 @@ func TestEmitNATDeviceTypeCone(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())
emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
Expand Down

0 comments on commit 60b5098

Please sign in to comment.