Commit

[IMPROVED] Memory based streams and NRG behavior during server restarts (#5506)

Improvements to catchups and health checks.

Improvements to handling snapshots for memory-based WALs.
With memory-based WALs we cannot use snapshots on restarts, but we do
use them while the server is running.
However, if a server becomes a leader with no snapshot, it will be
forced to step down when asked to catch up a follower, so we now
inherit the leader's snapshot.
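A minimal sketch of the idea (types and names below are illustrative
assumptions, not the actual NRG code): a node with a memory-based WAL holds on
to a snapshot received from its leader so that, if it is elected later, it can
serve catch-ups instead of being forced to step down.

// Hypothetical sketch, not the actual nats-server/NRG code.
type snapshotSketch struct {
	lastIndex uint64
	lastTerm  uint64
	data      []byte
}

type memWALSketch struct {
	snap       *snapshotSketch // in memory only, so lost on restart
	firstIndex uint64
}

// inheritLeaderSnapshot keeps a snapshot received from the current leader so
// that this node, if elected later, has something to catch followers up from.
func (w *memWALSketch) inheritLeaderSnapshot(s *snapshotSketch) {
	w.snap = s
	w.firstIndex = s.lastIndex + 1
}

// snapshotForCatchup returns the snapshot to send a lagging follower. Without
// an inherited snapshot, a freshly elected leader had nothing to send and was
// forced to step down.
func (w *memWALSketch) snapshotForCatchup() (*snapshotSketch, bool) {
	return w.snap, w.snap != nil
}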

Also, when we truncate on a mismatch, we need to truncate to the
previous index, not the current one.
When that fails because the previous entry has been compacted away, we
used to reset the entire WAL. We now reset the WAL to the prior index
and use the truncation term and index.
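A hedged sketch of that truncation rule; the log layout and names below are
assumptions for illustration, not the repository's raft implementation.

// Hypothetical sketch only; the entry layout and names are illustrative.
type logSketch struct {
	firstIndex uint64   // first index still present after compaction
	terms      []uint64 // terms[i] is the term of entry firstIndex+i
}

// truncateOnMismatch handles an append-entries conflict at index aeIndex.
// Truncate back to the previous index, the last entry both sides may still
// agree on. If that entry has already been compacted away, reset the log to
// the prior index using the term and index from the request rather than
// wiping the whole WAL.
func (l *logSketch) truncateOnMismatch(aeIndex, aePrevTerm uint64) {
	prev := aeIndex - 1
	if prev < l.firstIndex {
		// Previous entry compacted away: reset to the prior index/term.
		l.firstIndex = prev + 1
		l.terms = l.terms[:0]
		_ = aePrevTerm // a real implementation would record this as the truncation term
		return
	}
	// Truncate to the previous index, not the current one.
	if keep := prev - l.firstIndex + 1; uint64(len(l.terms)) > keep {
		l.terms = l.terms[:keep]
	}
}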

Lastly, if we receive a heartbeat with the correct index but a newer
term, we just inherit the term.
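A minimal sketch of the heartbeat rule, with illustrative field names rather
than the actual NRG struct:

// Hypothetical sketch, not the actual NRG follower state.
type followerSketch struct {
	term      uint64
	lastIndex uint64
}

// processHeartbeat: if the heartbeat matches our last index but carries a
// newer term, just inherit the term instead of treating it as a failure.
func (f *followerSketch) processHeartbeat(hbIndex, hbTerm uint64) {
	if hbIndex == f.lastIndex && hbTerm > f.term {
		f.term = hbTerm
	}
}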
For stream health checks on replicated streams, make sure that the
monitor routine is running.
When waiting on consumer assignments at the beginning of the stream
monitor, also make sure the consumer monitor is running if the consumer
is replicated.

On a consumer snapshot, register pre-acks as needed.
On stream checkInterestState, reset an empty stream to the lowest ack
floor across all consumers.
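As a hedged illustration (a hypothetical helper, not the checkInterestState
code itself), the floor an empty stream is reset to is simply the minimum ack
floor across its consumers:

// lowestAckFloor returns the smallest ack floor across all consumers of a
// stream, which is where an empty interest-based stream can safely have its
// first sequence reset to. Illustrative helper only.
func lowestAckFloor(ackFloors []uint64) (uint64, bool) {
	if len(ackFloors) == 0 {
		return 0, false
	}
	low := ackFloors[0]
	for _, af := range ackFloors[1:] {
		if af < low {
			low = af
		}
	}
	return low, true
}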

Finally, fix a consistency bug in memstore when skipping messages on an
empty stream, to ensure first == last + 1.

Signed-off-by: Derek Collison <[email protected]>

---------

Signed-off-by: Derek Collison <[email protected]>
derekcollison authored Jun 10, 2024
1 parent 743292b commit 6f05a82
Showing 8 changed files with 284 additions and 32 deletions.
54 changes: 43 additions & 11 deletions server/jetstream_cluster.go
@@ -534,12 +534,18 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
return false
}

// If we are catching up return false.
if mset.isCatchingUp() {
// If R1 we are good.
if node == nil {
return true
}

// Here we are a replicated stream.
// First make sure our monitor routine is running.
if !mset.isMonitorRunning() {
return false
}

if node == nil || node.Healthy() {
if node.Healthy() {
// Check if we are processing a snapshot and are catching up.
if !mset.isCatchingUp() {
return true
@@ -553,7 +559,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
js.restartStream(acc, sa)
}
}

return false
}

@@ -863,6 +868,8 @@ func (js *jetStream) setupMetaGroup() error {
atomic.StoreInt32(&js.clustered, 1)
c.registerWithAccount(sacc)

// Set to true before we start.
js.metaRecovering = true
js.srv.startGoRoutine(
js.monitorCluster,
pprofLabels{
@@ -2164,7 +2171,7 @@ func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPee
// Should only be called from monitorStream.
func (mset *stream) waitOnConsumerAssignments() {
mset.mu.RLock()
s, js, acc, sa, name := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name
s, js, acc, sa, name, replicas := mset.srv, mset.js, mset.acc, mset.sa, mset.cfg.Name, mset.cfg.Replicas
mset.mu.RUnlock()

if s == nil || js == nil || acc == nil || sa == nil {
@@ -2186,6 +2193,9 @@ func (mset *stream) waitOnConsumerAssignments() {
for _, o := range mset.getConsumers() {
// Make sure we are registered with our consumer assignment.
if ca := o.consumerAssignment(); ca != nil {
if replicas > 1 && !o.isMonitorRunning() {
break
}
numReady++
} else {
break
@@ -2373,7 +2383,8 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// since we process streams first then consumers as an asset class.
mset.waitOnConsumerAssignments()
// Setup a periodic check here.
cist = time.NewTicker(30 * time.Second)
// We will fire in 5s the first time then back off to 30s
cist = time.NewTicker(5 * time.Second)
cistc = cist.C
}

@@ -2496,6 +2507,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}

case <-cistc:
cist.Reset(30 * time.Second)
// We may be adjusting some things with consumers so do this in its own go routine.
go mset.checkInterestState()

@@ -4924,7 +4936,22 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
}
// Check our interest state if applicable.
o.checkStateForInterestStream()
if err := o.checkStateForInterestStream(); err == errAckFloorHigherThanLastSeq {
o.mu.RLock()
mset := o.mset
o.mu.RUnlock()
// Register pre-acks unless no state at all for the stream and we would create a lot of pre-acks.
mset.mu.Lock()
var ss StreamState
mset.store.FastState(&ss)
// Only register if we have a valid FirstSeq.
if ss.FirstSeq > 0 {
for seq := ss.FirstSeq; seq < state.AckFloor.Stream; seq++ {
mset.registerPreAck(o, seq)
}
}
mset.mu.Unlock()
}
}

} else if e.Type == EntryRemovePeer {
@@ -8165,8 +8192,11 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
var sub *subscription
var err error

const activityInterval = 30 * time.Second
notActive := time.NewTimer(activityInterval)
const (
startInterval = 5 * time.Second
activityInterval = 30 * time.Second
)
notActive := time.NewTimer(startInterval)
defer notActive.Stop()

defer func() {
@@ -8249,7 +8279,7 @@ RETRY:
default:
}
}
notActive.Reset(activityInterval)
notActive.Reset(startInterval)

// Grab sync request again on failures.
if sreq == nil {
Expand Down Expand Up @@ -8294,8 +8324,10 @@ RETRY:
// Send our sync request.
b, _ := json.Marshal(sreq)
s.sendInternalMsgLocked(subject, reply, nil, b)

// Remember when we sent this out to avoid loop spins on errors below.
reqSendTime := time.Now()

// Clear our sync request.
sreq = nil

@@ -8844,7 +8876,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
if !done {
// Wait for a small bit.
time.Sleep(50 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
} else {
// GC friendly.
mw.Stop()
2 changes: 1 addition & 1 deletion server/jetstream_cluster_4_test.go
@@ -2273,7 +2273,7 @@ func TestJetStreamClusterStreamLastSequenceResetAfterStorageWipe(t *testing.T) {
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
mset.store.FastState(&state)
if state.LastSeq != 222 {
return fmt.Errorf("%v LAST SEQ WRONG %d for %q - STATE %+v", s, state.LastSeq, stream, state)
return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state)
}
return nil
})
2 changes: 1 addition & 1 deletion server/memstore.go
@@ -261,7 +261,7 @@ func (ms *memStore) SkipMsg() uint64 {
ms.state.LastSeq = seq
ms.state.LastTime = now
if ms.state.Msgs == 0 {
ms.state.FirstSeq = seq
ms.state.FirstSeq = seq + 1
ms.state.FirstTime = now
} else {
ms.dmap.Insert(seq)
25 changes: 25 additions & 0 deletions server/memstore_test.go
@@ -1052,6 +1052,31 @@ func TestMemStorePurgeExWithDeletedMsgs(t *testing.T) {
require_Equal(t, state.Msgs, 1)
}

// When all messages are deleted we should have a state of first = last + 1.
func TestMemStoreDeleteAllFirstSequenceCheck(t *testing.T) {
cfg := &StreamConfig{
Name: "zzz",
Subjects: []string{"foo"},
Storage: MemoryStorage,
}
ms, err := newMemStore(cfg)
require_NoError(t, err)
defer ms.Stop()

msg := []byte("abc")
for i := 1; i <= 10; i++ {
ms.StoreMsg("foo", nil, msg)
}
for seq := uint64(1); seq <= 10; seq++ {
ms.RemoveMsg(seq)
}
var state StreamState
ms.FastState(&state)
require_Equal(t, state.FirstSeq, 11)
require_Equal(t, state.LastSeq, 10)
require_Equal(t, state.Msgs, 0)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
17 changes: 17 additions & 0 deletions server/monitor.go
@@ -3490,6 +3490,23 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
return health
}

// Are we still recovering meta layer?
if js.isMetaRecovering() {
if !details {
health.Status = na
health.Error = "JetStream is still recovering meta layer"

} else {
health.Errors = []HealthzError{
{
Type: HealthzErrorJetStream,
Error: "JetStream is still recovering meta layer",
},
}
}
return health
}

// Range across all accounts, the streams assigned to them, and the consumers.
// If they are assigned to this server check their status.
ourID := meta.ID()
149 changes: 149 additions & 0 deletions server/norace_test.go
@@ -10569,3 +10569,152 @@ func TestNoRaceLargeNumDeletesStreamCatchups(t *testing.T) {
return nil
})
}

func TestNoRaceJetStreamClusterMemoryStreamLastSequenceResetAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

numStreams := 250
var wg sync.WaitGroup
wg.Add(numStreams)

for i := 1; i <= numStreams; i++ {
go func(n int) {
defer wg.Done()
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(30*time.Second))
require_NoError(t, err)
subj := fmt.Sprintf("foo.%d.bar", n)
for i := 0; i < 222; i++ {
js.Publish(subj, nil)
}
}(i)
}
wg.Wait()

// Make sure all streams have a snapshot in place to stress the snapshot logic for memory based streams.
for _, s := range c.servers {
for i := 1; i <= numStreams; i++ {
stream := fmt.Sprintf("TEST:%d", i)
mset, err := s.GlobalAccount().lookupStream(stream)
require_NoError(t, err)
node := mset.raftNode()
require_NotNil(t, node)
node.InstallSnapshot(mset.stateSnapshot())
}
}

// Do 5 rolling restarts waiting on healthz in between.
for i := 0; i < 5; i++ {
// Walk the servers and shut each down, and wipe the storage directory.
for _, s := range c.servers {
s.Shutdown()
s.WaitForShutdown()
s = c.restartServer(s)
checkFor(t, 30*time.Second, time.Second, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
// Make sure all streams are current after healthz returns ok.
for i := 1; i <= numStreams; i++ {
stream := fmt.Sprintf("TEST:%d", i)
mset, err := s.GlobalAccount().lookupStream(stream)
require_NoError(t, err)
var state StreamState
checkFor(t, 30*time.Second, time.Second, func() error {
mset.store.FastState(&state)
if state.LastSeq != 222 {
return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state)
}
return nil
})
}
}
}
}

func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

numStreams := 50
var wg sync.WaitGroup
wg.Add(numStreams)

for i := 1; i <= numStreams; i++ {
go func(n int) {
defer wg.Done()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(30*time.Second))
require_NoError(t, err)
subj := fmt.Sprintf("foo.%d.bar", n)
for i := 0; i < 22; i++ {
js.Publish(subj, nil)
}
// Now consume them all as well.
sub, err := js.PullSubscribe(subj, "wq")
require_NoError(t, err)
msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), 22)
for _, m := range msgs {
err := m.AckSync()
require_NoError(t, err)
}
}(i)
}
wg.Wait()

// Do 2 rolling restarts waiting on healthz in between.
for i := 0; i < 2; i++ {
// Walk the servers and shut each down, and wipe the storage directory.
for _, s := range c.servers {
s.Shutdown()
s.WaitForShutdown()
s = c.restartServer(s)
checkFor(t, 30*time.Second, time.Second, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
// Make sure all streams are current after healthz returns ok.
for i := 1; i <= numStreams; i++ {
stream := fmt.Sprintf("TEST:%d", i)
mset, err := s.GlobalAccount().lookupStream(stream)
require_NoError(t, err)
var state StreamState
checkFor(t, 20*time.Second, time.Second, func() error {
mset.store.FastState(&state)
if state.LastSeq != 22 {
return fmt.Errorf("%v Wrong last sequence %d for %q - State %+v", s, state.LastSeq, stream, state)
}
if state.FirstSeq != 23 {
return fmt.Errorf("%v Wrong first sequence %d for %q - State %+v", s, state.FirstSeq, stream, state)
}
return nil
})
}
}
}
}
