diff --git a/etcdserver/server.go b/etcdserver/server.go index e862b2927fa..0d4de02e0da 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -923,6 +923,18 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { } return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") } + if s.ID() != types.ID(m.To) { + if lg := s.getLogger(); lg != nil { + lg.Warn( + "rejected Raft message to mismatch member", + zap.String("local-member-id", s.ID().String()), + zap.String("mismatch-member-id", types.ID(m.To).String()), + ) + } else { + plog.Warningf("rejected message to mismatch member %s", types.ID(m.From).String()) + } + return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message to mismatch member") + } if m.Type == raftpb.MsgApp { s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size()) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index d082f1a6ec8..7210d153408 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "os" "path" @@ -49,6 +50,7 @@ import ( "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" "go.uber.org/zap" + "go.uber.org/zap/zaptest" ) // TestDoLocalAction tests requests which do not need to go through raft to be applied, @@ -1327,6 +1329,86 @@ func TestAddMember(t *testing.T) { } } +func realisticRaftNode(lg *zap.Logger, id uint64, snap *raftpb.Snapshot) *raftNode { + storage := raft.NewMemoryStorage() + storage.SetHardState(raftpb.HardState{Commit: 0, Term: 0}) + if snap != nil { + err := storage.ApplySnapshot(*snap) + if err != nil { + panic(err) + } + } + c := &raft.Config{ + ID: id, + ElectionTick: 10, + HeartbeatTick: 1, + Storage: storage, + MaxSizePerMsg: math.MaxUint64, + MaxInflightMsgs: 256, + } + n := raft.RestartNode(c) + r := newRaftNode(raftNodeConfig{ + lg: lg, + Node: n, + transport: newNopTransporter(), + }) + return r +} + +// TestProcessIgnoreMismatchMessage tests Process must ignore messages to +// mismatch member. +func TestProcessIgnoreMismatchMessage(t *testing.T) { + lg := zaptest.NewLogger(t) + cl := newTestCluster(nil) + st := v2store.New() + cl.SetStore(st) + + // Bootstrap a 3-node cluster, member IDs: 1 2 3. + cl.AddMember(&membership.Member{ID: 1}) + cl.AddMember(&membership.Member{ID: 2}) + cl.AddMember(&membership.Member{ID: 3}) + // r is initialized with ID 1. + r := realisticRaftNode(lg, 1, &raftpb.Snapshot{ + Metadata: raftpb.SnapshotMetadata{ + Index: 11, // Magic number. + Term: 11, // Magic number. + ConfState: raftpb.ConfState{ + // Member ID list. + Voters: []uint64{1, 2, 3}, + }, + }, + }) + + var cindex consistentIndex + cindex.setConsistentIndex(0) + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: lg, + id: 1, + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex, + } + // Mock a mad switch dispatching messages to wrong node. + m := raftpb.Message{ + Type: raftpb.MsgHeartbeat, + To: 2, // Wrong ID, s.MemberId() is 1. + From: 3, + Term: 11, + Commit: 42, // Commit is larger than the last index 11. + } + if types.ID(m.To) == s.ID() { + t.Fatalf("m.To (%d) is expected to mismatch s.MemberId (%d)", m.To, s.ID()) + } + err := s.Process(context.Background(), m) + if err == nil { + t.Fatalf("Must ignore the message and return an error") + } +} + // TestRemoveMember tests RemoveMember can propose and perform node removal. func TestRemoveMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder()