Skip to content

Commit

Permalink
EVM-584 Panic -> send on closed channel inside syncer (#1364)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar authored Apr 7, 2023
1 parent eb45dba commit 3765811
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
19 changes: 13 additions & 6 deletions network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"reflect"
"sync"
"sync/atomic"

"github.com/hashicorp/go-hclog"
Expand All @@ -22,10 +23,11 @@ const (
type Topic struct {
logger hclog.Logger

topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
closed *uint64
topic *pubsub.Topic
typ reflect.Type
closeCh chan struct{}
closed *uint64
waitGroup sync.WaitGroup
}

func (t *Topic) createObj() proto.Message {
Expand All @@ -43,12 +45,14 @@ func (t *Topic) Close() {
return
}

close(t.closeCh) // close all subscribers
t.waitGroup.Wait() // wait for all the subscribers to finish

// if all subscribers are finished, close the topic
if t.topic != nil {
t.topic.Close()
t.topic = nil
}

close(t.closeCh)
}

func (t *Topic) Publish(obj proto.Message) error {
Expand All @@ -75,6 +79,9 @@ func (t *Topic) Subscribe(handler func(obj interface{}, from peer.ID)) error {
}

func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{}, from peer.ID)) {
t.waitGroup.Add(1)
defer t.waitGroup.Done()

ctx, cancelFn := context.WithCancel(context.Background())

go func() {
Expand Down
26 changes: 17 additions & 9 deletions syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type syncPeerClient struct {
shouldEmitBlocks bool // flag for emitting blocks in the topic
closeCh chan struct{}
closed *uint64 // ACTIVE == 0, CLOSED == non-zero.

peerStatusUpdateChLock sync.Mutex
peerStatusUpdateChClosed bool
}

func NewSyncPeerClient(
Expand All @@ -56,6 +59,9 @@ func NewSyncPeerClient(
shouldEmitBlocks: true,
closeCh: make(chan struct{}),
closed: new(uint64),

peerStatusUpdateChLock: sync.Mutex{},
peerStatusUpdateChClosed: false,
}
}

Expand Down Expand Up @@ -95,7 +101,10 @@ func (m *syncPeerClient) Close() {
close(m.closeCh)
}

m.peerStatusUpdateChLock.Lock()
m.peerStatusUpdateChClosed = true
close(m.peerStatusUpdateCh)
m.peerStatusUpdateChLock.Unlock()
}

// DisablePublishingPeerStatus disables publishing own status via gossip
Expand Down Expand Up @@ -210,16 +219,15 @@ func (m *syncPeerClient) handleStatusUpdate(obj interface{}, from peer.ID) {
return
}

if atomic.LoadUint64(m.closed) > 0 {
m.logger.Debug("received status from peer after client was closed, ignoring", "id", from)
m.peerStatusUpdateChLock.Lock()
defer m.peerStatusUpdateChLock.Unlock()

return
}

m.peerStatusUpdateCh <- &NoForkPeer{
ID: from,
Number: status.Number,
Distance: m.network.GetPeerDistance(from),
if !m.peerStatusUpdateChClosed {
m.peerStatusUpdateCh <- &NoForkPeer{
ID: from,
Number: status.Number,
Distance: m.network.GetPeerDistance(from),
}
}
}

Expand Down

0 comments on commit 3765811

Please sign in to comment.