From 6d1ae91e3eea63227446e5339c5f77195d5d6187 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 31 Oct 2024 11:26:05 +0100 Subject: [PATCH] NRG: Don't delete RAFT state if stream/consumer creation failed during shutdown Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 17 ++++++++++- server/jetstream_cluster_4_test.go | 48 ++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 9b03f6bee69..fa06a93a0d1 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2831,7 +2831,7 @@ func (mset *stream) resetClusteredState(err error) bool { // If we detect we are shutting down just return. if js != nil && js.isShuttingDown() { - s.Debugf("Will not reset stream, jetstream shutting down") + s.Debugf("Will not reset stream, JetStream shutting down") return false } @@ -3842,6 +3842,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme // This is an error condition. if err != nil { + // If we're shutting down we could get a variety of errors, for example: + // 'JetStream not enabled for account' when looking up the stream. + // Normally we can continue and delete state, but need to be careful when shutting down. + if js.isShuttingDown() { + s.Debugf("Could not create stream, JetStream shutting down") + return + } + if IsNatsErr(err, JSStreamStoreFailedF) { s.Warnf("Stream create failed for '%s > %s': %v", sa.Client.serviceAccount(), sa.Config.Name, err) err = errStreamStoreFailed @@ -4433,6 +4441,13 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state } if err != nil { + // If we're shutting down we could get a variety of errors. + // Normally we can continue and delete state, but need to be careful when shutting down. + if js.isShuttingDown() { + s.Debugf("Could not create consumer, JetStream shutting down") + return + } + if IsNatsErr(err, JSConsumerStoreFailedErrF) { s.Warnf("Consumer create failed for '%s > %s > %s': %v", ca.Client.serviceAccount(), ca.Stream, ca.Name, err) err = errConsumerStoreFailed diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 9b3a0bd0168..29e3a973138 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4342,3 +4342,51 @@ func TestJetStreamClusterDesyncAfterRestartReplacesLeaderSnapshot(t *testing.T) return checkState(t, c, globalAccountName, "TEST") }) } + +func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + nc.Close() + + // Capture RAFT storage directory and JetStream handle before shutdown. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + sd := mset.node.(*raft).sd + jss := s.getJetStream() + + // Shutdown the server. + // Normally there are no actions taken anymore after shutdown completes, + // but still do so to simulate actions taken while shutdown is in progress. + s.Shutdown() + s.WaitForShutdown() + + // Check RAFT state is kept. + files, err := os.ReadDir(sd) + require_NoError(t, err) + require_True(t, len(files) > 0) + + // Simulate server shutting down, JetStream being disabled and a stream being created. + sa := &streamAssignment{ + Config: &StreamConfig{Name: "TEST"}, + Group: &raftGroup{node: &raft{}}, + } + jss.processClusterCreateStream(acc, sa) + + // Check RAFT state is not deleted due to failing stream creation. + files, err = os.ReadDir(sd) + require_NoError(t, err) + require_True(t, len(files) > 0) +}