Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Peer removal race #6316

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 5 additions & 30 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2721,8 +2721,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// keep stream assignment current
sa = mset.streamAssignment()

// keep peer list up to date with config
js.checkPeers(mset.raftGroup())
// We get this when we have a new stream assignment caused by an update.
// We want to know if we are migrating.
if migrating := mset.isMigrating(); migrating {
Expand Down Expand Up @@ -2810,7 +2808,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check if we have a quorum.
if current >= neededCurrent {
s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader)
n.UpdateKnownPeers(newPeers)
n.ProposeKnownPeers(newPeers)
n.StepDown(newLeaderPeer)
}
}
Expand Down Expand Up @@ -3345,22 +3343,6 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
return replicas
}

// Will check our node peers and see if we should remove a peer.
// Walks the raft node's current peer set and proposes removal of any
// peer that is no longer a member of the group's assignment. Acquires
// js.mu for the duration of the scan.
func (js *jetStream) checkPeers(rg *raftGroup) {
	js.mu.Lock()
	defer js.mu.Unlock()

	// FIXME(dlc) - Single replicas?
	// Nothing to reconcile without a group or a running raft node.
	if rg == nil || rg.node == nil {
		return
	}
	for _, peer := range rg.node.Peers() {
		// Peer is present in raft state but not in the assigned
		// membership — propose its removal.
		// NOTE(review): ProposeRemovePeer's error is ignored here,
		// presumably best-effort — confirm.
		if !rg.isMember(peer.ID) {
			rg.node.ProposeRemovePeer(peer.ID)
		}
	}
}

// Process a leader change for the clustered stream.
func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if mset == nil {
Expand Down Expand Up @@ -3393,8 +3375,6 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if isLeader {
s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName)
s.sendStreamLeaderElectAdvisory(mset)
// Check for peer removal and process here if needed.
js.checkPeers(sa.Group)
mset.checkAllowMsgCompress(peers)
} else {
// We are stepping down.
Expand Down Expand Up @@ -3611,7 +3591,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
js.processClusterCreateStream(acc, sa)
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.removeStream(ourID, mset, sa)
s.removeStream(mset, sa)
}

// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
Expand Down Expand Up @@ -3699,13 +3679,13 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
js.processClusterUpdateStream(acc, osa, sa)
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.removeStream(ourID, mset, sa)
s.removeStream(mset, sa)
}
}

// Common function to remove ourself from this server.
// Common function to remove ourselves from this server.
// This can happen on re-assignment, move, etc
func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) {
func (s *Server) removeStream(mset *stream, nsa *streamAssignment) {
if mset == nil {
return
}
Expand All @@ -3715,7 +3695,6 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
if node.Leader() {
node.StepDown(nsa.Group.Preferred)
}
node.ProposeRemovePeer(ourID)
// shutdown monitor by shutting down raft.
node.Delete()
}
Expand Down Expand Up @@ -5051,8 +5030,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// We get this when we have a new consumer assignment caused by an update.
// We want to know if we are migrating.
rg := o.raftGroup()
// keep peer list up to date with config
js.checkPeers(rg)
// If we are migrating, monitor for the new peers to be caught up.
replicas, err := o.replica()
if err != nil {
Expand Down Expand Up @@ -5369,8 +5346,6 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
if isLeader {
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.serviceAccount(), streamName, consumerName)
s.sendConsumerLeaderElectAdvisory(o)
// Check for peer removal and process here if needed.
js.checkPeers(ca.Group)
} else {
// We are stepping down.
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
Expand Down
54 changes: 54 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6945,6 +6945,60 @@ func TestJetStreamClusterConsumerInfoAfterCreate(t *testing.T) {
require_NoError(t, err)
}

func TestJetStreamClusterStreamUpscalePeersAfterDownscale(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R5S", 5)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Verify that every server in the cluster reports the full R5 peer
	// set for the stream's raft group.
	expectFullPeerSet := func() {
		t.Helper()
		checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
			for _, srv := range c.servers {
				acc, err := srv.lookupAccount(globalAccountName)
				if err != nil {
					return err
				}
				mset, err := acc.lookupStream("TEST")
				if err != nil {
					return err
				}
				if ps := mset.raftNode().Peers(); len(ps) != 5 {
					return fmt.Errorf("expected 5 peers, got %d", len(ps))
				}
			}
			return nil
		})
	}

	// Start out at R5.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 5,
	})
	require_NoError(t, err)

	expectFullPeerSet()

	// Scale down to R3, then immediately back up to R5.
	for _, replicas := range []int{3, 5} {
		_, err = js.UpdateStream(&nats.StreamConfig{
			Name:     "TEST",
			Subjects: []string{"foo"},
			Replicas: replicas,
		})
		require_NoError(t, err)
	}

	// The upscale must restore the full peer set everywhere.
	expectFullPeerSet()
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down
17 changes: 11 additions & 6 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type RaftNode interface {
ID() string
Group() string
Peers() []*Peer
ProposeKnownPeers(knownPeers []string)
UpdateKnownPeers(knownPeers []string)
ProposeAddPeer(peer string) error
ProposeRemovePeer(peer string) error
Expand Down Expand Up @@ -1699,19 +1700,23 @@ func (n *raft) Peers() []*Peer {
return peers
}

// ProposeKnownPeers updates our known set of peers and, as leader,
// proposes the resulting peer state to the group. It is a no-op on
// non-leaders, which learn the peer set from replicated entries.
func (n *raft) ProposeKnownPeers(knownPeers []string) {
	// Only the leader may propose a new peer state.
	if n.State() != Leader {
		return
	}
	// UpdateKnownPeers re-checks leadership under the lock and sends
	// the peer state itself, so no explicit sendPeerState here. This
	// avoids a duplicate proposal, and avoids sending without
	// leadership if we lose the election between the check above and
	// the update.
	n.UpdateKnownPeers(knownPeers)
}

// UpdateKnownPeers installs knownPeers as our current view of the
// peer set, processing it exactly like an incoming peer state update.
// A leader additionally sends the new peer state out to the group.
func (n *raft) UpdateKnownPeers(knownPeers []string) {
	n.Lock()
	// Apply the update as if it had arrived as a peer state entry.
	n.processPeerState(&peerState{knownPeers, len(knownPeers), n.extSt})
	leader := n.State() == Leader
	n.Unlock()

	// Outside the lock: leaders propagate the updated peer state.
	if leader {
		n.sendPeerState()
	}
}

// ApplyQ returns the apply queue that new commits will be sent to for the
Expand Down
Loading