diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 224e4576b52..baf15307d52 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1136,6 +1136,7 @@ func (js *jetStream) isMetaRecovering() bool { type recoveryUpdates struct { removeStreams map[string]*streamAssignment removeConsumers map[string]map[string]*consumerAssignment + addStreams map[string]*streamAssignment updateStreams map[string]*streamAssignment updateConsumers map[string]map[string]*consumerAssignment } @@ -1343,6 +1344,7 @@ func (js *jetStream) monitorCluster() { ru := &recoveryUpdates{ removeStreams: make(map[string]*streamAssignment), removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), updateStreams: make(map[string]*streamAssignment), updateConsumers: make(map[string]map[string]*consumerAssignment), } @@ -1381,6 +1383,10 @@ func (js *jetStream) monitorCluster() { for _, sa := range ru.removeStreams { js.processStreamRemoval(sa) } + // Process stream additions. + for _, sa := range ru.addStreams { + js.processStreamAssignment(sa) + } // Process pending updates. for _, sa := range ru.updateStreams { js.processUpdateStreamAssignment(sa) @@ -1637,6 +1643,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove key := sa.recoveryKey() ru.removeStreams[key] = sa delete(ru.updateConsumers, key) + delete(ru.addStreams, key) delete(ru.updateStreams, key) } else { js.processStreamRemoval(sa) @@ -1661,6 +1668,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := sa.recoveryKey() ru.updateStreams[key] = sa + delete(ru.addStreams, key) delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) @@ -1945,9 +1953,10 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo } if isRecovering { js.setStreamAssignmentRecovering(sa) - delete(ru.removeStreams, sa.recoveryKey()) - } - if js.processStreamAssignment(sa) { + key := sa.recoveryKey() + ru.addStreams[key] = sa + delete(ru.removeStreams, key) + } else if js.processStreamAssignment(sa) { didRemoveStream = true } case removeStreamOp: @@ -1960,6 +1969,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.removeStreams[key] = sa + delete(ru.addStreams, key) delete(ru.updateStreams, key) delete(ru.updateConsumers, key) } else { @@ -2031,6 +2041,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setStreamAssignmentRecovering(sa) key := sa.recoveryKey() ru.updateStreams[key] = sa + delete(ru.addStreams, key) delete(ru.removeStreams, key) } else { js.processUpdateStreamAssignment(sa) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 4d4f7b3e2cc..31d68eaab1c 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6527,6 +6527,7 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { ru := &recoveryUpdates{ removeStreams: make(map[string]*streamAssignment), removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), updateStreams: make(map[string]*streamAssignment), updateConsumers: make(map[string]map[string]*consumerAssignment), } @@ -6544,6 +6545,75 @@ func TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers(t *testing.T) { require_Len(t, len(ru.updateConsumers), 0) } +func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + js := c.leader().getJetStream() + + createFileStream := []*Entry{ + {EntryNormal, encodeAddStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + deleteFileStream := []*Entry{ + {EntryNormal, encodeDeleteStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + createMemoryStream := []*Entry{ + {EntryNormal, encodeAddStreamAssignment(&streamAssignment{ + Config: &StreamConfig{Name: "TEST", Storage: FileStorage}, + })}, + } + + createConsumer := []*Entry{ + {EntryNormal, encodeAddConsumerAssignment(&consumerAssignment{ + Stream: "TEST", + Config: &ConsumerConfig{Name: "consumer"}, + })}, + } + + // Need to be recovering so that we accumulate recoveryUpdates. + js.setMetaRecovering() + ru := &recoveryUpdates{ + removeStreams: make(map[string]*streamAssignment), + removeConsumers: make(map[string]map[string]*consumerAssignment), + addStreams: make(map[string]*streamAssignment), + updateStreams: make(map[string]*streamAssignment), + updateConsumers: make(map[string]map[string]*consumerAssignment), + } + + // We created a file-based stream first, but deleted it shortly after. + _, _, _, err := js.applyMetaEntries(createFileStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + + // Now push another recovery entry that deletes the stream. + // The file-based stream should not have been created. + _, _, _, err = js.applyMetaEntries(deleteFileStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 0) + require_Len(t, len(ru.removeStreams), 1) + + // Now stage a memory-based stream to be created. + _, _, _, err = js.applyMetaEntries(createMemoryStream, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + require_Len(t, len(ru.updateConsumers), 0) + + // Also create a consumer on that memory-based stream. + _, _, _, err = js.applyMetaEntries(createConsumer, ru) + require_NoError(t, err) + require_Len(t, len(ru.addStreams), 1) + require_Len(t, len(ru.removeStreams), 0) + require_Len(t, len(ru.updateConsumers), 1) +} + // // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value.