Skip to content

Commit

Permalink
Merge pull request #19 from libp2p/fix/16
Browse files Browse the repository at this point in the history
fix: serialize publishing
  • Loading branch information
magik6k authored Jun 27, 2019
2 parents 0988e77 + 02effd2 commit 0a6e078
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
out.nodes[i] = n
}, func(n *node) {
if n.keepLast {
l := n.last.Load()
l := n.last
if l == nil {
return
}
Expand Down Expand Up @@ -223,15 +223,15 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve

type node struct {
// Note: make sure to NEVER lock basicBus.lk when this lock is held
lk sync.RWMutex
lk sync.Mutex

typ reflect.Type

// emitter ref count
nEmitters int32

keepLast bool
last atomic.Value
last interface{}

sinks []chan interface{}
}
Expand All @@ -248,13 +248,13 @@ func (n *node) emit(event interface{}) {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
}

n.lk.RLock()
n.lk.Lock()
if n.keepLast {
n.last.Store(event)
n.last = event
}

for _, ch := range n.sinks {
ch <- event
}
n.lk.RUnlock()
n.lk.Unlock()
}

0 comments on commit 0a6e078

Please sign in to comment.