diff --git a/network/gossip.go b/network/gossip.go index b044949515..31b9409129 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "sync" "sync/atomic" "github.com/hashicorp/go-hclog" @@ -22,10 +23,11 @@ const ( type Topic struct { logger hclog.Logger - topic *pubsub.Topic - typ reflect.Type - closeCh chan struct{} - closed *uint64 + topic *pubsub.Topic + typ reflect.Type + closeCh chan struct{} + closed *uint64 + waitGroup sync.WaitGroup } func (t *Topic) createObj() proto.Message { @@ -43,12 +45,14 @@ func (t *Topic) Close() { return } + close(t.closeCh) // close all subscribers + t.waitGroup.Wait() // wait for all the subscribers to finish + + // if all subscribers are finished, close the topic if t.topic != nil { t.topic.Close() t.topic = nil } - - close(t.closeCh) } func (t *Topic) Publish(obj proto.Message) error { @@ -75,6 +79,9 @@ func (t *Topic) Subscribe(handler func(obj interface{}, from peer.ID)) error { } func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{}, from peer.ID)) { + t.waitGroup.Add(1) + defer t.waitGroup.Done() + ctx, cancelFn := context.WithCancel(context.Background()) go func() { diff --git a/syncer/client.go b/syncer/client.go index 916de61654..14a45f4ab1 100644 --- a/syncer/client.go +++ b/syncer/client.go @@ -39,6 +39,9 @@ type syncPeerClient struct { shouldEmitBlocks bool // flag for emitting blocks in the topic closeCh chan struct{} closed *uint64 // ACTIVE == 0, CLOSED == non-zero. + + peerStatusUpdateChLock sync.Mutex + peerStatusUpdateChClosed bool } func NewSyncPeerClient( @@ -56,6 +59,9 @@ func NewSyncPeerClient( shouldEmitBlocks: true, closeCh: make(chan struct{}), closed: new(uint64), + + peerStatusUpdateChLock: sync.Mutex{}, + peerStatusUpdateChClosed: false, } } @@ -95,7 +101,10 @@ func (m *syncPeerClient) Close() { close(m.closeCh) } + m.peerStatusUpdateChLock.Lock() + m.peerStatusUpdateChClosed = true close(m.peerStatusUpdateCh) + m.peerStatusUpdateChLock.Unlock() } // DisablePublishingPeerStatus disables publishing own status via gossip @@ -210,16 +219,15 @@ func (m *syncPeerClient) handleStatusUpdate(obj interface{}, from peer.ID) { return } - if atomic.LoadUint64(m.closed) > 0 { - m.logger.Debug("received status from peer after client was closed, ignoring", "id", from) + m.peerStatusUpdateChLock.Lock() + defer m.peerStatusUpdateChLock.Unlock() - return - } - - m.peerStatusUpdateCh <- &NoForkPeer{ - ID: from, - Number: status.Number, - Distance: m.network.GetPeerDistance(from), + if !m.peerStatusUpdateChClosed { + m.peerStatusUpdateCh <- &NoForkPeer{ + ID: from, + Number: status.Number, + Distance: m.network.GetPeerDistance(from), + } } }