Skip to content

Commit

Permalink
[FIXED] Avoid concurrent consumer setLeader calls to run multiple lea…
Browse files Browse the repository at this point in the history
…ders. (#4703)

Be more stringent on setting the leader. Use atomic swap to make sure
only one is running as leader.
Had a situation where the consumer processed multiple setLeader calls
and had multiple loopAndGather routines running.

Signed-off-by: Derek Collison <[email protected]>
  • Loading branch information
derekcollison authored Oct 25, 2023
2 parents 8b734aa + 875f744 commit 8d9a6cd
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ type consumer struct {
// This will be checked in checkPending to abort processing
// and let ack be processed in priority.
awl int64
leader atomic.Bool
mu sync.RWMutex
js *jetStream
mset *stream
Expand Down Expand Up @@ -1131,19 +1132,21 @@ func (o *consumer) isLeader() bool {

func (o *consumer) setLeader(isLeader bool) {
o.mu.RLock()
mset := o.mset
isRunning := o.ackSub != nil
mset, closed := o.mset, o.closed
movingToClustered := o.node != nil && o.pch == nil
wasLeader := o.leader.Swap(isLeader)
o.mu.RUnlock()

// If we are here we have a change in leader status.
if isLeader {
if mset == nil {
if closed || mset == nil {
return
}
if isRunning {

if wasLeader {
// If we detect we are scaling up, make sure to create clustered routines and channels.
o.mu.Lock()
if o.node != nil && o.pch == nil {
if movingToClustered {
o.mu.Lock()
// We are moving from R1 to clustered.
o.pch = make(chan struct{}, 1)
go o.loopAndForwardProposals(o.qch)
Expand All @@ -1153,8 +1156,8 @@ func (o *consumer) setLeader(isLeader bool) {
default:
}
}
o.mu.Unlock()
}
o.mu.Unlock()
return
}

Expand Down

0 comments on commit 8d9a6cd

Please sign in to comment.