diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ad58699447b..6833100caa4 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -6357,8 +6357,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su } if isReplicaChange { + isScaleUp := newCfg.Replicas > len(rg.Peers) // We are adding new peers here. - if newCfg.Replicas > len(rg.Peers) { + if isScaleUp { // Check that we have the allocation available. if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil { resp.Error = err @@ -6434,22 +6435,82 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su // Need to remap any consumers. for _, ca := range osa.consumers { - // Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy. + // Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers. + // If stream is interest or workqueue policy always remaps since they require peer parity with stream. numPeers := len(ca.Group.Peers) - if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy { + isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_) + if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy { cca := ca.copyGroup() // Adjust preferred as needed. - if numPeers == 1 && len(rg.Peers) > 1 { + if numPeers == 1 && isScaleUp { cca.Group.Preferred = ca.Group.Peers[0] } else { cca.Group.Preferred = _EMPTY_ } // Assign new peers. cca.Group.Peers = rg.Peers + // If the replicas was not 0 make sure it matches here. + if cca.Config.Replicas != 0 { + cca.Config.Replicas = len(rg.Peers) + } // We can not propose here before the stream itself so we collect them. 
consumers = append(consumers, cca) + + } else if !isScaleUp { + // We decided to leave this consumer's peer group alone but we are also scaling down. + // We need to make sure we do not have any peers that are no longer part of the stream. + // Note we handle down scaling of a consumer above if its number of peers were > new stream peers. + var needReplace []string + for _, rp := range ca.Group.Peers { + // Check if we have an orphaned peer now for this consumer. + if !rg.isMember(rp) { + needReplace = append(needReplace, rp) + } + } + if len(needReplace) > 0 { + newPeers := copyStrings(rg.Peers) + rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] }) + // If we had a smaller size than the peer set, restrict to the same number. + if lp := len(ca.Group.Peers); lp < len(newPeers) { + newPeers = newPeers[:lp] + } + cca := ca.copyGroup() + // Assign new peers. + cca.Group.Peers = newPeers + // If the replicas was not 0 make sure it matches here. + if cca.Config.Replicas != 0 { + cca.Config.Replicas = len(newPeers) + } + // Check if all peers are invalid. This can happen with R1 under replicated streams that are being scaled down. + if len(needReplace) == len(ca.Group.Peers) { + // We have to transfer state to new peers. + // We will grab our state and attach to the new assignment. + // TODO(dlc) - In practice we would want to make sure the consumer is paused. + // Need to release js lock. + js.mu.Unlock() + if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil { + s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err) + } else if ci != nil { + cca.State = &ConsumerState{ + Delivered: SequencePair{ + Consumer: ci.Delivered.Consumer, + Stream: ci.Delivered.Stream, + }, + AckFloor: SequencePair{ + Consumer: ci.AckFloor.Consumer, + Stream: ci.AckFloor.Stream, + }, + } + } + // Re-acquire here. 
+ js.mu.Lock() + } + // We can not propose here before the stream itself so we collect them. + consumers = append(consumers, cca) + } } } + } else if isMoveRequest { if len(peerSet) == 0 { nrg, err := js.createGroupForStream(ci, newCfg) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 84a694e4362..e9a980aae5c 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3961,3 +3961,128 @@ func TestJetStreamPendingRequestsInJsz(t *testing.T) { require_True(t, m.Stats.JetStream != nil) require_NotEqual(t, m.Stats.JetStream.Meta.Pending, 0) } + +func TestJetStreamConsumerReplicasAfterScale(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R5S", 5) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomNonLeader()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 5, + }) + require_NoError(t, err) + + // Put some messages in to test consumer state transfer. + for i := 0; i < 100; i++ { + js.PublishAsync("foo", []byte("ok")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Create four different consumers. + // Normal where we inherit replicas from parent. + ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dur", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 0) + require_Equal(t, len(ci.Cluster.Replicas), 4) + + // Ephemeral + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 0) // Legacy ephemeral is 0 here too. 
+ require_Equal(t, len(ci.Cluster.Replicas), 0) + eName := ci.Name + + // R1 + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "r1", + AckPolicy: nats.AckExplicitPolicy, + Replicas: 1, + }) + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 1) + require_Equal(t, len(ci.Cluster.Replicas), 0) + + // R3 + ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Name: "r3", + AckPolicy: nats.AckExplicitPolicy, + Replicas: 3, + }) + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 3) + require_Equal(t, len(ci.Cluster.Replicas), 2) + + // Now create some state on r1 consumer. + sub, err := js.PullSubscribe("foo", "r1") + require_NoError(t, err) + + fetch := rand.Intn(99) + 1 // Needs to be at least 1. + msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second)) + require_NoError(t, err) + require_Equal(t, len(msgs), fetch) + ack := rand.Intn(fetch) + for i := 0; i <= ack; i++ { + msgs[i].AckSync() + } + r1ci, err := js.ConsumerInfo("TEST", "r1") + require_NoError(t, err) + r1ci.Delivered.Last, r1ci.AckFloor.Last = nil, nil + + // Now scale stream to R3. + _, err = js.UpdateStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Now check each. 
+ c.waitOnConsumerLeader(globalAccountName, "TEST", "dur") + ci, err = js.ConsumerInfo("TEST", "dur") + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 0) + require_Equal(t, len(ci.Cluster.Replicas), 2) + + c.waitOnConsumerLeader(globalAccountName, "TEST", eName) + ci, err = js.ConsumerInfo("TEST", eName) + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 0) + require_Equal(t, len(ci.Cluster.Replicas), 0) + + c.waitOnConsumerLeader(globalAccountName, "TEST", "r1") + ci, err = js.ConsumerInfo("TEST", "r1") + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 1) + require_Equal(t, len(ci.Cluster.Replicas), 0) + // Now check that state transferred correctly. + ci.Delivered.Last, ci.AckFloor.Last = nil, nil + if ci.Delivered != r1ci.Delivered { + t.Fatalf("Delivered state for R1 incorrect, wanted %+v got %+v", + r1ci.Delivered, ci.Delivered) + } + if ci.AckFloor != r1ci.AckFloor { + t.Fatalf("AckFloor state for R1 incorrect, wanted %+v got %+v", + r1ci.AckFloor, ci.AckFloor) + } + + c.waitOnConsumerLeader(globalAccountName, "TEST", "r3") + ci, err = js.ConsumerInfo("TEST", "r3") + require_NoError(t, err) + require_Equal(t, ci.Config.Replicas, 3) + require_Equal(t, len(ci.Cluster.Replicas), 2) +}