[FIXED] When scaling down a stream make sure consumers are adjusted properly. #5927

Merged 1 commit on Sep 25, 2024
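For context, a minimal sketch of the scenario this PR addresses, using the nats.go client as in the test added below. Stream and consumer names ("TEST", "r1") mirror that test and are illustrative only: after scaling a stream from R5 down to R3, each consumer should end up with a peer set drawn from the new stream peers, and its delivered/ack-floor state should be carried over.

// Illustrative only; assumes a running JetStream cluster at the default URL,
// a stream "TEST" currently at R5, and an R1 durable consumer "r1" that has
// already acknowledged some messages (as in the test added in this PR).
package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Scale the stream down from R5 to R3.
	if _, err := js.UpdateStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	}); err != nil {
		log.Fatal(err)
	}

	// With this fix, the consumer is re-homed onto peers of the scaled-down
	// stream and its delivered/ack-floor sequences are preserved.
	ci, err := js.ConsumerInfo("TEST", "r1")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("replicas=%d delivered=%+v ackfloor=%+v\n",
		ci.Config.Replicas, ci.Delivered, ci.AckFloor)
}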
69 changes: 65 additions & 4 deletions server/jetstream_cluster.go
@@ -6357,8 +6357,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
}

if isReplicaChange {
isScaleUp := newCfg.Replicas > len(rg.Peers)
// We are adding new peers here.
if newCfg.Replicas > len(rg.Peers) {
if isScaleUp {
// Check that we have the allocation available.
if err := js.jsClusteredStreamLimitsCheck(acc, newCfg); err != nil {
resp.Error = err
@@ -6434,22 +6435,82 @@

// Need to remap any consumers.
for _, ca := range osa.consumers {
// Ephemerals are R=1, so only auto-remap durables, or R>1, unless stream is interest or workqueue policy.
// Legacy ephemerals are R=1 but present as R=0, so only auto-remap named consumers, or if we are downsizing the consumer peers.
// If stream is interest or workqueue policy always remaps since they require peer parity with stream.
numPeers := len(ca.Group.Peers)
if ca.Config.Durable != _EMPTY_ || numPeers > 1 || cfg.Retention != LimitsPolicy {
isAutoScale := ca.Config.Replicas == 0 && (ca.Config.Durable != _EMPTY_ || ca.Config.Name != _EMPTY_)
if isAutoScale || numPeers > len(rg.Peers) || cfg.Retention != LimitsPolicy {
cca := ca.copyGroup()
// Adjust preferred as needed.
if numPeers == 1 && len(rg.Peers) > 1 {
if numPeers == 1 && isScaleUp {
cca.Group.Preferred = ca.Group.Peers[0]
} else {
cca.Group.Preferred = _EMPTY_
}
// Assign new peers.
cca.Group.Peers = rg.Peers
// If the replicas was not 0 make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(rg.Peers)
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)

} else if !isScaleUp {
// We decided to leave this consumer's peer group alone but we are also scaling down.
// We need to make sure we do not have any peers that are no longer part of the stream.
// Note we handle downscaling of a consumer above if its number of peers was greater than the new stream peers.
var needReplace []string
for _, rp := range ca.Group.Peers {
// Check if we have an orphaned peer now for this consumer.
if !rg.isMember(rp) {
needReplace = append(needReplace, rp)
}
}
if len(needReplace) > 0 {
newPeers := copyStrings(rg.Peers)
rand.Shuffle(len(newPeers), func(i, j int) { newPeers[i], newPeers[j] = newPeers[j], newPeers[i] })
// If the consumer had fewer peers than the new peer set, restrict to the same number.
if lp := len(ca.Group.Peers); lp < len(newPeers) {
newPeers = newPeers[:lp]
}
cca := ca.copyGroup()
// Assign new peers.
cca.Group.Peers = newPeers
// If the replicas was not 0 make sure it matches here.
if cca.Config.Replicas != 0 {
cca.Config.Replicas = len(newPeers)
}
// Check if all peers are invalid. This can happen with R1 under replicated streams that are being scaled down.
if len(needReplace) == len(ca.Group.Peers) {
// We have to transfer state to new peers.
// We will grab our state and attach it to the new assignment.
// TODO(dlc) - In practice we would want to make sure the consumer is paused.
// Need to release js lock.
js.mu.Unlock()
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, acc, osa.Config.Name, ca.Name); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, osa.Config.Name, ca.Name, err)
} else if ci != nil {
cca.State = &ConsumerState{
Delivered: SequencePair{
Review thread on this line:

Member: Do we need to include the Redelivered state here too?

Member (Author): We do not have access to that state here.

Member (Author): So not perfect, and we should revisit for 2.11, but better than what we had for sure.

Member: Sounds good.

Consumer: ci.Delivered.Consumer,
Stream: ci.Delivered.Stream,
},
AckFloor: SequencePair{
Consumer: ci.AckFloor.Consumer,
Stream: ci.AckFloor.Stream,
},
}
}
// Re-acquire here.
js.mu.Lock()
}
// We can not propose here before the stream itself so we collect them.
consumers = append(consumers, cca)
}
}
}

} else if isMoveRequest {
if len(peerSet) == 0 {
nrg, err := js.createGroupForStream(ci, newCfg)
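The core of the new scale-down handling above is detecting consumers whose peers are no longer members of the shrunken stream group and re-homing them onto the new peers. A standalone sketch of that selection logic, using plain string peer names instead of the server's internal group types; the helper names (orphanedPeers, replacementPeers) are hypothetical and only mirror the checks performed in the diff:

package main

import (
	"fmt"
	"math/rand"
)

// orphanedPeers returns the consumer peers that are no longer part of the
// stream's (smaller) peer set, which is the condition the diff above checks
// with rg.isMember(rp).
func orphanedPeers(consumerPeers, streamPeers []string) []string {
	members := make(map[string]struct{}, len(streamPeers))
	for _, p := range streamPeers {
		members[p] = struct{}{}
	}
	var orphaned []string
	for _, p := range consumerPeers {
		if _, ok := members[p]; !ok {
			orphaned = append(orphaned, p)
		}
	}
	return orphaned
}

// replacementPeers mirrors the selection in the diff: shuffle the new stream
// peers and, if the consumer had fewer peers than the stream, keep only that
// many so the consumer's replica count is unchanged.
func replacementPeers(streamPeers []string, consumerSize int) []string {
	np := append([]string(nil), streamPeers...)
	rand.Shuffle(len(np), func(i, j int) { np[i], np[j] = np[j], np[i] })
	if consumerSize < len(np) {
		np = np[:consumerSize]
	}
	return np
}

func main() {
	consumer := []string{"S4", "S5"}     // old consumer peers
	stream := []string{"S1", "S2", "S3"} // stream scaled down to R3
	orphans := orphanedPeers(consumer, stream)
	fmt.Println(orphans)                              // [S4 S5] -> consumer needs a remap
	fmt.Println(replacementPeers(stream, len(consumer))) // two peers drawn from the new set
}

When every old peer is orphaned (as with an R1 consumer whose single peer left the group), the diff additionally snapshots the consumer's delivered and ack-floor sequence pairs via ConsumerInfo and attaches them to the new assignment, which is what the review thread above is discussing.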
125 changes: 125 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -3961,3 +3961,128 @@ func TestJetStreamPendingRequestsInJsz(t *testing.T) {
require_True(t, m.Stats.JetStream != nil)
require_NotEqual(t, m.Stats.JetStream.Meta.Pending, 0)
}

func TestJetStreamConsumerReplicasAfterScale(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R5S", 5)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 5,
})
require_NoError(t, err)

// Put some messages in to test consumer state transfer.
for i := 0; i < 100; i++ {
js.PublishAsync("foo", []byte("ok"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

// Create four different consumers.
// Normal where we inherit replicas from parent.
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "dur",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 4)

// Ephemeral
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0) // Legacy ephemeral is 0 here too.
require_Equal(t, len(ci.Cluster.Replicas), 0)
eName := ci.Name

// R1
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "r1",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 1,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 1)
require_Equal(t, len(ci.Cluster.Replicas), 0)

// R3
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "r3",
AckPolicy: nats.AckExplicitPolicy,
Replicas: 3,
})
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 3)
require_Equal(t, len(ci.Cluster.Replicas), 2)

// Now create some state on r1 consumer.
sub, err := js.PullSubscribe("foo", "r1")
require_NoError(t, err)

fetch := rand.Intn(99) + 1 // Needs to be at least 1.
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_Equal(t, len(msgs), fetch)
ack := rand.Intn(fetch)
for i := 0; i <= ack; i++ {
msgs[i].AckSync()
}
r1ci, err := js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
r1ci.Delivered.Last, r1ci.AckFloor.Last = nil, nil

// Now scale stream to R3.
_, err = js.UpdateStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

c.waitOnStreamLeader(globalAccountName, "TEST")

// Now check each.
c.waitOnConsumerLeader(globalAccountName, "TEST", "dur")
ci, err = js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 2)

c.waitOnConsumerLeader(globalAccountName, "TEST", eName)
ci, err = js.ConsumerInfo("TEST", eName)
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 0)
require_Equal(t, len(ci.Cluster.Replicas), 0)

c.waitOnConsumerLeader(globalAccountName, "TEST", "r1")
ci, err = js.ConsumerInfo("TEST", "r1")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 1)
require_Equal(t, len(ci.Cluster.Replicas), 0)
// Now check that state transferred correctly.
ci.Delivered.Last, ci.AckFloor.Last = nil, nil
if ci.Delivered != r1ci.Delivered {
t.Fatalf("Delivered state for R1 incorrect, wanted %+v got %+v",
r1ci.Delivered, ci.Delivered)
}
if ci.AckFloor != r1ci.AckFloor {
t.Fatalf("AckFloor state for R1 incorrect, wanted %+v got %+v",
r1ci.AckFloor, ci.AckFloor)
}

c.waitOnConsumerLeader(globalAccountName, "TEST", "r3")
ci, err = js.ConsumerInfo("TEST", "r3")
require_NoError(t, err)
require_Equal(t, ci.Config.Replicas, 3)
require_Equal(t, len(ci.Cluster.Replicas), 2)
}