Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't use a context for closing the ObservedAddrManager #1175

Merged
merged 3 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ type IDService struct {

ctx context.Context
ctxCancel context.CancelFunc
// ensure we shutdown ONLY once
closeSync sync.Once
// track resources that need to be shut down before we shut down
refCount sync.WaitGroup

Expand Down Expand Up @@ -126,25 +124,23 @@ func NewIDService(h host.Host, opts ...Option) (*IDService, error) {
userAgent = cfg.userAgent
}

hostCtx, cancel := context.WithCancel(context.Background())
s := &IDService{
Host: h,
UserAgent: userAgent,

ctx: hostCtx,
ctxCancel: cancel,
conns: make(map[network.Conn]chan struct{}),
conns: make(map[network.Conn]chan struct{}),

disableSignedPeerRecord: cfg.disableSignedPeerRecord,

addPeerHandlerCh: make(chan addPeerHandlerReq),
rmPeerHandlerCh: make(chan rmPeerHandlerReq),
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

// handle local protocol handler updates, and push deltas to peers.
var err error

observedAddrs, err := NewObservedAddrManager(hostCtx, h)
observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
}
Expand Down Expand Up @@ -276,10 +272,8 @@ func (ids *IDService) loop() {

// Close shuts down the IDService
func (ids *IDService) Close() error {
ids.closeSync.Do(func() {
ids.ctxCancel()
ids.refCount.Wait()
})
ids.ctxCancel()
ids.refCount.Wait()
return nil
}

Expand Down
42 changes: 23 additions & 19 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type newObservation struct {
type ObservedAddrManager struct {
host host.Host

closeOnce sync.Once
refCount sync.WaitGroup
ctx context.Context // the context is canceled when Close is called
ctxCancel context.CancelFunc

// latest observation from active connections
// we'll "re-observe" these when we gc
activeConnsMu sync.Mutex
Expand All @@ -123,7 +128,7 @@ type ObservedAddrManager struct {

// NewObservedAddrManager returns a new address manager using
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrManager, error) {
func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) {
oas := &ObservedAddrManager{
addrs: make(map[string][]*observedAddr),
ttl: peerstore.OwnObservedAddrTTL,
Expand All @@ -133,6 +138,7 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM
// refresh every ttl/2 so we don't forget observations from connected peers
refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2),
}
oas.ctx, oas.ctxCancel = context.WithCancel(context.Background())

reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
if err != nil {
Expand All @@ -147,7 +153,8 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM
oas.emitNATDeviceTypeChanged = emitter

oas.host.Network().Notify((*obsAddrNotifiee)(oas))
go oas.worker(ctx)
oas.refCount.Add(1)
go oas.worker()
return oas, nil
}

Expand Down Expand Up @@ -239,22 +246,12 @@ func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr)
}
}

func (oas *ObservedAddrManager) teardown() {
oas.host.Network().StopNotify((*obsAddrNotifiee)(oas))
oas.reachabilitySub.Close()

oas.mu.Lock()
oas.refreshTimer.Stop()
oas.mu.Unlock()
}

func (oas *ObservedAddrManager) worker(ctx context.Context) {
defer oas.teardown()
func (oas *ObservedAddrManager) worker() {
defer oas.refCount.Done()

ticker := time.NewTicker(GCInterval)
defer ticker.Stop()

hostClosing := oas.host.Network().Process().Closing()
subChan := oas.reachabilitySub.Out()
for {
select {
Expand All @@ -265,17 +262,13 @@ func (oas *ObservedAddrManager) worker(ctx context.Context) {
}
ev := evt.(event.EvtLocalReachabilityChanged)
oas.reachability = ev.Reachability

case obs := <-oas.wch:
oas.maybeRecordObservation(obs.conn, obs.observed)

case <-ticker.C:
oas.gc()
case <-oas.refreshTimer.C:
oas.refresh()
case <-hostClosing:
return
case <-ctx.Done():
case <-oas.ctx.Done():
return
}
}
Expand Down Expand Up @@ -534,6 +527,17 @@ func (oas *ObservedAddrManager) emitSpecificNATType(addrs []*observedAddr, proto
return false, 0
}

func (oas *ObservedAddrManager) Close() error {
oas.closeOnce.Do(func() {
oas.ctxCancel()
oas.refCount.Wait()
oas.reachabilitySub.Close()
oas.refreshTimer.Stop()
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
oas.host.Network().StopNotify((*obsAddrNotifiee)(oas))
})
return nil
}

// observerGroup is a function that determines what part of
// a multiaddr counts as a different observer. for example,
// two ipfs nodes at the same IP/TCP transport would get
Expand Down
17 changes: 7 additions & 10 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,11 @@ func (h *harness) observeInbound(observed ma.Multiaddr, observer peer.ID) networ
func newHarness(ctx context.Context, t *testing.T) harness {
mn := mocknet.New(ctx)
sk, err := p2putil.RandTestBogusPrivateKey()
if err != nil {
t.Fatal(err)
}

require.NoError(t, err)
h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086"))
if err != nil {
t.Fatal(err)
}

oas, err := identify.NewObservedAddrManager(ctx, h)
require.NoError(t, err)

oas, err := identify.NewObservedAddrManager(h)
require.NoError(t, err)
return harness{
oas: oas,
mocknet: mn,
Expand Down Expand Up @@ -142,6 +135,7 @@ func TestObsAddrSet(t *testing.T) {
defer cancel()

harness := newHarness(ctx, t)
defer harness.oas.Close()

if !addrsMatch(harness.oas.Addrs(), nil) {
t.Error("addrs should be empty")
Expand Down Expand Up @@ -243,6 +237,7 @@ func TestObservedAddrFiltering(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())

// IP4/TCP
Expand Down Expand Up @@ -344,6 +339,7 @@ func TestEmitNATDeviceTypeSymmetric(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())
emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
Expand Down Expand Up @@ -390,6 +386,7 @@ func TestEmitNATDeviceTypeCone(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
harness := newHarness(ctx, t)
defer harness.oas.Close()
require.Empty(t, harness.oas.Addrs())
emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
Expand Down