Improvements to interest policy and workqueue clustered streams.
Specifically around clustered streams with interest-based or work-queue retention and limits. Since we operate consumers and streams independently, a stream may be removing messages as they are acked while the acks arrive at different servers at different times, so messages can get stranded.

We introduce a better check of the interest state, based on a collective low ack floor across the stream's consumers, that will purge our copy of the stream up to that floor to catch up.
We run this check periodically in monitorStream and under certain other conditions.
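
The gist of that check, as a minimal standalone sketch (the types, names, and values below are illustrative only and not the actual server internals):

// Minimal sketch of the collective low ack floor idea; the types and names
// here are illustrative and are not the actual nats-server internals.
package main

import "fmt"

// consumerState captures the one piece of per-consumer state we care about:
// the stream ack floor, i.e. the highest stream sequence at or below which
// that consumer has acked everything.
type consumerState struct {
	name        string
	ackFloorSeq uint64
}

// collectiveAckFloor returns the lowest stream ack floor across all consumers.
// No consumer still needs any message at or below this sequence, so a replica
// of an interest or work-queue stream that has fallen behind on removals can
// safely purge its copy up to this floor to catch up.
func collectiveAckFloor(consumers []consumerState) (uint64, bool) {
	if len(consumers) == 0 {
		return 0, false
	}
	floor := consumers[0].ackFloorSeq
	for _, c := range consumers[1:] {
		if c.ackFloorSeq < floor {
			floor = c.ackFloorSeq
		}
	}
	return floor, true
}

func main() {
	consumers := []consumerState{
		{name: "C1", ackFloorSeq: 120},
		{name: "C2", ackFloorSeq: 87},
	}
	if floor, ok := collectiveAckFloor(consumers); ok {
		// A lagging replica could remove messages with sequence <= floor (87 here).
		fmt.Println("collective ack floor:", floor)
	}
}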

Thanks to Wally for introducing the test that stresses this part of the system.

Signed-off-by: Derek Collison <[email protected]>
derekcollison committed Apr 3, 2024
1 parent 29db045 commit b6ff2ef
Showing 7 changed files with 536 additions and 72 deletions.
111 changes: 68 additions & 43 deletions server/consumer.go
@@ -1392,6 +1392,10 @@ func (o *consumer) setLeader(isLeader bool) {
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
}
// If we were the leader make sure to drain queued up acks.
if wasLeader {
o.ackMsgs.drain()
}
o.mu.Unlock()

// Unregister as a leader with our parent stream.
@@ -2534,19 +2538,18 @@ func (o *consumer) applyState(state *ConsumerState) {
return
}

// If o.sseq is greater don't update. Don't go backwards on o.sseq.
if o.sseq <= state.Delivered.Stream {
// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
if !o.isLeader() || o.sseq <= state.Delivered.Stream {
o.sseq = state.Delivered.Stream + 1
}
o.dseq = state.Delivered.Consumer + 1

o.adflr = state.AckFloor.Consumer
o.asflr = state.AckFloor.Stream
o.pending = state.Pending
o.rdc = state.Redelivered

// Setup tracking timer if we have restored pending.
if len(o.pending) > 0 {
if o.isLeader() && len(o.pending) > 0 {
// This is on startup or leader change. We want to check pending
// sooner in case there are inconsistencies etc. Pick between 500ms - 1.5s
delay := 500*time.Millisecond + time.Duration(rand.Int63n(1000))*time.Millisecond
@@ -2788,6 +2791,12 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
return
}

mset := o.mset
if mset == nil || mset.closed.Load() {
o.mu.Unlock()
return
}

var sagap uint64
var needSignal bool

@@ -2803,17 +2812,20 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
delete(o.pending, sseq)
// Use the original deliver sequence from our pending record.
dseq = p.Sequence
}
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
} else if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
if p.Sequence > 0 {
o.adflr, o.asflr = p.Sequence-1, ss-1
// Only move floors if we matched an existing pending.
if dseq == o.adflr+1 {
o.adflr, o.asflr = dseq, sseq
for ss := sseq + 1; ss < o.sseq; ss++ {
if p, ok := o.pending[ss]; ok {
if p.Sequence > 0 {
o.adflr, o.asflr = p.Sequence-1, ss-1
}
break
}
break
}
// If nothing left set to current delivered.
if len(o.pending) == 0 {
o.adflr, o.asflr = o.dseq-1, o.sseq-1
}
}
}
@@ -2845,7 +2857,6 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, doSample bool) {
// Update underlying store.
o.updateAcks(dseq, sseq)

mset := o.mset
clustered := o.node != nil

// In case retention changes for a stream, this ought to have been updated
@@ -3092,10 +3103,16 @@ func (wq *waitQueue) add(wr *waitingRequest) error {
}

func (wq *waitQueue) isFull() bool {
if wq == nil {
return false
}
return wq.n == wq.max
}

func (wq *waitQueue) isEmpty() bool {
if wq == nil {
return true
}
return wq.n == 0
}

@@ -3627,7 +3644,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
break
}
}

}

// Don't sort the o.subjf if it's only one entry
@@ -4034,8 +4050,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
pmsg, dc, err = o.getNextMsg()

// We can release the lock now under getNextMsg so need to check this condition again here.
// consumer is closed when mset is set to nil.
if o.mset == nil {
if o.closed || o.mset == nil {
o.mu.Unlock()
return
}
@@ -4286,50 +4301,55 @@ func (o *consumer) streamNumPending() uint64 {
o.npc, o.npf = 0, 0
return 0
}
npc, npf := o.calculateNumPending()
o.npc, o.npf = int64(npc), npf
return o.numPending()
}

// Will calculate num pending but only requires a read lock.
// Depends on delivery policy, for last per subject we calculate differently.
// At least RLock should be held.
func (o *consumer) calculateNumPending() (npc, npf uint64) {
if o.mset == nil || o.mset.store == nil {
return 0, 0
}

isLastPerSubject := o.cfg.DeliverPolicy == DeliverLastPerSubject

// Deliver Last Per Subject calculates num pending differently.
if isLastPerSubject {
o.npc, o.npf = 0, 0
// Consumer without filters.
if o.subjf == nil {
npc, npf := o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
return o.numPending()
return o.mset.store.NumPending(o.sseq, _EMPTY_, isLastPerSubject)
}
// Consumer with filters.
for _, filter := range o.subjf {
npc, npf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
lnpc, lnpf := o.mset.store.NumPending(o.sseq, filter.subject, isLastPerSubject)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}
return o.numPending()
return npc, npf
}
// Every other Delivery Policy is handled here.
// Consumer without filters.
if o.subjf == nil {
npc, npf := o.mset.store.NumPending(o.sseq, o.cfg.FilterSubject, isLastPerSubject)
o.npc, o.npf = int64(npc), npf
return o.numPending()
return o.mset.store.NumPending(o.sseq, _EMPTY_, false)
}
// Consumer with filters.
o.npc, o.npf = 0, 0
for _, filter := range o.subjf {
// We might loose state of o.subjf, so if we do recover from o.sseq
if filter.currentSeq < o.sseq {
filter.currentSeq = o.sseq
}
npc, npf := o.mset.store.NumPending(filter.currentSeq, filter.subject, isLastPerSubject)
o.npc += int64(npc)
if npf > o.npf {
o.npf = npf // Always last
lnpc, lnpf := o.mset.store.NumPending(filter.currentSeq, filter.subject, false)
npc += lnpc
if lnpf > npf {
npf = lnpf // Always last
}
}

return o.numPending()
return npc, npf
}

func convertToHeadersOnly(pmsg *jsPubMsg) {
@@ -4381,6 +4401,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,

// Cant touch pmsg after this sending so capture what we need.
seq, ts := pmsg.seq, pmsg.ts

// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)

// Send message.
o.outq.send(pmsg)

@@ -4401,9 +4425,6 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
o.waiting.last = time.Now()
}

// FIXME(dlc) - Capture errors?
o.updateDelivered(dseq, seq, dc, ts)

// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && rp != LimitsPolicy {
if o.node == nil || o.cfg.Direct {
@@ -4646,7 +4667,11 @@ func (o *consumer) checkPending() {
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
o.ptmr.Reset(100 * time.Millisecond)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
} else {
o.ptmr.Reset(100 * time.Millisecond)
}
return
}
// Check if these are no longer valid.
@@ -5391,7 +5416,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {

// If it was pending process it like an ack.
if wasPending {
// We could have lock for stream so do this in a go routine.
// We could have the lock for the stream so do this in a go routine.
// TODO(dlc) - We should do this with ipq vs naked go routines.
go o.processTerm(sseq, p.Sequence, rdc, ackTermUnackedLimitsReason)
}
@@ -5549,7 +5574,7 @@ func (o *consumer) checkStateForInterestStream() error {
return errAckFloorHigherThanLastSeq
}

for seq := ss.FirstSeq; seq <= asflr; seq++ {
for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ {
mset.ackMsg(o, seq)
}

8 changes: 2 additions & 6 deletions server/jetstream.go
@@ -1410,10 +1410,6 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
if !cfg.Created.IsZero() {
obs.setCreatedTime(cfg.Created)
}
lseq := e.mset.lastSeq()
obs.mu.Lock()
err = obs.readStoredState(lseq)
obs.mu.Unlock()
if err != nil {
s.Warnf(" Error restoring consumer %q state: %v", cfg.Name, err)
}
@@ -2290,8 +2286,8 @@ func (jsa *jsAccount) delete() {
jsa.templates = nil
jsa.mu.Unlock()

for _, ms := range streams {
ms.stop(false, false)
for _, mset := range streams {
mset.stop(false, false)
}

for _, t := range ts {
12 changes: 11 additions & 1 deletion server/jetstream_api.go
@@ -1946,7 +1946,17 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
return
}
}
config := mset.config()

mset.mu.RLock()
config := mset.cfg
checkAcks := config.Retention != LimitsPolicy && mset.hasLimitsSet()
mset.mu.RUnlock()

// Check if we are a clustered interest retention stream with limits.
// If so, check ack floors against our state.
if checkAcks {
mset.checkInterestState()
}

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
7 changes: 7 additions & 0 deletions server/jetstream_cluster.go
@@ -8471,6 +8471,7 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg
isLeader := mset.isLeader()
checkAcks := isLeader && config.Retention != LimitsPolicy && mset.isClustered() && mset.hasLimitsSet()
mset.mu.RUnlock()

// By design all members will receive this. Normally we only want the leader answering.
@@ -8484,6 +8485,12 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
time.Sleep(500 * time.Millisecond)
}

// Check if we are a clustered interest retention stream with limits.
// If so, check ack floors against our state.
if checkAcks {
mset.checkInterestState()
}

si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
