From 355be6369fe3f151766e351d656395b19ebeca54 Mon Sep 17 00:00:00 2001 From: Chun-Hung Tseng Date: Wed, 17 Apr 2024 13:24:10 +0200 Subject: [PATCH] [backport] server: ignore raft messages if member id mismatch #17078 Signed-off-by: Chun-Hung Tseng --- server/etcdserver/server.go | 8 ++++ server/etcdserver/server_test.go | 67 ++++++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ae2076a9b076..e32249f627e9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -991,6 +991,14 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { ) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } + if s.ID() != types.ID(m.To) { + lg.Warn( + "rejected Raft message to mismatch member", + zap.String("local-member-id", s.ID().String()), + zap.String("mismatch-member-id", types.ID(m.To).String()), + ) + return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message to mismatch member") + } if m.Type == raftpb.MsgApp { s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index ca0cf07cd9f9..9319d0a25a74 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -657,7 +657,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { lgMu: new(sync.RWMutex), lg: lg, id: 1, - r: *realisticRaftNode(lg), + r: *realisticRaftNode(lg, 1, nil), cluster: cl, w: wait.New(), consistIndex: ci, @@ -700,11 +700,17 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { assert.Equal(t, consistIndex, rindex) } -func realisticRaftNode(lg *zap.Logger) *raftNode { +func realisticRaftNode(lg *zap.Logger, id uint64, snap *raftpb.Snapshot) *raftNode { storage := raft.NewMemoryStorage() storage.SetHardState(raftpb.HardState{Commit: 0, Term: 0}) + if snap != nil { + err 
:= storage.ApplySnapshot(*snap) + if err != nil { + panic(err) + } + } c := &raft.Config{ - ID: 1, + ID: id, ElectionTick: 10, HeartbeatTick: 1, Storage: storage, @@ -1381,6 +1387,61 @@ func TestAddMember(t *testing.T) { } } +// TestProcessIgnoreMismatchMessage tests that Process rejects Raft messages +// addressed to a member ID other than the local one. +func TestProcessIgnoreMismatchMessage(t *testing.T) { + lg := zaptest.NewLogger(t) + cl := newTestCluster(t, nil) + st := v2store.New() + cl.SetStore(st) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + cl.SetBackend(be) + + // Bootstrap a 3-node cluster, member IDs: 1 2 3. + cl.AddMember(&membership.Member{ID: types.ID(1)}, true) + cl.AddMember(&membership.Member{ID: types.ID(2)}, true) + cl.AddMember(&membership.Member{ID: types.ID(3)}, true) + // r is initialized with ID 1. + r := realisticRaftNode(lg, 1, &raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: 11, // Magic number. + Term: 11, // Magic number. + ConfState: raftpb.ConfState{ + // Member ID list. + Voters: []uint64{1, 2, 3}, + }, + }, + }) + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: lg, + id: 1, + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), + beHooks: &backendHooks{lg: lg}, + } + // Simulate a misbehaving switch delivering a message to the wrong node. + m := raftpb.Message{ + Type: raftpb.MsgHeartbeat, + To: 2, // Wrong ID, s.ID() is 1. + From: 3, + Term: 11, + Commit: 42, // Commit is larger than the last index 11. + } + if types.ID(m.To) == s.ID() { + t.Fatalf("m.To (%d) is expected to mismatch s.MemberId (%d)", m.To, s.ID()) + } + err := s.Process(context.Background(), m) + if err == nil { + t.Fatalf("Must ignore the message and return an error") + } +} + // TestRemoveMember tests RemoveMember can propose and perform node removal. func TestRemoveMember(t *testing.T) { lg := zaptest.NewLogger(t)