Skip to content

Commit

Permalink
NRG: Don't delete RAFT state if stream/consumer creation failed durin…
Browse files Browse the repository at this point in the history
…g shutdown

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Oct 31, 2024
1 parent 1ee2b8a commit 6d1ae91
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
17 changes: 16 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2831,7 +2831,7 @@ func (mset *stream) resetClusteredState(err error) bool {

// If we detect we are shutting down just return.
if js != nil && js.isShuttingDown() {
s.Debugf("Will not reset stream, jetstream shutting down")
s.Debugf("Will not reset stream, JetStream shutting down")
return false
}

Expand Down Expand Up @@ -3842,6 +3842,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme

// This is an error condition.
if err != nil {
// If we're shutting down we could get a variety of errors, for example:
// 'JetStream not enabled for account' when looking up the stream.
// Normally we can continue and delete state, but need to be careful when shutting down.
if js.isShuttingDown() {
s.Debugf("Could not create stream, JetStream shutting down")
return
}

if IsNatsErr(err, JSStreamStoreFailedF) {
s.Warnf("Stream create failed for '%s > %s': %v", sa.Client.serviceAccount(), sa.Config.Name, err)
err = errStreamStoreFailed
Expand Down Expand Up @@ -4433,6 +4441,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
}

if err != nil {
// If we're shutting down we could get a variety of errors.
// Normally we can continue and delete state, but need to be careful when shutting down.
if js.isShuttingDown() {
s.Debugf("Could not create consumer, JetStream shutting down")
return
}

if IsNatsErr(err, JSConsumerStoreFailedErrF) {
s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err)
err = errConsumerStoreFailed
Expand Down
48 changes: 48 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4342,3 +4342,51 @@ func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T)
return checkState(t, c, globalAccountName, "TEST")
})
}

func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
nc.Close()

// Capture RAFT storage directory and JetStream handle before shutdown.
s := c.randomNonStreamLeader(globalAccountName, "TEST")
acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
sd := mset.node.(*raft).sd
jss := s.getJetStream()

// Shutdown the server.
// Normally there are no actions taken anymore after shutdown completes,
// but still do so to simulate actions taken while shutdown is in progress.
s.Shutdown()
s.WaitForShutdown()

// Check RAFT state is kept.
files, err := os.ReadDir(sd)
require_NoError(t, err)
require_True(t, len(files) > 0)

// Simulate server shutting down, JetStream being disabled and a stream being created.
sa := &streamAssignment{
Config: &StreamConfig{Name: "TEST"},
Group: &raftGroup{node: &raft{}},
}
jss.processClusterCreateStream(acc, sa)

// Check RAFT state is not deleted due to failing stream creation.
files, err = os.ReadDir(sd)
require_NoError(t, err)
require_True(t, len(files) > 0)
}

0 comments on commit 6d1ae91

Please sign in to comment.