From 0ea6fa542acbbb8e7c8b33985a0e1b57897b9335 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 18 Oct 2022 20:23:57 -0400 Subject: [PATCH 01/17] raft: clean up IsLocalMsg and IsResponseMsg logic Use array indexing to clean up the code and make it constant time. Also, add a test for IsResponseMsg. Signed-off-by: Nathan VanBenschoten --- raft/raftpb/raft.proto | 3 +++ raft/util.go | 27 ++++++++++++++++++++++++--- raft/util_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index f21f3afe5827..3b058ba6281b 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -60,6 +60,9 @@ enum MessageType { MsgReadIndexResp = 16; MsgPreVote = 17; MsgPreVoteResp = 18; + // NOTE: when adding new message types, remember to update the isLocalMsg and + // isResponseMsg arrays in raft/util.go and update the corresponding tests in + // raft/util_test.go. } message Message { diff --git a/raft/util.go b/raft/util.go index 6e728fb010a1..0510d3f7e5e6 100644 --- a/raft/util.go +++ b/raft/util.go @@ -40,13 +40,34 @@ func max(a, b uint64) uint64 { return b } +var isLocalMsg = [...]bool{ + pb.MsgHup: true, + pb.MsgBeat: true, + pb.MsgUnreachable: true, + pb.MsgSnapStatus: true, + pb.MsgCheckQuorum: true, +} + +var isResponseMsg = [...]bool{ + pb.MsgAppResp: true, + pb.MsgVoteResp: true, + pb.MsgHeartbeatResp: true, + pb.MsgUnreachable: true, + pb.MsgReadIndexResp: true, + pb.MsgPreVoteResp: true, +} + +func isMsgInArray(msgt pb.MessageType, arr []bool) bool { + i := int(msgt) + return i < len(arr) && arr[i] +} + func IsLocalMsg(msgt pb.MessageType) bool { - return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable || - msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum + return isMsgInArray(msgt, isLocalMsg[:]) } func IsResponseMsg(msgt pb.MessageType) bool { - return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable || msgt == pb.MsgPreVoteResp + return isMsgInArray(msgt, isResponseMsg[:]) } // voteResponseType maps vote and prevote message types to their corresponding responses. diff --git a/raft/util_test.go b/raft/util_test.go index 65bc95501bf3..1b914100174e 100644 --- a/raft/util_test.go +++ b/raft/util_test.go @@ -104,3 +104,37 @@ func TestIsLocalMsg(t *testing.T) { } } } + +func TestIsResponseMsg(t *testing.T) { + tests := []struct { + msgt pb.MessageType + isResponse bool + }{ + {pb.MsgHup, false}, + {pb.MsgBeat, false}, + {pb.MsgUnreachable, true}, + {pb.MsgSnapStatus, false}, + {pb.MsgCheckQuorum, false}, + {pb.MsgTransferLeader, false}, + {pb.MsgProp, false}, + {pb.MsgApp, false}, + {pb.MsgAppResp, true}, + {pb.MsgVote, false}, + {pb.MsgVoteResp, true}, + {pb.MsgSnap, false}, + {pb.MsgHeartbeat, false}, + {pb.MsgHeartbeatResp, true}, + {pb.MsgTimeoutNow, false}, + {pb.MsgReadIndex, false}, + {pb.MsgReadIndexResp, true}, + {pb.MsgPreVote, false}, + {pb.MsgPreVoteResp, true}, + } + + for i, tt := range tests { + got := IsResponseMsg(tt.msgt) + if got != tt.isResponse { + t.Errorf("#%d: got %v, want %v", i, got, tt.isResponse) + } + } +} From 3d5f293f3ee6bce73dff23d4956ba77e19711d41 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Fri, 11 Nov 2022 17:40:20 +0800 Subject: [PATCH 02/17] changelog: add item for 3.5 to cover the revision inconsistency issue Signed-off-by: Benjamin Wang --- CHANGELOG/CHANGELOG-3.5.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG/CHANGELOG-3.5.md b/CHANGELOG/CHANGELOG-3.5.md index 72c9d8065c0f..8d3a67fed615 100644 --- a/CHANGELOG/CHANGELOG-3.5.md +++ b/CHANGELOG/CHANGELOG-3.5.md @@ -10,6 +10,7 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/ - Fix [auth invalid token and old revision errors in watch](https://github.com/etcd-io/etcd/pull/14547) - Fix [avoid closing a watch with ID 0 incorrectly](https://github.com/etcd-io/etcd/pull/14563) - Fix [auth: fix data consistency issue caused by recovery from snapshot](https://github.com/etcd-io/etcd/pull/14648) +- Fix [revision might be inconsistency between members when etcd crashes during processing defragmentation operation](https://github.com/etcd-io/etcd/pull/14733) ### Package `netutil` - Fix [netutil: add url comparison without resolver to URLStringsEqual](https://github.com/etcd-io/etcd/pull/14573) From 95c5fed3cf306774e98010296ca7b71411346067 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 29 Sep 2022 23:53:10 -0400 Subject: [PATCH 03/17] raft: remove IsEmptySnap check from raftLog.hasPendingSnapshot unstable.snapshot is never an empty snapshot. This check made it look like it could be. Signed-off-by: Nathan VanBenschoten --- raft/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/log.go b/raft/log.go index 118f8d078834..912eb50b0787 100644 --- a/raft/log.go +++ b/raft/log.go @@ -201,7 +201,7 @@ func (l *raftLog) hasNextCommittedEnts() bool { // hasPendingSnapshot returns if there is pending snapshot waiting for applying. func (l *raftLog) hasPendingSnapshot() bool { - return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot) + return l.unstable.snapshot != nil } func (l *raftLog) snapshot() (pb.Snapshot, error) { From 539a8410f40a8b5708ce3f6b44c2885f81094ee4 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 18 Oct 2022 14:19:00 -0400 Subject: [PATCH 04/17] raft: don't apply entries when applying snapshot This commit removes the ability to apply log entries at the same time as applying a snapshot. Doing so it possible, but it leads to complex code and raises questions about what should be applied first. It also raises additional complexity when we start allowing concurrent, asynchronous log appends and log application. It's easiest to just disallow this. Signed-off-by: Nathan VanBenschoten --- raft/log.go | 19 ++++++++++++++----- raft/log_test.go | 40 ++++++++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/raft/log.go b/raft/log.go index 912eb50b0787..6bf9d153e8cd 100644 --- a/raft/log.go +++ b/raft/log.go @@ -181,9 +181,13 @@ func (l *raftLog) unstableEntries() []pb.Entry { // If applied is smaller than the index of snapshot, it returns all committed // entries after the index of snapshot. func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) { - off := max(l.applied+1, l.firstIndex()) - if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, l.maxNextCommittedEntsSize) + if l.hasPendingSnapshot() { + // See comment in hasNextCommittedEnts. + return nil + } + if l.committed > l.applied { + lo, hi := l.applied+1, l.committed+1 // [lo, hi) + ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } @@ -195,8 +199,13 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) { // hasNextCommittedEnts returns if there is any available entries for execution. // This is a fast check without heavy raftLog.slice() in nextCommittedEnts(). func (l *raftLog) hasNextCommittedEnts() bool { - off := max(l.applied+1, l.firstIndex()) - return l.committed+1 > off + if l.hasPendingSnapshot() { + // If we have a snapshot to apply, don't also return any committed + // entries. Doing so raises questions about what should be applied + // first. + return false + } + return l.committed > l.applied } // hasPendingSnapshot returns if there is pending snapshot waiting for applying. diff --git a/raft/log_test.go b/raft/log_test.go index adeaee3ea95f..546b1c3039a2 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -349,13 +349,16 @@ func TestHasNextCommittedEnts(t *testing.T) { {Term: 1, Index: 6}, } tests := []struct { - applied uint64 - hasNext bool + applied uint64 + snap bool + whasNext bool }{ - {0, true}, - {3, true}, - {4, true}, - {5, false}, + {applied: 0, snap: false, whasNext: true}, + {applied: 3, snap: false, whasNext: true}, + {applied: 4, snap: false, whasNext: true}, + {applied: 5, snap: false, whasNext: false}, + // With snapshot. + {applied: 3, snap: true, whasNext: false}, } for i, tt := range tests { storage := NewMemoryStorage() @@ -364,10 +367,15 @@ func TestHasNextCommittedEnts(t *testing.T) { raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) + if tt.snap { + newSnap := snap + newSnap.Metadata.Index++ + raftLog.restore(newSnap) + } hasNext := raftLog.hasNextCommittedEnts() - if hasNext != tt.hasNext { - t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext) + if hasNext != tt.whasNext { + t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.whasNext) } } } @@ -383,12 +391,15 @@ func TestNextCommittedEnts(t *testing.T) { } tests := []struct { applied uint64 + snap bool wents []pb.Entry }{ - {0, ents[:2]}, - {3, ents[:2]}, - {4, ents[1:2]}, - {5, nil}, + {applied: 0, snap: false, wents: ents[:2]}, + {applied: 3, snap: false, wents: ents[:2]}, + {applied: 4, snap: false, wents: ents[1:2]}, + {applied: 5, snap: false, wents: nil}, + // With snapshot. + {applied: 3, snap: true, wents: nil}, } for i, tt := range tests { storage := NewMemoryStorage() @@ -397,6 +408,11 @@ func TestNextCommittedEnts(t *testing.T) { raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) + if tt.snap { + newSnap := snap + newSnap.Metadata.Index++ + raftLog.restore(newSnap) + } nents := raftLog.nextCommittedEnts() if !reflect.DeepEqual(nents, tt.wents) { From 71d45461ff47247da6c309cce32b133d8d5c7664 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Sun, 13 Nov 2022 17:21:13 +0800 Subject: [PATCH 05/17] e2e: test DowngradeVersion with latest point release Signed-off-by: Wei Fu --- .github/workflows/e2e.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 49bcb8c16255..01286ceadfbd 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -21,7 +21,7 @@ jobs: echo "${TARGET}" case "${TARGET}" in linux-amd64-e2e) - PASSES='build release e2e' MANUAL_VER=v3.5.0 CPU='4' EXPECT_DEBUG='true' COVER='false' RACE='true' ./scripts/test.sh 2>&1 | tee test.log + PASSES='build release e2e' CPU='4' EXPECT_DEBUG='true' COVER='false' RACE='true' ./scripts/test.sh 2>&1 | tee test.log ! grep -E "(--- FAIL:|FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test.log ;; linux-386-e2e) From d5a6d2518d43516f43184efc5d1341a2a2072355 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 30 Oct 2022 03:49:01 -0500 Subject: [PATCH 06/17] tests: Optimize checking failed writes Signed-off-by: Marek Siarkowicz --- tests/linearizability/model.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index 389f2fd8953a..eebb9c05183c 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -101,12 +101,11 @@ func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, Et if state.Value == response.getData { return true, state } - for write := range state.FailedWrites { - if write == response.getData { - state.Value = response.getData - delete(state.FailedWrites, write) - return true, state - } + _, ok := state.FailedWrites[response.getData] + if ok { + state.Value = response.getData + delete(state.FailedWrites, response.getData) + return true, state } case Put: if response.err == nil { From bfb7b16f4f5653403a99065d4983c77a59ac600c Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:29:40 +0100 Subject: [PATCH 07/17] raft/tracker: add byte size limit to Inflights type The Inflights type has limits on the message size and the number of inflight messages. However, a single large entry that exceeds the size limit can still be sent. In combination with the max messages count limit, many large messages can be sent in a row and overflow the receiver. In effect, the "max" values act as "target" rather than hard limits. This commit adds an additional soft limit on the total size of inflight messages, which catches such situations and prevents the receiver overflow. Signed-off-by: Pavel Kalinnikov --- raft/confchange/confchange.go | 2 +- raft/raft.go | 2 +- raft/tracker/inflights.go | 56 +++++++++----- raft/tracker/inflights_test.go | 137 ++++++++++++++++++++++++++------- raft/tracker/progress.go | 2 +- raft/tracker/progress_test.go | 18 ++--- 6 files changed, 157 insertions(+), 60 deletions(-) diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index dddbcc9d9b57..4e54e30f2af5 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight), + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/raft.go b/raft/raft.go index 5b3139196ba4..33d608d41c9a 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -629,7 +629,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight), + Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes IsLearner: pr.IsLearner, } if id == r.id { diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 242d1cab1cd9..350728aec7dc 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -14,6 +14,12 @@ package tracker +// inflight describes an in-flight MsgApp message. +type inflight struct { + index uint64 // the index of the last entry inside the message + bytes uint64 // the total byte size of the entries in the message +} + // Inflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever @@ -22,21 +28,25 @@ package tracker type Inflights struct { // the starting index in the buffer start int - // number of inflights in the buffer - count int - // the size of the buffer - size int + count int // number of inflight messages in the buffer + bytes uint64 // number of inflight bytes + + size int // the max number of inflight messages + maxBytes uint64 // the max total byte size of inflight messages - // buffer contains the index of the last entry - // inside one message. - buffer []uint64 + // buffer is a ring buffer containing info about all in-flight messages. + buffer []inflight } -// NewInflights sets up an Inflights that allows up to 'size' inflight messages. -func NewInflights(size int) *Inflights { +// NewInflights sets up an Inflights that allows up to size inflight messages, +// with the total byte size up to maxBytes. If maxBytes is 0 then there is no +// byte size limit. The maxBytes limit is soft, i.e. we accept a single message +// that brings it from size < maxBytes to size >= maxBytes. +func NewInflights(size int, maxBytes uint64) *Inflights { return &Inflights{ - size: size, + size: size, + maxBytes: maxBytes, } } @@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]uint64(nil), in.buffer...) + ins.buffer = append([]inflight(nil), in.buffer...) return &ins } -// Add notifies the Inflights that a new message with the given index is being -// dispatched. Full() must be called prior to Add() to verify that there is room -// for one more message, and consecutive calls to add Add() must provide a -// monotonic sequence of indexes. -func (in *Inflights) Add(inflight uint64) { +// Add notifies the Inflights that a new message with the given index and byte +// size is being dispatched. Full() must be called prior to Add() to verify that +// there is room for one more message, and consecutive calls to Add() must +// provide a monotonic sequence of indexes. +func (in *Inflights) Add(index, bytes uint64) { if in.Full() { panic("cannot add into a Full inflights") } @@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) { if next >= len(in.buffer) { in.grow() } - in.buffer[next] = inflight + in.buffer[next] = inflight{index: index, bytes: bytes} in.count++ + in.bytes += bytes } // grow the inflight buffer by doubling up to inflights.size. We grow on demand @@ -78,24 +89,26 @@ func (in *Inflights) grow() { } else if newSize > in.size { newSize = in.size } - newBuffer := make([]uint64, newSize) + newBuffer := make([]inflight, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start] { + if in.count == 0 || to < in.buffer[in.start].index { // out of the left side of the window return } idx := in.start var i int + var bytes uint64 for i = 0; i < in.count; i++ { - if to < in.buffer[idx] { // found the first large inflight + if to < in.buffer[idx].index { // found the first large inflight break } + bytes += in.buffer[idx].bytes // increase index and maybe rotate size := in.size @@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) { } // free i inflights and set new start index in.count -= i + in.bytes -= bytes in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the @@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) { // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count == in.size + return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index fe2a1b5645fc..3514220df390 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) { // no rotating case in := &Inflights{ size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn := &Inflights{ start: 0, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), } require.Equal(t, wantIn, in) for i := 5; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ start: 0, count: 10, + bytes: 1045, size: 10, - // ↓--------------------------- - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) @@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) { in2 := &Inflights{ start: 5, size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn21 := &Inflights{ start: 5, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn21, in2) for i := 5; i < 10; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn22 := &Inflights{ start: 5, count: 10, + bytes: 1045, size: 10, - // -------------- ↓------------ - buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // -------------- ↓------------ + []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn22, in2) } func TestInflightFreeTo(t *testing.T) { // no rotating case - in := NewInflights(10) + in := NewInflights(10, 0) for i := 0; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(0) @@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn0 := &Inflights{ start: 1, count: 9, + bytes: 945, size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn0, in) @@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn := &Inflights{ start: 5, count: 5, + bytes: 535, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn, in) @@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) { wantIn2 := &Inflights{ start: 9, count: 1, + bytes: 109, size: 10, - // ↓ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) // rotating case for i := 10; i < 15; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(12) @@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn3 := &Inflights{ start: 3, count: 2, + bytes: 227, size: 10, - // ↓----- - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓----- + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn3, in) @@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) { start: 0, count: 0, size: 10, - // ↓ - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn4, in) } + +func TestInflightsFull(t *testing.T) { + for _, tc := range []struct { + name string + size int + maxBytes uint64 + fullAt int + freeLE uint64 + againAt int + }{ + {name: "always-full", size: 0, fullAt: 0}, + {name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22}, + {name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7}, + {name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8}, + {name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6}, + } { + t.Run(tc.name, func(t *testing.T) { + in := NewInflights(tc.size, tc.maxBytes) + + addUntilFull := func(begin, end int) { + for i := begin; i < end; i++ { + if in.Full() { + t.Fatalf("full at %d, want %d", i, end) + } + in.Add(uint64(i), uint64(100+i)) + } + if !in.Full() { + t.Fatalf("not full at %d", end) + } + } + + addUntilFull(0, tc.fullAt) + in.FreeLE(tc.freeLE) + addUntilFull(tc.fullAt, tc.againAt) + + defer func() { + if r := recover(); r == nil { + t.Errorf("Add() did not panic") + } + }() + in.Add(100, 1024) + }) + } +} + +func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { + if len(indices) != len(sizes) { + panic("len(indices) != len(sizes)") + } + buffer := make([]inflight, 0, len(indices)) + for i, idx := range indices { + buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + } + return buffer +} diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index c6272d22daa2..adf9cb4baf00 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -141,7 +141,7 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last) + pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 974c383f0644..ed80e0da4676 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -21,8 +21,8 @@ import ( ) func TestProgressString(t *testing.T) { - ins := NewInflights(1) - ins.Add(123) + ins := NewInflights(1, 0) + ins.Add(123, 1) pr := &Progress{ Match: 1, Next: 2, @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, + State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } @@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) { wnext uint64 }{ { - &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)}, + &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)}, 2, }, { // snapshot finish - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)}, 11, }, { // snapshot failure - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)}, 2, }, } @@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) { } func TestProgressBecomeReplicate(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeReplicate() assert.Equal(t, StateReplicate, p.State) assert.Equal(t, uint64(1), p.Match) @@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) { } func TestProgressBecomeSnapshot(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeSnapshot(10) assert.Equal(t, StateSnapshot, p.State) assert.Equal(t, uint64(1), p.Match) From 7bda0d7773e4017853343b219790f9cc7b28ace1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:36:44 +0100 Subject: [PATCH 08/17] raft/tracker: add MaxInflightBytes to ProgressTracker This commit plumbs the max total byte size of the Inflights type higher up the stack to the ProgressTracker. Signed-off-by: Pavel Kalinnikov --- raft/confchange/confchange.go | 2 +- raft/confchange/datadriven_test.go | 2 +- raft/confchange/quick_test.go | 2 +- raft/confchange/restore_test.go | 2 +- raft/raft.go | 9 +++++---- raft/raft_test.go | 2 +- raft/tracker/progress.go | 7 ++++--- raft/tracker/tracker.go | 8 +++++--- 8 files changed, 19 insertions(+), 15 deletions(-) diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index 4e54e30f2af5..bc60abf7fe2e 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index ab1524091c53..f179f1f43f87 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -28,7 +28,7 @@ import ( func TestConfChangeDataDriven(t *testing.T) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := Changer{ Tracker: tr, LastIndex: 0, // incremented in this test with each cmd diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index 16d72c199ba0..76018f634d5a 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := &Changer{ Tracker: tr, LastIndex: 10, diff --git a/raft/confchange/restore_test.go b/raft/confchange/restore_test.go index 50712c7941f4..ec45e5144ca6 100644 --- a/raft/confchange/restore_test.go +++ b/raft/confchange/restore_test.go @@ -86,7 +86,7 @@ func TestRestore(t *testing.T) { f := func(cs pb.ConfState) bool { chg := Changer{ - Tracker: tracker.MakeProgressTracker(20), + Tracker: tracker.MakeProgressTracker(20, 0), LastIndex: 10, } cfg, prs, err := Restore(chg, cs) diff --git a/raft/raft.go b/raft/raft.go index 33d608d41c9a..38d02ae27986 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -332,7 +332,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,7 +484,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size()) + if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ @@ -629,7 +630,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes), IsLearner: pr.IsLearner, } if id == r.id { @@ -1618,7 +1619,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.raftLog.restore(s) // Reset the configuration and add the (potentially updated) peers in anew. - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes) cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, LastIndex: r.raftLog.lastIndex(), diff --git a/raft/raft_test.go b/raft/raft_test.go index 95408976b56d..29eec28c1a6c 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4706,7 +4706,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) + v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes) if len(learners) > 0 { v.prs.Learners = map[uint64]struct{}{} } diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index adf9cb4baf00..f4e1e07d882c 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { } // UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, appended at and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { +// entries being sent in a MsgApp, with the given total bytes size, appended at +// and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { switch pr.State { case StateReplicate: if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) + pr.Inflights.Add(last, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index 72dcc73b8667..938b7878c50c 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -121,13 +121,15 @@ type ProgressTracker struct { Votes map[uint64]bool - MaxInflight int + MaxInflight int + MaxInflightBytes uint64 } // MakeProgressTracker initializes a ProgressTracker. -func MakeProgressTracker(maxInflight int) ProgressTracker { +func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker { p := ProgressTracker{ - MaxInflight: maxInflight, + MaxInflight: maxInflight, + MaxInflightBytes: maxBytes, Config: Config{ Voters: quorum.JointConfig{ quorum.MajorityConfig{}, From 8c9c557d85fa22d99a38a5255cdf175aaf63adf8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 27 Oct 2022 14:52:55 +0100 Subject: [PATCH 09/17] raft: factor out payloadsSize helper Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 38d02ae27986..880d23c600f4 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1790,11 +1790,7 @@ func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Messa // Empty payloads are never refused. This is used both for appending an empty // entry at a new leader's term, as well as leaving a joint configuration. func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - + s := payloadsSize(ents) if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { // If the uncommitted tail of the Raft log is empty, allow any size // proposal. Otherwise, limit the size of the uncommitted tail of the @@ -1816,12 +1812,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { // Fast-path for followers, who do not track or enforce the limit. return } - - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - if s > r.uncommittedSize { + if s := payloadsSize(ents); s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft // log tail but will never overestimate it. Saturate at 0 instead of // allowing overflow. @@ -1831,6 +1822,14 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { } } +func payloadsSize(ents []pb.Entry) uint64 { + var s uint64 + for _, e := range ents { + s += uint64(PayloadSize(e)) + } + return s +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { From 68af01ca6e47d31509b6104a4392007b1fd561da Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:39:49 +0100 Subject: [PATCH 10/17] raft: add MaxInflightBytes to Config This commit introduces the max inflight bytes setting at the Config level, and tests that raft flow control honours it. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 20 +++++++++-- raft/raft_test.go | 68 +++++++++++++++++++---------------- raft/tracker/progress_test.go | 4 +-- 3 files changed, 56 insertions(+), 36 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 880d23c600f4..180a96e9389c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -160,6 +160,16 @@ type Config struct { // overflowing that sending buffer. TODO (xiangli): feedback to application to // limit the proposal rate? MaxInflightMsgs int + // MaxInflightBytes limits the number of in-flight bytes in append messages. + // Complements MaxInflightMsgs. Ignored if zero. + // + // This effectively bounds the bandwidth-delay product. Note that especially + // in high-latency deployments setting this too low can lead to a dramatic + // reduction in throughput. For example, with a peer that has a round-trip + // latency of 100ms to the leader and this setting is set to 1 MB, there is a + // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops + // to 2.5 MB/s. See Little's law to understand the maths behind. + MaxInflightBytes uint64 // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -228,6 +238,11 @@ func (c *Config) validate() error { if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } + if c.MaxInflightBytes == 0 { + c.MaxInflightBytes = noLimit + } else if c.MaxInflightBytes < c.MaxSizePerMsg { + return errors.New("max inflight bytes must be >= max message size") + } if c.Logger == nil { c.Logger = getLogger() @@ -332,7 +347,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,8 +499,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size()) - if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ diff --git a/raft/raft_test.go b/raft/raft_test.go index 29eec28c1a6c..6563b1748998 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) { cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) cfg.MaxInflightMsgs = 3 cfg.MaxSizePerMsg = 2048 + cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg. r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() @@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) { // While node 2 is in probe state, propose a bunch of entries. r.prs.Progress[2].BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) - for i := 0; i < 10; i++ { + large := []byte(strings.Repeat("b", 5000)) + for i := 0; i < 22; i++ { + blob := blob + if i >= 10 && i < 16 { // Temporarily send large messages. + blob = large + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) } @@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) { t.Fatalf("unexpected entry sizes: %v", ms[0].Entries) } - // When this append is acked, we change to replicate state and can - // send multiple messages at once. - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 3 { - t.Fatalf("expected 3 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + ackAndVerify := func(index uint64, expEntries ...int) uint64 { + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index}) + ms := r.readMessages() + if got, want := len(ms), len(expEntries); got != want { + t.Fatalf("expected %d messages, got %d", want, got) } - if len(m.Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries)) + for i, m := range ms { + if got, want := m.Type, pb.MsgApp; got != want { + t.Errorf("%d: expected MsgApp, got %s", i, got) + } + if got, want := len(m.Entries), expEntries[i]; got != want { + t.Errorf("%d: expected %d entries, got %d", i, want, got) + } } - } - - // Ack all three of those messages together and get the last two - // messages (containing three entries). - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 2 { - t.Fatalf("expected 2 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + last := ms[len(ms)-1].Entries + if len(last) == 0 { + return index } + return last[len(last)-1].Index } - if len(ms[0].Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries)) - } - if len(ms[1].Entries) != 1 { - t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries)) - } + + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2) + // Ack all three of those messages together and get another 3 messages. The + // third message contains a single large entry, in contrast to 2 before. + index = ackAndVerify(index, 2, 1, 1) + // All subsequent messages contain one large entry, and we cap at 2 messages + // because it overflows MaxInflightBytes. + index = ackAndVerify(index, 1, 1) + index = ackAndVerify(index, 1, 1) + // Start getting small messages again. + index = ackAndVerify(index, 1, 2, 2) + ackAndVerify(index, 2) } func TestUncommittedEntryLimit(t *testing.T) { diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index ed80e0da4676..49dedb536b25 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, + State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } From 0ef5df11a62c0bbd7ae8fcd0e24d8d619fca18f2 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 9 Nov 2022 18:36:32 +0000 Subject: [PATCH 11/17] raft: update changelog Signed-off-by: Pavel Kalinnikov --- CHANGELOG/CHANGELOG-3.6.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index 8d53c51cb7be..e51c819857b9 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -46,6 +46,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Package `wal` was moved to `storage/wal` - Package `datadir` was moved to `storage/datadir` +### Package `raft` +- Send empty `MsgApp` when entry in-flight limits are exceeded. See [pull/14633](https://github.com/etcd-io/etcd/pull/14633). +- Add [MaxInflightBytes](https://github.com/etcd-io/etcd/pull/14624) setting in `raft.Config` for better flow control of entries. + ### etcd server - Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format. From 3f18816e7d7d43c72cac3a560237ad99b5c21d95 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 14 Nov 2022 11:02:18 +0800 Subject: [PATCH 12/17] etcdserver: add gofail points before and after OnPreCommitUnsafe Signed-off-by: Benjamin Wang --- server/storage/backend/batch_tx.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 4ee05810f242..63b422fbb55a 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -337,7 +337,9 @@ func (t *batchTxBuffered) commit(stop bool) { func (t *batchTxBuffered) unsafeCommit(stop bool) { if t.backend.hooks != nil { + // gofail: var commitBeforePreCommitHook struct{} t.backend.hooks.OnPreCommitUnsafe(t) + // gofail: var commitAfterPreCommitHook struct{} } if t.backend.readTx.tx != nil { From 2532ca84d10fddc3f064b46825a4b39cd6874237 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 14 Nov 2022 11:11:48 +0800 Subject: [PATCH 13/17] test: add failpoints into linearizablity test for commitBeforePreCommitHook and commitAfterPreCommitHook Signed-off-by: Benjamin Wang --- tests/linearizability/failpoints.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index a9d01392fe0d..17384619c18f 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -31,16 +31,19 @@ import ( ) var ( - KillFailpoint Failpoint = killFailpoint{} - DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag} - DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag} - BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil} - AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil} - RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil} - RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil} - RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{ + KillFailpoint Failpoint = killFailpoint{} + DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag} + DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag} + BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil} + AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil} + RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil} + RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil} + CommitBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil} + CommitAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil} + RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{ KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic, + CommitBeforePreCommitHookPanic, CommitAfterPreCommitHookPanic, }} // TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint raftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil} From c2f27a0070e1efe522055d752b1ec406614bb43c Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 14 Nov 2022 17:42:21 +0800 Subject: [PATCH 14/17] tests/e2e: disable testShellCompletion if enable cov flag The etcdctl and etcdutl built with `-tags cov` mode will show go-test result after each execution, like ``` ... PASS coverage: 0.0% of statements in ./... ``` Since the PASS is not real command, the `source completion` command will fail with command-not-found error. And there is no easy way to disable the (*testing.M).Run's output. Therefore, this patch uses build tag !cov to disable cases when enable coverage. Fixes: #14694 Signed-off-by: Wei Fu --- tests/e2e/ctl_v3_completion_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/e2e/ctl_v3_completion_test.go b/tests/e2e/ctl_v3_completion_test.go index b91edcde8217..fe41eb7685b9 100644 --- a/tests/e2e/ctl_v3_completion_test.go +++ b/tests/e2e/ctl_v3_completion_test.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !cov +// +build !cov + package e2e import ( @@ -33,6 +36,14 @@ func TestUtlV3CompletionBash(t *testing.T) { testShellCompletion(t, e2e.BinPath.Etcdutl, "bash") } +// testShellCompletion can only run in non-coverage mode. The etcdctl and etcdutl +// built with `-tags cov` mode will show go-test result after each execution, like +// +// PASS +// coverage: 0.0% of statements in ./... +// +// Since the PASS is not real command, the `source completion" fails with +// command-not-found error. func testShellCompletion(t *testing.T, binPath, shellName string) { e2e.BeforeTest(t) From 5b163aa50752c08705a0e8d8b6a6b36beb054b16 Mon Sep 17 00:00:00 2001 From: Thomas Jungblut Date: Mon, 14 Nov 2022 10:16:47 +0100 Subject: [PATCH 15/17] Expect exit code enhancement ExpectProcess and ExpectFunc now take the exit code of the process into account, not just the matching of the tty output. This also refactors the many tests that were previously succeeding on matching an output from a failing cmd execution. Signed-off-by: Thomas Jungblut --- pkg/expect/expect.go | 213 ++++++++++++++++++--------- pkg/expect/expect_test.go | 58 +++++++- tests/common/txn_test.go | 48 +++--- tests/e2e/ctl_v3_auth_test.go | 143 ++++++++---------- tests/e2e/ctl_v3_elect_test.go | 19 ++- tests/e2e/ctl_v3_kv_test.go | 33 +++-- tests/e2e/ctl_v3_lock_test.go | 18 ++- tests/e2e/ctl_v3_move_leader_test.go | 16 +- tests/e2e/ctl_v3_snapshot_test.go | 6 +- tests/e2e/ctl_v3_test.go | 6 +- tests/e2e/ctl_v3_txn_test.go | 20 ++- tests/e2e/etcd_config_test.go | 8 +- tests/e2e/gateway_test.go | 5 +- tests/e2e/no_quorum_ready_test.go | 4 +- tests/e2e/utl_migrate_test.go | 7 +- tests/e2e/v3_cipher_suite_test.go | 25 ++-- tests/e2e/v3_curl_maxstream_test.go | 3 +- tests/e2e/v3_curl_test.go | 6 +- tests/framework/e2e/cluster.go | 21 ++- tests/framework/e2e/curl.go | 24 ++- tests/framework/e2e/etcd_process.go | 16 +- tests/framework/e2e/util.go | 25 ++-- tests/linearizability/failpoints.go | 4 +- 23 files changed, 451 insertions(+), 277 deletions(-) diff --git a/pkg/expect/expect.go b/pkg/expect/expect.go index c565a73bfbc8..afc9dbe3b2db 100644 --- a/pkg/expect/expect.go +++ b/pkg/expect/expect.go @@ -19,6 +19,7 @@ package expect import ( "bufio" "context" + "errors" "fmt" "io" "os" @@ -33,6 +34,10 @@ import ( const DEBUG_LINES_TAIL = 40 +var ( + ErrProcessRunning = fmt.Errorf("process is still running") +) + type ExpectProcess struct { cfg expectConfig @@ -40,11 +45,12 @@ type ExpectProcess struct { fpty *os.File wg sync.WaitGroup - mu sync.Mutex // protects lines and err - lines []string - count int // increment whenever new line gets added - cur int // current read position - err error + mu sync.Mutex // protects lines, count, cur, exitErr and exitCode + lines []string + count int // increment whenever new line gets added + cur int // current read position + exitErr error // process exit error + exitCode int } // NewExpect creates a new process for expect testing. @@ -69,8 +75,9 @@ func NewExpectWithEnv(name string, args []string, env []string, serverProcessCon return nil, err } - ep.wg.Add(1) + ep.wg.Add(2) go ep.read() + go ep.waitSaveExitErr() return ep, nil } @@ -95,46 +102,83 @@ func (ep *ExpectProcess) Pid() int { func (ep *ExpectProcess) read() { defer ep.wg.Done() - printDebugLines := os.Getenv("EXPECT_DEBUG") != "" + defer func(fpty *os.File) { + err := fpty.Close() + if err != nil { + // we deliberately only log the error here, closing the PTY should mostly be (expected) broken pipes + fmt.Printf("error while closing fpty: %v", err) + } + }(ep.fpty) + r := bufio.NewReader(ep.fpty) for { - l, err := r.ReadString('\n') - ep.mu.Lock() - if l != "" { - if printDebugLines { - fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l) - } - ep.lines = append(ep.lines, l) - ep.count++ - } + err := ep.tryReadNextLine(r) if err != nil { - ep.err = err - ep.mu.Unlock() break } - ep.mu.Unlock() + } +} + +func (ep *ExpectProcess) tryReadNextLine(r *bufio.Reader) error { + printDebugLines := os.Getenv("EXPECT_DEBUG") != "" + l, err := r.ReadString('\n') + + ep.mu.Lock() + defer ep.mu.Unlock() + + if l != "" { + if printDebugLines { + fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l) + } + ep.lines = append(ep.lines, l) + ep.count++ + } + + // we're checking the error here at the bottom to ensure any leftover reads are still taken into account + return err +} + +func (ep *ExpectProcess) waitSaveExitErr() { + defer ep.wg.Done() + err := ep.waitProcess() + + ep.mu.Lock() + defer ep.mu.Unlock() + if err != nil { + ep.exitErr = err } } // ExpectFunc returns the first line satisfying the function f. func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (string, error) { i := 0 - for { - ep.mu.Lock() - for i < len(ep.lines) { - line := ep.lines[i] - i++ - if f(line) { - ep.mu.Unlock() - return line, nil + line, errsFound := func() (string, bool) { + ep.mu.Lock() + defer ep.mu.Unlock() + + // check if this expect has been already closed + if ep.cmd == nil { + return "", true } + + for i < len(ep.lines) { + line := ep.lines[i] + i++ + if f(line) { + return line, false + } + } + return "", ep.exitErr != nil + }() + + if line != "" { + return line, nil } - if ep.err != nil { - ep.mu.Unlock() + + if errsFound { break } - ep.mu.Unlock() select { case <-ctx.Done(): @@ -143,16 +187,18 @@ func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (s // continue loop } } + ep.mu.Lock() + defer ep.mu.Unlock() + lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL if lastLinesIndex < 0 { lastLinesIndex = 0 } lastLines := strings.Join(ep.lines[lastLinesIndex:], "") - ep.mu.Unlock() - return "", fmt.Errorf("match not found."+ - " Set EXPECT_DEBUG for more info Err: %v, last lines:\n%s", - ep.err, lastLines) + return "", fmt.Errorf("match not found. "+ + " Set EXPECT_DEBUG for more info Errs: [%v], last lines:\n%s", + ep.exitErr, lastLines) } // ExpectWithContext returns the first line containing the given string. @@ -174,47 +220,85 @@ func (ep *ExpectProcess) LineCount() int { return ep.count } -// Stop kills the expect process and waits for it to exit. -func (ep *ExpectProcess) Stop() error { return ep.close(true) } +// ExitCode returns the exit code of this process. +// If the process is still running, it returns exit code 0 and ErrProcessRunning. +func (ep *ExpectProcess) ExitCode() (int, error) { + ep.mu.Lock() + defer ep.mu.Unlock() -// Signal sends a signal to the expect process -func (ep *ExpectProcess) Signal(sig os.Signal) error { - return ep.cmd.Process.Signal(sig) + if ep.cmd == nil { + return ep.exitCode, nil + } + + return 0, ErrProcessRunning } -func (ep *ExpectProcess) Wait() error { - _, err := ep.cmd.Process.Wait() +// ExitError returns the exit error of this process (if any). +// If the process is still running, it returns ErrProcessRunning instead. +func (ep *ExpectProcess) ExitError() error { + ep.mu.Lock() + defer ep.mu.Unlock() + + if ep.cmd == nil { + return ep.exitErr + } + + return ErrProcessRunning +} + +// Stop signals the process to terminate via SIGTERM +func (ep *ExpectProcess) Stop() error { + err := ep.Signal(syscall.SIGTERM) + if err != nil && strings.Contains(err.Error(), "os: process already finished") { + return nil + } return err } -// Close waits for the expect process to exit. -// Close currently does not return error if process exited with !=0 status. -// TODO: Close should expose underlying process failure by default. -func (ep *ExpectProcess) Close() error { return ep.close(false) } +// Signal sends a signal to the expect process +func (ep *ExpectProcess) Signal(sig os.Signal) error { + ep.mu.Lock() + defer ep.mu.Unlock() -func (ep *ExpectProcess) close(kill bool) error { if ep.cmd == nil { - return ep.err - } - if kill { - ep.Signal(syscall.SIGTERM) + return errors.New("expect process already closed") } - err := ep.cmd.Wait() - ep.fpty.Close() - ep.wg.Wait() + return ep.cmd.Process.Signal(sig) +} +func (ep *ExpectProcess) waitProcess() error { + state, err := ep.cmd.Process.Wait() if err != nil { - if !kill && strings.Contains(err.Error(), "exit status") { - // non-zero exit code - err = nil - } else if kill && strings.Contains(err.Error(), "signal:") { - err = nil - } + return err } + ep.mu.Lock() + defer ep.mu.Unlock() + ep.exitCode = state.ExitCode() + + if !state.Success() { + return fmt.Errorf("unexpected exit code [%d] after running [%s]", ep.exitCode, ep.cmd.String()) + } + + return nil +} + +// Wait waits for the process to finish. +func (ep *ExpectProcess) Wait() { + ep.wg.Wait() +} + +// Close waits for the expect process to exit and return its error. +func (ep *ExpectProcess) Close() error { + ep.wg.Wait() + + ep.mu.Lock() + defer ep.mu.Unlock() + + // this signals to other funcs that the process has finished ep.cmd = nil - return err + return ep.exitErr } func (ep *ExpectProcess) Send(command string) error { @@ -222,15 +306,6 @@ func (ep *ExpectProcess) Send(command string) error { return err } -func (ep *ExpectProcess) ProcessError() error { - if strings.Contains(ep.err.Error(), "input/output error") { - // TODO: The expect library should not return - // `/dev/ptmx: input/output error` when process just exits. - return nil - } - return ep.err -} - func (ep *ExpectProcess) Lines() []string { ep.mu.Lock() defer ep.mu.Unlock() diff --git a/pkg/expect/expect_test.go b/pkg/expect/expect_test.go index 65746851cef4..6fc6cdb83eb8 100644 --- a/pkg/expect/expect_test.go +++ b/pkg/expect/expect_test.go @@ -19,9 +19,11 @@ package expect import ( "context" "os" + "strings" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -65,9 +67,57 @@ func TestExpectFuncTimeout(t *testing.T) { require.ErrorAs(t, err, &context.DeadlineExceeded) - if err = ep.Stop(); err != nil { + if err := ep.Stop(); err != nil { + t.Fatal(err) + } + + err = ep.Close() + require.ErrorContains(t, err, "unexpected exit code [-1] after running [/usr/bin/tail -f /dev/null]") + require.Equal(t, -1, ep.exitCode) +} + +func TestExpectFuncExitFailure(t *testing.T) { + // tail -x should not exist and return a non-zero exit code + ep, err := NewExpect("tail", "-x") + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err = ep.ExpectFunc(ctx, func(s string) bool { + return strings.Contains(s, "something entirely unexpected") + }) + require.ErrorContains(t, err, "unexpected exit code [1] after running [/usr/bin/tail -x]") + require.Equal(t, 1, ep.exitCode) +} + +func TestExpectFuncExitFailureStop(t *testing.T) { + // tail -x should not exist and return a non-zero exit code + ep, err := NewExpect("tail", "-x") + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + _, err = ep.ExpectFunc(ctx, func(s string) bool { + return strings.Contains(s, "something entirely unexpected") + }) + require.ErrorContains(t, err, "unexpected exit code [1] after running [/usr/bin/tail -x]") + exitCode, err := ep.ExitCode() + require.Equal(t, 0, exitCode) + require.Equal(t, err, ErrProcessRunning) + if err := ep.Stop(); err != nil { t.Fatal(err) } + err = ep.Close() + require.ErrorContains(t, err, "unexpected exit code [1] after running [/usr/bin/tail -x]") + exitCode, err = ep.ExitCode() + require.Equal(t, 1, exitCode) + require.NoError(t, err) } func TestEcho(t *testing.T) { @@ -138,10 +188,8 @@ func TestSignal(t *testing.T) { donec := make(chan struct{}) go func() { defer close(donec) - werr := "signal: interrupt" - if cerr := ep.Close(); cerr == nil || cerr.Error() != werr { - t.Errorf("got error %v, wanted error %s", cerr, werr) - } + err = ep.Close() + assert.ErrorContains(t, err, "unexpected exit code [-1] after running [/usr/bin/sleep 100]") }() select { case <-time.After(5 * time.Second): diff --git a/tests/common/txn_test.go b/tests/common/txn_test.go index 8e39429253ad..5ffee4e155a7 100644 --- a/tests/common/txn_test.go +++ b/tests/common/txn_test.go @@ -28,30 +28,30 @@ import ( ) type txnReq struct { - compare []string - ifSucess []string - ifFail []string - results []string + compare []string + ifSuccess []string + ifFail []string + results []string } func TestTxnSucc(t *testing.T) { testRunner.BeforeTest(t) reqs := []txnReq{ { - compare: []string{`value("key1") != "value2"`, `value("key2") != "value1"`}, - ifSucess: []string{"get key1", "get key2"}, - results: []string{"SUCCESS", "key1", "value1", "key2", "value2"}, + compare: []string{`value("key1") != "value2"`, `value("key2") != "value1"`}, + ifSuccess: []string{"get key1", "get key2"}, + results: []string{"SUCCESS", "key1", "value1", "key2", "value2"}, }, { - compare: []string{`version("key1") = "1"`, `version("key2") = "1"`}, - ifSucess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`}, - ifFail: []string{`put key1 "fail"`, `put key2 "fail"`}, - results: []string{"SUCCESS", "key1", "value1", "key2", "value2", "OK"}, + compare: []string{`version("key1") = "1"`, `version("key2") = "1"`}, + ifSuccess: []string{"get key1", "get key2", `put "key \"with\" space" "value \x23"`}, + ifFail: []string{`put key1 "fail"`, `put key2 "fail"`}, + results: []string{"SUCCESS", "key1", "value1", "key2", "value2", "OK"}, }, { - compare: []string{`version("key \"with\" space") = "1"`}, - ifSucess: []string{`get "key \"with\" space"`}, - results: []string{"SUCCESS", `key "with" space`, "value \x23"}, + compare: []string{`version("key \"with\" space") = "1"`}, + ifSuccess: []string{`get "key \"with\" space"`}, + results: []string{"SUCCESS", `key "with" space`, "value \x23"}, }, } for _, cfg := range clusterTestCases() { @@ -69,7 +69,7 @@ func TestTxnSucc(t *testing.T) { t.Fatalf("could not create key:%s, value:%s", "key2", "value2") } for _, req := range reqs { - resp, err := cc.Txn(ctx, req.compare, req.ifSucess, req.ifFail, config.TxnOptions{ + resp, err := cc.Txn(ctx, req.compare, req.ifSuccess, req.ifFail, config.TxnOptions{ Interactive: true, }) if err != nil { @@ -86,16 +86,16 @@ func TestTxnFail(t *testing.T) { testRunner.BeforeTest(t) reqs := []txnReq{ { - compare: []string{`version("key") < "0"`}, - ifSucess: []string{`put key "success"`}, - ifFail: []string{`put key "fail"`}, - results: []string{"FAILURE", "OK"}, + compare: []string{`version("key") < "0"`}, + ifSuccess: []string{`put key "success"`}, + ifFail: []string{`put key "fail"`}, + results: []string{"FAILURE", "OK"}, }, { - compare: []string{`value("key1") != "value1"`}, - ifSucess: []string{`put key1 "success"`}, - ifFail: []string{`put key1 "fail"`}, - results: []string{"FAILURE", "OK"}, + compare: []string{`value("key1") != "value1"`}, + ifSuccess: []string{`put key1 "success"`}, + ifFail: []string{`put key1 "fail"`}, + results: []string{"FAILURE", "OK"}, }, } for _, cfg := range clusterTestCases() { @@ -110,7 +110,7 @@ func TestTxnFail(t *testing.T) { t.Fatalf("could not create key:%s, value:%s", "key1", "value1") } for _, req := range reqs { - resp, err := cc.Txn(ctx, req.compare, req.ifSucess, req.ifFail, config.TxnOptions{ + resp, err := cc.Txn(ctx, req.compare, req.ifSuccess, req.ifFail, config.TxnOptions{ Interactive: true, }) if err != nil { diff --git a/tests/e2e/ctl_v3_auth_test.go b/tests/e2e/ctl_v3_auth_test.go index bf4f5251379f..facd71eadba6 100644 --- a/tests/e2e/ctl_v3_auth_test.go +++ b/tests/e2e/ctl_v3_auth_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -118,9 +119,8 @@ func authDisableTest(cx ctlCtx) { // test-user doesn't have the permission, it must fail cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailPerm(cx, "hoo", "bar"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, "hoo", "bar") + require.ErrorContains(cx.t, err, "permission denied") cx.user, cx.pass = "root", "root" if err := ctlV3AuthDisable(cx); err != nil { @@ -241,9 +241,9 @@ func authCredWriteKeyTest(cx ctlCtx) { // try invalid user cx.user, cx.pass = "a", "b" - if err := ctlV3PutFailAuth(cx, "foo", "bar"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailAuth(cx, "foo", "bar") + require.ErrorContains(cx.t, err, "authentication failed") + // confirm put failed cx.user, cx.pass = "test-user", "pass" if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil { @@ -262,9 +262,9 @@ func authCredWriteKeyTest(cx ctlCtx) { // try bad password cx.user, cx.pass = "test-user", "badpass" - if err := ctlV3PutFailAuth(cx, "foo", "baz"); err != nil { - cx.t.Fatal(err) - } + err = ctlV3PutFailAuth(cx, "foo", "baz") + require.ErrorContains(cx.t, err, "authentication failed") + // confirm put failed cx.user, cx.pass = "test-user", "pass" if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar2"}}...); err != nil { @@ -286,9 +286,8 @@ func authRoleUpdateTest(cx ctlCtx) { // try put to not granted key cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailPerm(cx, "hoo", "bar"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, "hoo", "bar") + require.ErrorContains(cx.t, err, "permission denied") // grant a new key cx.user, cx.pass = "root", "root" @@ -314,9 +313,8 @@ func authRoleUpdateTest(cx ctlCtx) { // try put to the revoked key cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailPerm(cx, "hoo", "bar"); err != nil { - cx.t.Fatal(err) - } + err = ctlV3PutFailPerm(cx, "hoo", "bar") + require.ErrorContains(cx.t, err, "permission denied") // confirm a key still granted can be accessed if err := ctlV3Get(cx, []string{"foo"}, []kv{{"foo", "bar"}}...); err != nil { @@ -355,9 +353,8 @@ func authUserDeleteDuringOpsTest(cx ctlCtx) { // check the user is deleted cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailAuth(cx, "foo", "baz"); err != nil { - cx.t.Fatal(err) - } + err = ctlV3PutFailAuth(cx, "foo", "baz") + require.ErrorContains(cx.t, err, "authentication failed") } func authRoleRevokeDuringOpsTest(cx ctlCtx) { @@ -415,9 +412,8 @@ func authRoleRevokeDuringOpsTest(cx ctlCtx) { // check the role is revoked and permission is lost from the user cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailPerm(cx, "foo", "baz"); err != nil { - cx.t.Fatal(err) - } + err = ctlV3PutFailPerm(cx, "foo", "baz") + require.ErrorContains(cx.t, err, "permission denied") // try a key that can be accessed from the remaining role cx.user, cx.pass = "test-user", "pass" @@ -492,45 +488,45 @@ func authTestTxn(cx ctlCtx) { cx.user, cx.pass = "test-user", "pass" rqs := txnRequests{ - compare: []string{`version("c2") = "1"`}, - ifSucess: []string{"get s2"}, - ifFail: []string{"get f2"}, - results: []string{"SUCCESS", "s2", "v"}, + compare: []string{`version("c2") = "1"`}, + ifSuccess: []string{"get s2"}, + ifFail: []string{"get f2"}, + results: []string{"SUCCESS", "s2", "v"}, } - if err := ctlV3Txn(cx, rqs); err != nil { + if err := ctlV3Txn(cx, rqs, false); err != nil { cx.t.Fatal(err) } // a key of compare case isn't granted rqs = txnRequests{ - compare: []string{`version("c1") = "1"`}, - ifSucess: []string{"get s2"}, - ifFail: []string{"get f2"}, - results: []string{"Error: etcdserver: permission denied"}, + compare: []string{`version("c1") = "1"`}, + ifSuccess: []string{"get s2"}, + ifFail: []string{"get f2"}, + results: []string{"Error: etcdserver: permission denied"}, } - if err := ctlV3Txn(cx, rqs); err != nil { + if err := ctlV3Txn(cx, rqs, true); err != nil { cx.t.Fatal(err) } // a key of success case isn't granted rqs = txnRequests{ - compare: []string{`version("c2") = "1"`}, - ifSucess: []string{"get s1"}, - ifFail: []string{"get f2"}, - results: []string{"Error: etcdserver: permission denied"}, + compare: []string{`version("c2") = "1"`}, + ifSuccess: []string{"get s1"}, + ifFail: []string{"get f2"}, + results: []string{"Error: etcdserver: permission denied"}, } - if err := ctlV3Txn(cx, rqs); err != nil { + if err := ctlV3Txn(cx, rqs, true); err != nil { cx.t.Fatal(err) } // a key of failure case isn't granted rqs = txnRequests{ - compare: []string{`version("c2") = "1"`}, - ifSucess: []string{"get s2"}, - ifFail: []string{"get f1"}, - results: []string{"Error: etcdserver: permission denied"}, + compare: []string{`version("c2") = "1"`}, + ifSuccess: []string{"get s2"}, + ifFail: []string{"get f1"}, + results: []string{"Error: etcdserver: permission denied"}, } - if err := ctlV3Txn(cx, rqs); err != nil { + if err := ctlV3Txn(cx, rqs, true); err != nil { cx.t.Fatal(err) } } @@ -559,9 +555,8 @@ func authTestPrefixPerm(cx ctlCtx) { } } - if err := ctlV3PutFailPerm(cx, clientv3.GetPrefixRangeEnd(prefix), "baz"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, clientv3.GetPrefixRangeEnd(prefix), "baz") + require.ErrorContains(cx.t, err, "permission denied") // grant the entire keys to test-user cx.user, cx.pass = "root", "root" @@ -679,11 +674,10 @@ func authTestCertCN(cx ctlCtx) { cx.t.Error(err) } - // try a non granted key + // try a non-granted key cx.user, cx.pass = "", "" - if err := ctlV3PutFailPerm(cx, "baz", "bar"); err != nil { - cx.t.Error(err) - } + err := ctlV3PutFailPerm(cx, "baz", "bar") + require.ErrorContains(cx.t, err, "permission denied") } func authTestRevokeWithDelete(cx ctlCtx) { @@ -766,9 +760,8 @@ func authTestFromKeyPerm(cx ctlCtx) { } // try a non granted key - if err := ctlV3PutFailPerm(cx, "x", "baz"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, "x", "baz") + require.ErrorContains(cx.t, err, "permission denied") // revoke the open ended permission cx.user, cx.pass = "root", "root" @@ -780,9 +773,8 @@ func authTestFromKeyPerm(cx ctlCtx) { cx.user, cx.pass = "test-user", "pass" for i := 0; i < 10; i++ { key := fmt.Sprintf("z%d", i) - if err := ctlV3PutFailPerm(cx, key, "val"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, key, "val") + require.ErrorContains(cx.t, err, "permission denied") } // grant the entire keys @@ -810,9 +802,8 @@ func authTestFromKeyPerm(cx ctlCtx) { cx.user, cx.pass = "test-user", "pass" for i := 0; i < 10; i++ { key := fmt.Sprintf("z%d", i) - if err := ctlV3PutFailPerm(cx, key, "val"); err != nil { - cx.t.Fatal(err) - } + err := ctlV3PutFailPerm(cx, key, "val") + require.ErrorContains(cx.t, err, "permission denied") } } @@ -848,9 +839,8 @@ func authLeaseTestTimeToLiveExpired(cx ctlCtx) { authSetupTestUser(cx) ttl := 3 - if err := leaseTestTimeToLiveExpire(cx, ttl); err != nil { - cx.t.Fatalf("leaseTestTimeToLiveExpire: error (%v)", err) - } + err := leaseTestTimeToLiveExpire(cx, ttl) + require.NoError(cx.t, err) } func leaseTestTimeToLiveExpire(cx ctlCtx, ttl int) error { @@ -984,14 +974,13 @@ func authTestWatch(cx ctlCtx) { var err error if tt.want { err = ctlV3Watch(cx, tt.args, tt.wkv...) - } else { - err = ctlV3WatchFailPerm(cx, tt.args) - } - - if err != nil { - if cx.dialTimeout > 0 && !isGRPCTimedout(err) { + if err != nil && cx.dialTimeout > 0 && !isGRPCTimedout(err) { cx.t.Errorf("watchTest #%d: ctlV3Watch error (%v)", i, err) } + } else { + err = ctlV3WatchFailPerm(cx, tt.args) + // this will not have any meaningful error output, but the process fails due to the cancellation + require.ErrorContains(cx.t, err, "unexpected exit code") } <-donec @@ -1025,9 +1014,8 @@ func authTestRoleGet(cx ctlCtx) { expected = []string{ "Error: etcdserver: permission denied", } - if err := e2e.SpawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...); err != nil { - cx.t.Fatal(err) - } + err := e2e.SpawnWithExpects(append(cx.PrefixArgs(), "role", "get", "root"), cx.envMap, expected...) + require.ErrorContains(cx.t, err, "permission denied") } func authTestUserGet(cx ctlCtx) { @@ -1056,9 +1044,8 @@ func authTestUserGet(cx ctlCtx) { expected = []string{ "Error: etcdserver: permission denied", } - if err := e2e.SpawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...); err != nil { - cx.t.Fatal(err) - } + err := e2e.SpawnWithExpects(append(cx.PrefixArgs(), "user", "get", "root"), cx.envMap, expected...) + require.ErrorContains(cx.t, err, "permission denied") } func authTestRoleList(cx ctlCtx) { @@ -1207,16 +1194,14 @@ func certCNAndUsername(cx ctlCtx, noPassword bool) { cx.t.Error(err) } - // try a non granted key for both of them + // try a non-granted key for both of them cx.user, cx.pass = "", "" - if err := ctlV3PutFailPerm(cx, "baz", "bar"); err != nil { - cx.t.Error(err) - } + err := ctlV3PutFailPerm(cx, "baz", "bar") + require.ErrorContains(cx.t, err, "permission denied") cx.user, cx.pass = "test-user", "pass" - if err := ctlV3PutFailPerm(cx, "baz", "bar"); err != nil { - cx.t.Error(err) - } + err = ctlV3PutFailPerm(cx, "baz", "bar") + require.ErrorContains(cx.t, err, "permission denied") } func authTestCertCNAndUsername(cx ctlCtx) { diff --git a/tests/e2e/ctl_v3_elect_test.go b/tests/e2e/ctl_v3_elect_test.go index 4aecae07dba5..d40b3ae90d8b 100644 --- a/tests/e2e/ctl_v3_elect_test.go +++ b/tests/e2e/ctl_v3_elect_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -32,7 +33,7 @@ func TestCtlV3Elect(t *testing.T) { func testElect(cx ctlCtx) { name := "a" - holder, ch, err := ctlV3Elect(cx, name, "p1") + holder, ch, err := ctlV3Elect(cx, name, "p1", false) if err != nil { cx.t.Fatal(err) } @@ -48,7 +49,7 @@ func testElect(cx ctlCtx) { } // blocked process that won't win the election - blocked, ch, err := ctlV3Elect(cx, name, "p2") + blocked, ch, err := ctlV3Elect(cx, name, "p2", true) if err != nil { cx.t.Fatal(err) } @@ -59,7 +60,7 @@ func testElect(cx ctlCtx) { } // overlap with a blocker that will win the election - blockAcquire, ch, err := ctlV3Elect(cx, name, "p2") + blockAcquire, ch, err := ctlV3Elect(cx, name, "p2", false) if err != nil { cx.t.Fatal(err) } @@ -74,8 +75,10 @@ func testElect(cx ctlCtx) { if err = blocked.Signal(os.Interrupt); err != nil { cx.t.Fatal(err) } - if err = e2e.CloseWithTimeout(blocked, time.Second); err != nil { - cx.t.Fatal(err) + err = e2e.CloseWithTimeout(blocked, time.Second) + if err != nil { + // due to being blocked, this can potentially get killed and thus exit non-zero sometimes + require.ErrorContains(cx.t, err, "unexpected exit code") } // kill the holder with clean shutdown @@ -98,7 +101,7 @@ func testElect(cx ctlCtx) { } // ctlV3Elect creates a elect process with a channel listening for when it wins the election. -func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan string, error) { +func ctlV3Elect(cx ctlCtx, name, proposal string, expectFailure bool) (*expect.ExpectProcess, <-chan string, error) { cmdArgs := append(cx.PrefixArgs(), "elect", name, proposal) proc, err := e2e.SpawnCmd(cmdArgs, cx.envMap) outc := make(chan string, 1) @@ -109,7 +112,9 @@ func ctlV3Elect(cx ctlCtx, name, proposal string) (*expect.ExpectProcess, <-chan go func() { s, xerr := proc.ExpectFunc(context.TODO(), func(string) bool { return true }) if xerr != nil { - cx.t.Errorf("expect failed (%v)", xerr) + if !expectFailure { + cx.t.Errorf("expect failed (%v)", xerr) + } } outc <- s }() diff --git a/tests/e2e/ctl_v3_kv_test.go b/tests/e2e/ctl_v3_kv_test.go index a52239cdc663..c7f3a951ffb9 100644 --- a/tests/e2e/ctl_v3_kv_test.go +++ b/tests/e2e/ctl_v3_kv_test.go @@ -15,10 +15,12 @@ package e2e import ( + "context" "fmt" - "strings" "testing" + "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -50,9 +52,9 @@ func TestCtlV3GetRevokedCRL(t *testing.T) { func testGetRevokedCRL(cx ctlCtx) { // test reject - if err := ctlV3Put(cx, "k", "v", ""); err == nil || !strings.Contains(err.Error(), "Error:") { - cx.t.Fatalf("expected reset connection on put, got %v", err) - } + err := ctlV3Put(cx, "k", "v", "") + require.ErrorContains(cx.t, err, "context deadline exceeded") + // test accept cx.epc.Cfg.IsClientCRL = false if err := ctlV3Put(cx, "k", "v", ""); err != nil { @@ -216,9 +218,13 @@ func getKeysOnlyTest(cx ctlCtx) { if err := e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, "key"); err != nil { cx.t.Fatal(err) } - if err := e2e.SpawnWithExpects(cmdArgs, cx.envMap, "val"); err == nil { - cx.t.Fatalf("got value but passed --keys-only") - } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + lines, err := e2e.SpawnWithExpectLines(ctx, cmdArgs, cx.envMap, "key") + require.NoError(cx.t, err) + require.NotContains(cx.t, lines, "val", "got value but passed --keys-only") } func getCountOnlyTest(cx ctlCtx) { @@ -250,13 +256,14 @@ func getCountOnlyTest(cx ctlCtx) { if err := e2e.SpawnWithExpects(cmdArgs, cx.envMap, "\"Count\" : 3"); err != nil { cx.t.Fatal(err) } - expected := []string{ - "\"Count\" : 3", - } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cmdArgs = append(cx.PrefixArgs(), []string{"get", "--count-only", "key3", "--prefix", "--write-out=fields"}...) - if err := e2e.SpawnWithExpects(cmdArgs, cx.envMap, expected...); err == nil { - cx.t.Fatal(err) - } + lines, err := e2e.SpawnWithExpectLines(ctx, cmdArgs, cx.envMap, "\"Count\"") + require.NoError(cx.t, err) + require.NotContains(cx.t, lines, "\"Count\" : 3") } func delTest(cx ctlCtx) { diff --git a/tests/e2e/ctl_v3_lock_test.go b/tests/e2e/ctl_v3_lock_test.go index ef9d8a73fa89..82e6390b1e32 100644 --- a/tests/e2e/ctl_v3_lock_test.go +++ b/tests/e2e/ctl_v3_lock_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -79,8 +80,10 @@ func testLock(cx ctlCtx) { if err = blocked.Signal(os.Interrupt); err != nil { cx.t.Fatal(err) } - if err = e2e.CloseWithTimeout(blocked, time.Second); err != nil { - cx.t.Fatal(err) + err = e2e.CloseWithTimeout(blocked, time.Second) + if err != nil { + // due to being blocked, this can potentially get killed and thus exit non-zero sometimes + require.ErrorContains(cx.t, err, "unexpected exit code") } // kill the holder with clean shutdown @@ -113,9 +116,8 @@ func testLockWithCmd(cx ctlCtx) { code := 3 awkCmd := []string{"awk", fmt.Sprintf("BEGIN{exit %d}", code)} expect := fmt.Sprintf("Error: exit status %d", code) - if err := ctlV3LockWithCmd(cx, awkCmd, expect); err != nil { - cx.t.Fatal(err) - } + err := ctlV3LockWithCmd(cx, awkCmd, expect) + require.ErrorContains(cx.t, err, expect) } // ctlV3Lock creates a lock process with a channel listening for when it acquires the lock. @@ -130,7 +132,7 @@ func ctlV3Lock(cx ctlCtx, name string) (*expect.ExpectProcess, <-chan string, er go func() { s, xerr := proc.ExpectFunc(context.TODO(), func(string) bool { return true }) if xerr != nil { - cx.t.Errorf("expect failed (%v)", xerr) + require.ErrorContains(cx.t, xerr, "Error: context canceled") } outc <- s }() @@ -142,5 +144,7 @@ func ctlV3LockWithCmd(cx ctlCtx, execCmd []string, as ...string) error { // use command as lock name cmdArgs := append(cx.PrefixArgs(), "lock", execCmd[0]) cmdArgs = append(cmdArgs, execCmd...) - return e2e.SpawnWithExpects(cmdArgs, cx.envMap, as...) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return e2e.SpawnWithExpectsContext(ctx, cmdArgs, cx.envMap, as...) } diff --git a/tests/e2e/ctl_v3_move_leader_test.go b/tests/e2e/ctl_v3_move_leader_test.go index d9fa6f4f2f1a..59cb29ac1789 100644 --- a/tests/e2e/ctl_v3_move_leader_test.go +++ b/tests/e2e/ctl_v3_move_leader_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/client/v3" @@ -110,27 +111,34 @@ func testCtlV3MoveLeader(t *testing.T, cfg e2e.EtcdProcessClusterConfig, envVars } tests := []struct { - eps []string - expect string + eps []string + expect string + expectErr bool }{ { // request to non-leader []string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}, "no leader endpoint given at ", + true, }, { // request to leader []string{cx.epc.EndpointsV3()[leadIdx]}, fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)), + false, }, { // request to all endpoints cx.epc.EndpointsV3(), fmt.Sprintf("Leadership transferred"), + false, }, } for i, tc := range tests { prefix := cx.prefixArgs(tc.eps) cmdArgs := append(prefix, "move-leader", types.ID(transferee).String()) - if err := e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, tc.expect); err != nil { - t.Fatalf("#%d: %v", i, err) + err := e2e.SpawnWithExpectWithEnv(cmdArgs, cx.envMap, tc.expect) + if tc.expectErr { + require.ErrorContains(t, err, tc.expect) + } else { + require.Nilf(t, err, "#%d: %v", i, err) } } } diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go index 7c7957ca9c3f..e7401b456467 100644 --- a/tests/e2e/ctl_v3_snapshot_test.go +++ b/tests/e2e/ctl_v3_snapshot_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/etcdutl/v3/snapshot" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/tests/v3/framework/e2e" @@ -90,10 +91,7 @@ func snapshotCorruptTest(cx ctlCtx) { fpath), cx.envMap, "expected sha256") - - if serr != nil { - cx.t.Fatal(serr) - } + require.ErrorContains(cx.t, serr, "Error: expected sha256") } // This test ensures that the snapshot status does not modify the snapshot file diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 978ea741c4a6..cdbf1688b38a 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -248,9 +248,8 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx cx.envMap = make(map[string]string) } if cx.epc != nil { - if errC := cx.epc.Close(); errC != nil { - t.Fatalf("error closing etcd processes (%v)", errC) - } + cx.epc.Stop() + cx.epc.Close() } }() @@ -270,6 +269,7 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx } t.Log("closing test cluster...") + assert.NoError(t, cx.epc.Stop()) assert.NoError(t, cx.epc.Close()) cx.epc = nil t.Log("closed test cluster...") diff --git a/tests/e2e/ctl_v3_txn_test.go b/tests/e2e/ctl_v3_txn_test.go index 3b9cc8216c3d..bc05cb055751 100644 --- a/tests/e2e/ctl_v3_txn_test.go +++ b/tests/e2e/ctl_v3_txn_test.go @@ -19,13 +19,13 @@ import ( ) type txnRequests struct { - compare []string - ifSucess []string - ifFail []string - results []string + compare []string + ifSuccess []string + ifFail []string + results []string } -func ctlV3Txn(cx ctlCtx, rqs txnRequests) error { +func ctlV3Txn(cx ctlCtx, rqs txnRequests, expectedExitErr bool) error { // TODO: support non-interactive mode cmdArgs := append(cx.PrefixArgs(), "txn") if cx.interactive { @@ -52,7 +52,7 @@ func ctlV3Txn(cx ctlCtx, rqs txnRequests) error { if err != nil { return err } - for _, req := range rqs.ifSucess { + for _, req := range rqs.ifSuccess { if err = proc.Send(req + "\r"); err != nil { return err } @@ -80,5 +80,11 @@ func ctlV3Txn(cx ctlCtx, rqs txnRequests) error { return err } } - return proc.Close() + + err = proc.Close() + if expectedExitErr { + return nil + } + + return err } diff --git a/tests/e2e/etcd_config_test.go b/tests/e2e/etcd_config_test.go index 822bc45bb118..ac0096cfd5ac 100644 --- a/tests/e2e/etcd_config_test.go +++ b/tests/e2e/etcd_config_test.go @@ -21,6 +21,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -57,6 +58,7 @@ func TestEtcdMultiPeer(t *testing.T) { for i := range procs { if procs[i] != nil { procs[i].Stop() + procs[i].Close() } } }() @@ -128,6 +130,7 @@ func TestEtcdPeerCNAuth(t *testing.T) { for i := range procs { if procs[i] != nil { procs[i].Stop() + procs[i].Close() } } }() @@ -206,6 +209,7 @@ func TestEtcdPeerNameAuth(t *testing.T) { for i := range procs { if procs[i] != nil { procs[i].Stop() + procs[i].Close() } os.RemoveAll(tmpdirs[i]) } @@ -287,9 +291,7 @@ func TestGrpcproxyAndCommonName(t *testing.T) { } err := e2e.SpawnWithExpect(argsWithNonEmptyCN, "cert has non empty Common Name") - if err != nil { - t.Errorf("Unexpected error: %s", err) - } + require.ErrorContains(t, err, "cert has non empty Common Name") p, err := e2e.SpawnCmd(argsWithEmptyCN, nil) defer func() { diff --git a/tests/e2e/gateway_test.go b/tests/e2e/gateway_test.go index 2e2cc360be74..f28ebb63570b 100644 --- a/tests/e2e/gateway_test.go +++ b/tests/e2e/gateway_test.go @@ -37,7 +37,10 @@ func TestGateway(t *testing.T) { eps := strings.Join(ec.EndpointsV3(), ",") p := startGateway(t, eps) - defer p.Stop() + defer func() { + p.Stop() + p.Close() + }() err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdctl, "--endpoints=" + defaultGatewayEndpoint, "put", "foo", "bar"}, "OK\r\n") if err != nil { diff --git a/tests/e2e/no_quorum_ready_test.go b/tests/e2e/no_quorum_ready_test.go index c60b9d7e25b8..ff1b32abf353 100644 --- a/tests/e2e/no_quorum_ready_test.go +++ b/tests/e2e/no_quorum_ready_test.go @@ -28,6 +28,8 @@ func TestInitDaemonNotifyWithoutQuorum(t *testing.T) { t.Fatalf("Failed to initilize the etcd cluster: %v", err) } + defer epc.Close() + // Remove two members, so that only one etcd will get started epc.Procs = epc.Procs[:1] @@ -40,6 +42,4 @@ func TestInitDaemonNotifyWithoutQuorum(t *testing.T) { e2e.AssertProcessLogs(t, epc.Procs[0], "startEtcd: timed out waiting for the ready notification") // Expect log message indicating systemd notify message has been sent e2e.AssertProcessLogs(t, epc.Procs[0], "notifying init daemon") - - epc.Close() } diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go index b8129e99b836..70b28ac91405 100644 --- a/tests/e2e/utl_migrate_test.go +++ b/tests/e2e/utl_migrate_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/server/v3/storage/backend" @@ -155,7 +156,11 @@ func TestEtctlutlMigrate(t *testing.T) { } err = e2e.SpawnWithExpect(args, tc.expectLogsSubString) if err != nil { - t.Fatal(err) + if tc.expectLogsSubString != "" { + require.ErrorContains(t, err, tc.expectLogsSubString) + } else { + t.Fatal(err) + } } t.Log("etcdutl migrate...") diff --git a/tests/e2e/v3_cipher_suite_test.go b/tests/e2e/v3_cipher_suite_test.go index 4b804c015b54..7c6b106b5153 100644 --- a/tests/e2e/v3_cipher_suite_test.go +++ b/tests/e2e/v3_cipher_suite_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -52,24 +53,16 @@ func cipherSuiteTestValid(cx ctlCtx) { MetricsURLScheme: cx.cfg.MetricsURLScheme, Ciphers: "ECDHE-RSA-AES128-GCM-SHA256", // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 }); err != nil { - cx.t.Fatalf("failed get with curl (%v)", err) + require.ErrorContains(cx.t, err, fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)) } } func cipherSuiteTestMismatch(cx ctlCtx) { - var err error - for _, exp := range []string{"alert handshake failure", "failed setting cipher list"} { - err = e2e.CURLGet(cx.epc, e2e.CURLReq{ - Endpoint: "/metrics", - Expected: exp, - MetricsURLScheme: cx.cfg.MetricsURLScheme, - Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA - }) - if err == nil { - break - } - } - if err != nil { - cx.t.Fatalf("failed get with curl (%v)", err) - } + err := e2e.CURLGet(cx.epc, e2e.CURLReq{ + Endpoint: "/metrics", + Expected: "failed setting cipher list", + MetricsURLScheme: cx.cfg.MetricsURLScheme, + Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA + }) + require.ErrorContains(cx.t, err, "curl: (59) failed setting cipher list") } diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go index 651b98dc4ddf..ee535dcb79f7 100644 --- a/tests/e2e/v3_curl_maxstream_test.go +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/framework/e2e" @@ -212,7 +213,7 @@ func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { cx.t.Log("Submitting range request...") if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Expected: expectedValue, Timeout: 5}); err != nil { - cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err) + require.ErrorContains(cx.t, err, expectedValue) } cx.t.Log("range request done") } diff --git a/tests/e2e/v3_curl_test.go b/tests/e2e/v3_curl_test.go index 4777dc30a6c7..9733c66b6bb6 100644 --- a/tests/e2e/v3_curl_test.go +++ b/tests/e2e/v3_curl_test.go @@ -24,6 +24,7 @@ import ( "strconv" "testing" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/authpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -141,9 +142,8 @@ func testV3CurlWatch(cx ctlCtx) { cx.t.Fatalf("failed testV3CurlWatch put with curl using prefix (%s) (%v)", p, err) } // expects "bar", timeout after 2 seconds since stream waits forever - if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/watch"), Value: wstr, Expected: `"YmFy"`, Timeout: 2}); err != nil { - cx.t.Fatalf("failed testV3CurlWatch watch with curl using prefix (%s) (%v)", p, err) - } + err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/watch"), Value: wstr, Expected: `"YmFy"`, Timeout: 2}) + require.ErrorContains(cx.t, err, "unexpected exit code") } func testV3CurlTxn(cx ctlCtx) { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index f72154a92300..2db5b2cd2a8a 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -16,6 +16,7 @@ package e2e import ( "context" + "errors" "fmt" "net/url" "path" @@ -29,7 +30,6 @@ import ( "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/etcdserverpb" - "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/config" ) @@ -664,18 +664,23 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr return fmt.Errorf("failed to find member ID: %w", err) } + memberRemoved := false for i := 0; i < 10; i++ { - _, err = memberCtl.MemberRemove(ctx, memberID) - if err != nil && strings.Contains(err.Error(), rpctypes.ErrGRPCUnhealthy.Error()) { - time.Sleep(500 * time.Millisecond) - continue + _, err := memberCtl.MemberRemove(ctx, memberID) + if err != nil && strings.Contains(err.Error(), "member not found") { + memberRemoved = true + break } - break + + time.Sleep(500 * time.Millisecond) } - if err != nil { - return fmt.Errorf("failed to remove member: %w", err) + + if !memberRemoved { + return errors.New("failed to remove member after 10 tries") } + epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().Acurl)) + // Then stop process return proc.Close() } diff --git a/tests/framework/e2e/curl.go b/tests/framework/e2e/curl.go index a3b11de8591b..7d81a92f3e06 100644 --- a/tests/framework/e2e/curl.go +++ b/tests/framework/e2e/curl.go @@ -15,9 +15,11 @@ package e2e import ( + "context" "fmt" "math/rand" "strings" + "time" ) type CURLReq struct { @@ -38,6 +40,15 @@ type CURLReq struct { Ciphers string } +func (r CURLReq) timeoutDuration() time.Duration { + if r.Timeout != 0 { + return time.Duration(r.Timeout) * time.Second + } + + // assume a sane default to finish a curl request + return 5 * time.Second +} + // CURLPrefixArgs builds the beginning of a curl command for a given key // addressed to a random URL in the given cluster. func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string { @@ -94,13 +105,20 @@ func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method st } func CURLPost(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "POST", req), req.Expected) + ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration()) + defer cancel() + return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "POST", req), nil, req.Expected) } func CURLPut(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "PUT", req), req.Expected) + ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration()) + defer cancel() + return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "PUT", req), nil, req.Expected) } func CURLGet(clus *EtcdProcessCluster, req CURLReq) error { - return SpawnWithExpect(CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), req.Expected) + ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration()) + defer cancel() + + return SpawnWithExpectsContext(ctx, CURLPrefixArgs(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected) } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index d55fcccefec1..90fce32953aa 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "os" + "strings" "syscall" "testing" "time" @@ -134,11 +135,18 @@ func (ep *EtcdServerProcess) Stop() (err error) { if ep == nil || ep.proc == nil { return nil } + defer func() { + ep.proc = nil + }() + err = ep.proc.Stop() - ep.proc = nil if err != nil { return err } + err = ep.proc.Close() + if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { + return err + } <-ep.donec ep.donec = make(chan struct{}) if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" { @@ -183,11 +191,7 @@ func (ep *EtcdServerProcess) Kill() error { } func (ep *EtcdServerProcess) Wait() error { - err := ep.proc.Wait() - if err != nil { - ep.cfg.lg.Error("failed to wait for server exit", zap.String("name", ep.cfg.Name)) - return err - } + ep.proc.Wait() ep.cfg.lg.Info("server exited", zap.String("name", ep.cfg.Name)) ep.proc = nil return nil diff --git a/tests/framework/e2e/util.go b/tests/framework/e2e/util.go index d449b043ed92..46ac286f0dfa 100644 --- a/tests/framework/e2e/util.go +++ b/tests/framework/e2e/util.go @@ -50,7 +50,11 @@ func SpawnWithExpectWithEnv(args []string, envVars map[string]string, expected s } func SpawnWithExpects(args []string, envVars map[string]string, xs ...string) error { - _, err := SpawnWithExpectLines(context.TODO(), args, envVars, xs...) + return SpawnWithExpectsContext(context.TODO(), args, envVars, xs...) +} + +func SpawnWithExpectsContext(ctx context.Context, args []string, envVars map[string]string, xs ...string) error { + _, err := SpawnWithExpectLines(ctx, args, envVars, xs...) return err } @@ -74,26 +78,29 @@ func SpawnWithExpectLines(ctx context.Context, args []string, envVars map[string lines = append(lines, l) } perr := proc.Close() + if perr != nil { + return lines, fmt.Errorf("err: %w, with output lines %v", perr, proc.Lines()) + } + l := proc.LineCount() if len(xs) == 0 && l != noOutputLineCount { // expect no output return nil, fmt.Errorf("unexpected output from %v (got lines %q, line count %d) %v. Try EXPECT_DEBUG=TRUE", args, lines, l, l != noOutputLineCount) } - return lines, perr + return lines, nil } func RunUtilCompletion(args []string, envVars map[string]string) ([]string, error) { proc, err := SpawnCmd(args, envVars) if err != nil { - return nil, fmt.Errorf("failed to spawn command: %w", err) + return nil, fmt.Errorf("failed to spawn command %v with error: %w", args, err) } - defer proc.Stop() - perr := proc.Wait() - // make sure that all the outputs are received - proc.Close() - if perr != nil { - return nil, fmt.Errorf("unexpected error from command %v: %w", args, perr) + proc.Wait() + err = proc.Close() + if err != nil { + return nil, fmt.Errorf("failed to close command %v with error: %w", args, err) } + return proc.Lines(), nil } diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index 17384619c18f..06477aef4945 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -69,7 +69,7 @@ func (f killFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster return err } err = member.Wait() - if err != nil { + if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { return err } err = member.Start(ctx) @@ -103,7 +103,7 @@ func (f goFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) } } err = member.Wait() - if err != nil { + if err != nil && !strings.Contains(err.Error(), "unexpected exit code") { return err } err = member.Start(ctx) From ff6c93f6302071b8e635c46e381858b6c603ca90 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 30 Oct 2022 04:18:05 -0500 Subject: [PATCH 16/17] tests: Add revision to etcd linearizability model Signed-off-by: Marek Siarkowicz --- tests/linearizability/client.go | 10 ++- tests/linearizability/model.go | 78 +++++++++++++------ tests/linearizability/model_test.go | 116 +++++++++++++++++++--------- 3 files changed, 142 insertions(+), 62 deletions(-) diff --git a/tests/linearizability/client.go b/tests/linearizability/client.go index 1b3f9f72ef48..5addf729769b 100644 --- a/tests/linearizability/client.go +++ b/tests/linearizability/client.go @@ -66,7 +66,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) error { ClientId: c.id, Input: etcdRequest{op: Get, key: key}, Call: callTime.UnixNano(), - Output: etcdResponse{getData: readData}, + Output: etcdResponse{getData: readData, revision: resp.Header.Revision}, Return: returnTime.UnixNano(), }) return nil @@ -74,13 +74,17 @@ func (c *recordingClient) Get(ctx context.Context, key string) error { func (c *recordingClient) Put(ctx context.Context, key, value string) error { callTime := time.Now() - _, err := c.client.Put(ctx, key, value) + resp, err := c.client.Put(ctx, key, value) returnTime := time.Now() + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } c.operations = append(c.operations, porcupine.Operation{ ClientId: c.id, Input: etcdRequest{op: Put, key: key, putData: value}, Call: callTime.UnixNano(), - Output: etcdResponse{err: err}, + Output: etcdResponse{err: err, revision: revision}, Return: returnTime.UnixNano(), }) return nil diff --git a/tests/linearizability/model.go b/tests/linearizability/model.go index eebb9c05183c..5a9000f69623 100644 --- a/tests/linearizability/model.go +++ b/tests/linearizability/model.go @@ -33,13 +33,15 @@ type etcdRequest struct { } type etcdResponse struct { - getData string - err error + getData string + revision int64 + err error } type EtcdState struct { Key string Value string + LastRevision int64 FailedWrites map[string]struct{} } @@ -51,9 +53,6 @@ var etcdModel = porcupine.Model{ if err != nil { panic(err) } - if state.FailedWrites == nil { - state.FailedWrites = map[string]struct{}{} - } ok, state := step(state, in.(etcdRequest), out.(etcdResponse)) data, err := json.Marshal(state) if err != nil { @@ -64,22 +63,19 @@ var etcdModel = porcupine.Model{ DescribeOperation: func(in, out interface{}) string { request := in.(etcdRequest) response := out.(etcdResponse) - var resp string switch request.op { case Get: if response.err != nil { - resp = response.err.Error() + return fmt.Sprintf("get(%q) -> %q", request.key, response.err) } else { - resp = response.getData + return fmt.Sprintf("get(%q) -> %q, rev: %d", request.key, response.getData, response.revision) } - return fmt.Sprintf("get(%q) -> %q", request.key, resp) case Put: if response.err != nil { - resp = response.err.Error() + return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, response.err) } else { - resp = "ok" + return fmt.Sprintf("put(%q, %q) -> ok, rev: %d", request.key, request.putData, response.revision) } - return fmt.Sprintf("put(%q, %q) -> %s", request.key, request.putData, resp) default: return "" } @@ -88,32 +84,68 @@ var etcdModel = porcupine.Model{ func step(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { if request.key == "" { - panic("Invalid request") + panic("invalid request") } if state.Key == "" { - state.Key = request.key + return true, initState(request, response) } if state.Key != request.key { panic("Multiple keys not supported") } switch request.op { case Get: - if state.Value == response.getData { - return true, state - } - _, ok := state.FailedWrites[response.getData] - if ok { - state.Value = response.getData - delete(state.FailedWrites, response.getData) - return true, state - } + return stepGet(state, request, response) + case Put: + return stepPut(state, request, response) + default: + panic("Unknown operation") + } +} + +func initState(request etcdRequest, response etcdResponse) EtcdState { + state := EtcdState{ + Key: request.key, + LastRevision: response.revision, + FailedWrites: map[string]struct{}{}, + } + switch request.op { + case Get: + state.Value = response.getData case Put: if response.err == nil { state.Value = request.putData } else { state.FailedWrites[request.putData] = struct{}{} } + default: + panic("Unknown operation") + } + return state +} + +func stepGet(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + if state.Value == response.getData && state.LastRevision <= response.revision { + return true, state + } + _, ok := state.FailedWrites[response.getData] + if ok && state.LastRevision < response.revision { + state.Value = response.getData + state.LastRevision = response.revision + delete(state.FailedWrites, response.getData) return true, state } return false, state } + +func stepPut(state EtcdState, request etcdRequest, response etcdResponse) (bool, EtcdState) { + if response.err != nil { + state.FailedWrites[request.putData] = struct{}{} + return true, state + } + if state.LastRevision >= response.revision { + return false, state + } + state.Value = request.putData + state.LastRevision = response.revision + return true, state +} diff --git a/tests/linearizability/model_test.go b/tests/linearizability/model_test.go index 1e29b070cb3c..61453310a983 100644 --- a/tests/linearizability/model_test.go +++ b/tests/linearizability/model_test.go @@ -16,68 +16,112 @@ package linearizability import ( "errors" - "github.com/anishathalye/porcupine" "testing" ) func TestModel(t *testing.T) { tcs := []struct { - name string - okOperations []porcupine.Operation - failOperation *porcupine.Operation + name string + operations []testOperation }{ { - name: "Etcd must return what was written", - okOperations: []porcupine.Operation{ - {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{}}, - {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + name: "First Get can start from non-empty value and non-zero revision", + operations: []testOperation{ + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 42}}, }, - failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "2"}}, }, { - name: "Etcd can crash after storing result but before returning success to client", - okOperations: []porcupine.Operation{ - {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, - {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + name: "First Put can start from non-zero revision", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 42}}, }, }, { - name: "Etcd can crash before storing result", - okOperations: []porcupine.Operation{ - {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, - {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, + name: "Get response data should match PUT", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 1}}, }, }, { - name: "Etcd can continue errored request after it failed", - okOperations: []porcupine.Operation{ - {Input: etcdRequest{op: Put, key: "key", putData: "1"}, Output: etcdResponse{err: errors.New("failed")}}, - {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, - {Input: etcdRequest{op: Put, key: "key"}, Output: etcdResponse{getData: "2"}}, - {Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: "1"}}, + name: "Get response revision should be equal or greater then put", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key"}, resp: etcdResponse{revision: 2}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 1}, failure: true}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 2}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{revision: 4}}, + }, + }, + { + name: "Put bumps revision", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 1}, failure: true}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, + }, + }, + { + name: "Put can fail and be lost", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, + {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 2}}, + }, + }, + { + name: "Put can fail but bump revision", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, + {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 3}}, + }, + }, + { + name: "Put can fail but be persisted and bump revision", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{revision: 1}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{err: errors.New("failed")}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 1}, failure: true}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, + }, + }, + { + name: "Put can fail but be persisted later", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "1", revision: 3}}, + }, + }, + { + name: "Put can fail but bump revision later", + operations: []testOperation{ + {req: etcdRequest{op: Put, key: "key", putData: "1"}, resp: etcdResponse{err: errors.New("failed")}}, + {req: etcdRequest{op: Put, key: "key", putData: "2"}, resp: etcdResponse{revision: 2}}, + {req: etcdRequest{op: Get, key: "key"}, resp: etcdResponse{getData: "2", revision: 2}}, + {req: etcdRequest{op: Put, key: "key", putData: "3"}, resp: etcdResponse{revision: 4}}, }, - failOperation: &porcupine.Operation{Input: etcdRequest{op: Get, key: "key"}, Output: etcdResponse{getData: ""}}, }, } for _, tc := range tcs { var ok bool t.Run(tc.name, func(t *testing.T) { state := etcdModel.Init() - for _, op := range tc.okOperations { + for _, op := range tc.operations { t.Logf("state: %v", state) - ok, state = etcdModel.Step(state, op.Input, op.Output) - if !ok { - t.Errorf("Unexpected failed operation: %s", etcdModel.DescribeOperation(op.Input, op.Output)) + ok, state = etcdModel.Step(state, op.req, op.resp) + if ok != !op.failure { + t.Errorf("Unexpected operation result, expect: %v, got: %v, operation: %s", !op.failure, ok, etcdModel.DescribeOperation(op.req, op.resp)) } } - if tc.failOperation != nil { - t.Logf("state: %v", state) - ok, state = etcdModel.Step(state, tc.failOperation.Input, tc.failOperation.Output) - if ok { - t.Errorf("Unexpected succesfull operation: %s", etcdModel.DescribeOperation(tc.failOperation.Input, tc.failOperation.Output)) - } - - } }) } } + +type testOperation struct { + req etcdRequest + resp etcdResponse + failure bool +} From 855aa4f7a75b889938aed571ed5331d1b1ec5a3f Mon Sep 17 00:00:00 2001 From: Sasha Melentyev Date: Mon, 14 Nov 2022 23:18:53 +0300 Subject: [PATCH 17/17] all: Use ReplaceAll instead of Replace with -1 pos Signed-off-by: Sasha Melentyev --- contrib/lock/storage/storage.go | 4 ++-- pkg/cobrautl/help.go | 2 +- pkg/flags/flag.go | 2 +- tests/e2e/ctl_v3_grpc_test.go | 4 ++-- tests/functional/runner/help.go | 2 +- tests/integration/grpc_test.go | 8 ++++---- tests/linearizability/linearizability_test.go | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/contrib/lock/storage/storage.go b/contrib/lock/storage/storage.go index e7d8c694cf9d..7e39e38f62d9 100644 --- a/contrib/lock/storage/storage.go +++ b/contrib/lock/storage/storage.go @@ -96,8 +96,8 @@ func handler(w http.ResponseWriter, r *http.Request) { } func escape(s string) string { - escaped := strings.Replace(s, "\n", " ", -1) - escaped = strings.Replace(escaped, "\r", " ", -1) + escaped := strings.ReplaceAll(s, "\n", " ") + escaped = strings.ReplaceAll(escaped, "\r", " ") return escaped } diff --git a/pkg/cobrautl/help.go b/pkg/cobrautl/help.go index 44cdc9aa8867..2f7e003dfa37 100644 --- a/pkg/cobrautl/help.go +++ b/pkg/cobrautl/help.go @@ -99,7 +99,7 @@ GLOBAL OPTIONS: {{end}} `[1:] - commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.Replace(commandUsage, "\\\n", "", -1))) + commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.ReplaceAll(commandUsage, "\\\n", ""))) } func etcdFlagUsages(flagSet *pflag.FlagSet) string { diff --git a/pkg/flags/flag.go b/pkg/flags/flag.go index 76a51a890194..5e60b72adc98 100644 --- a/pkg/flags/flag.go +++ b/pkg/flags/flag.go @@ -66,7 +66,7 @@ func SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error { // FlagToEnv converts flag string to upper-case environment variable key string. func FlagToEnv(prefix, name string) string { - return prefix + "_" + strings.ToUpper(strings.Replace(name, "-", "_", -1)) + return prefix + "_" + strings.ToUpper(strings.ReplaceAll(name, "-", "_")) } func verifyEnv(lg *zap.Logger, prefix string, usedEnvKey, alreadySet map[string]bool) { diff --git a/tests/e2e/ctl_v3_grpc_test.go b/tests/e2e/ctl_v3_grpc_test.go index 94b7b3a577b5..3488c191256b 100644 --- a/tests/e2e/ctl_v3_grpc_test.go +++ b/tests/e2e/ctl_v3_grpc_test.go @@ -106,7 +106,7 @@ func TestAuthority(t *testing.T) { } testutils.ExecuteWithTimeout(t, 5*time.Second, func() { - assertAuthority(t, strings.Replace(tc.expectAuthorityPattern, "${MEMBER_PORT}", "20000", -1), epc) + assertAuthority(t, strings.ReplaceAll(tc.expectAuthorityPattern, "${MEMBER_PORT}", "20000"), epc) }) }) @@ -119,7 +119,7 @@ func templateEndpoints(t *testing.T, pattern string, clus *e2e.EtcdProcessCluste var endpoints []string for i := 0; i < clus.Cfg.ClusterSize; i++ { ent := pattern - ent = strings.Replace(ent, "${MEMBER_PORT}", fmt.Sprintf("%d", e2e.EtcdProcessBasePort+i*5), -1) + ent = strings.ReplaceAll(ent, "${MEMBER_PORT}", fmt.Sprintf("%d", e2e.EtcdProcessBasePort+i*5)) endpoints = append(endpoints, ent) } return endpoints diff --git a/tests/functional/runner/help.go b/tests/functional/runner/help.go index 9bf9560a74d6..eb64b533f52b 100644 --- a/tests/functional/runner/help.go +++ b/tests/functional/runner/help.go @@ -101,7 +101,7 @@ GLOBAL OPTIONS: {{end}} `[1:] - commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.Replace(commandUsage, "\\\n", "", -1))) + commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.ReplaceAll(commandUsage, "\\\n", ""))) } func etcdFlagUsages(flagSet *pflag.FlagSet) string { diff --git a/tests/integration/grpc_test.go b/tests/integration/grpc_test.go index 2fa1afeb9fdb..47519bfb9ef4 100644 --- a/tests/integration/grpc_test.go +++ b/tests/integration/grpc_test.go @@ -146,8 +146,8 @@ func templateEndpoints(t *testing.T, pattern string, clus *integration.Cluster) var endpoints []string for _, m := range clus.Members { ent := pattern - ent = strings.Replace(ent, "${MEMBER_PORT}", m.GrpcPortNumber(), -1) - ent = strings.Replace(ent, "${MEMBER_NAME}", m.Name, -1) + ent = strings.ReplaceAll(ent, "${MEMBER_PORT}", m.GrpcPortNumber()) + ent = strings.ReplaceAll(ent, "${MEMBER_NAME}", m.Name) endpoints = append(endpoints, ent) } return endpoints @@ -156,8 +156,8 @@ func templateEndpoints(t *testing.T, pattern string, clus *integration.Cluster) func templateAuthority(t *testing.T, pattern string, m *integration.Member) string { t.Helper() authority := pattern - authority = strings.Replace(authority, "${MEMBER_PORT}", m.GrpcPortNumber(), -1) - authority = strings.Replace(authority, "${MEMBER_NAME}", m.Name, -1) + authority = strings.ReplaceAll(authority, "${MEMBER_PORT}", m.GrpcPortNumber()) + authority = strings.ReplaceAll(authority, "${MEMBER_NAME}", m.Name) return authority } diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index f3aa926ef004..9e73022587fd 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -219,7 +219,7 @@ func persistMemberDataDir(t *testing.T, clus *e2e.EtcdProcessCluster, path strin } func testResultsDirectory(t *testing.T) (string, error) { - path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.Replace(t.Name(), "/", "_", -1))) + path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_"))) if err != nil { return path, err }