Skip to content

Commit

Permalink
Merge pull request #18202 from petermattis/pmattis/quiesce-leader-tra…
Browse files Browse the repository at this point in the history
…nsfer

storage: prevent quiescence if Raft leadership transfer is in progress
  • Loading branch information
petermattis authored Sep 5, 2017
2 parents 5435227 + b4949cc commit a1e3a7f
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 25 deletions.
110 changes: 85 additions & 25 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3650,36 +3650,85 @@ func (r *Replica) maybeTickQuiesced() bool {
// not be sensitive enough.
func (r *Replica) maybeQuiesceLocked() bool {
ctx := r.AnnotateCtx(context.TODO())
if len(r.mu.proposals) != 0 {
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals))
if !ok {
return false
}
return r.quiesceAndNotifyLocked(ctx, status)
}

type quiescer interface {
raftStatusRLocked() *raft.Status
raftLastIndexLocked() (uint64, error)
hasRaftReadyRLocked() bool
ownsValidLeaseRLocked(ts hlc.Timestamp) bool
maybeTransferRaftLeader(ctx context.Context, status *raft.Status, ts hlc.Timestamp)
}

func (r *Replica) hasRaftReadyRLocked() bool {
return r.mu.internalRaftGroup.HasReady()
}

func (r *Replica) maybeTransferRaftLeader(
ctx context.Context, status *raft.Status, now hlc.Timestamp,
) {
l := *r.mu.state.Lease
if !r.isLeaseValidRLocked(l, now) {
return
}
if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
r.mu.internalRaftGroup.TransferLeader(uint64(l.Replica.ReplicaID))
}
}

// shouldReplicaQuiesce determines if a replica should be quiesced. All of the
// access to Replica internals are gated by the quiescer interface to
// facilitate testing. Returns the raft.Status and true on success, and (nil,
// false) on failure.
func shouldReplicaQuiesce(
ctx context.Context, q quiescer, now hlc.Timestamp, numProposals int,
) (*raft.Status, bool) {
if numProposals != 0 {
if log.V(4) {
log.Infof(ctx, "not quiescing: %d pending commands", len(r.mu.proposals))
log.Infof(ctx, "not quiescing: %d pending commands", numProposals)
}
return false
return nil, false
}
status := q.raftStatusRLocked()
if status == nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: dormant Raft group")
}
return nil, false
}
status := r.mu.internalRaftGroup.Status()
if status.SoftState.RaftState != raft.StateLeader {
if log.V(4) {
log.Infof(ctx, "not quiescing: not leader")
}
return false
return nil, false
}
if status.LeadTransferee != 0 {
if log.V(4) {
log.Infof(ctx, "not quiescing: leader transfer to %d in progress", status.LeadTransferee)
}
return nil, false
}
// Only quiesce if this replica is the leaseholder as well;
// otherwise the replica which is the valid leaseholder may have
// pending commands which it's waiting on this leader to propose.
if now := r.store.Clock().Now(); !r.ownsValidLeaseRLocked(now) {
if !q.ownsValidLeaseRLocked(now) {
if log.V(4) {
log.Infof(ctx, "not quiescing: not leaseholder")
}
// Try to correct leader-not-leaseholder condition, if encountered,
// assuming the leaseholder is caught up to the commit index.
if l := *r.mu.state.Lease; r.isLeaseValidRLocked(l, now) {
if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
r.mu.internalRaftGroup.TransferLeader(uint64(l.Replica.ReplicaID))
}
}
return false
//
// TODO(peter): It is surprising that a method named shouldReplicaQuiesce
// might initiate transfer of the Raft leadership.
q.maybeTransferRaftLeader(ctx, status, now)
return nil, false
}
// We need all of Applied, Commit, LastIndex and Progress.Match indexes to be
// equal in order to quiesce.
Expand All @@ -3688,41 +3737,52 @@ func (r *Replica) maybeQuiesceLocked() bool {
log.Infof(ctx, "not quiescing: applied (%d) != commit (%d)",
status.Applied, status.Commit)
}
return false
return nil, false
}
lastIndex, err := q.raftLastIndexLocked()
if err != nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: %v", err)
}
return nil, false
}
if status.Commit != r.mu.lastIndex {
if status.Commit != lastIndex {
if log.V(4) {
log.Infof(ctx, "not quiescing: commit (%d) != last-index (%d)",
status.Commit, r.mu.lastIndex)
log.Infof(ctx, "not quiescing: commit (%d) != lastIndex (%d)",
status.Commit, lastIndex)
}
return false
return nil, false
}
var foundSelf bool
for id, progress := range status.Progress {
if roachpb.ReplicaID(id) == r.mu.replicaID {
if id == status.ID {
foundSelf = true
}
if progress.Match != status.Applied {
if log.V(4) {
log.Infof(ctx, "not quiescing: replica %d match (%d) != applied (%d)",
id, progress.Match, status.Applied)
}
return false
return nil, false
}
}
if !foundSelf {
if log.V(4) {
log.Infof(ctx, "not quiescing: %d not found in progress: %+v",
r.mu.replicaID, status.Progress)
status.ID, status.Progress)
}
return false
return nil, false
}
if r.mu.internalRaftGroup.HasReady() {
if q.hasRaftReadyRLocked() {
if log.V(4) {
log.Infof(ctx, "not quiescing: raft ready")
}
return false
return nil, false
}
return status, true
}

func (r *Replica) quiesceAndNotifyLocked(ctx context.Context, status *raft.Status) bool {
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.mu.lastToReplica)
if fromErr != nil {
if log.V(4) {
Expand Down
126 changes: 126 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
"github.com/pkg/errors"
Expand Down Expand Up @@ -8668,3 +8669,128 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
t.Fatal("expected intents to have been cleared")
}
}

type testQuiescer struct {
numProposals int
status *raft.Status
lastIndex uint64
raftReady bool
ownsValidLease bool
}

func (q *testQuiescer) raftStatusRLocked() *raft.Status {
return q.status
}

func (q *testQuiescer) raftLastIndexLocked() (uint64, error) {
return q.lastIndex, nil
}

func (q *testQuiescer) hasRaftReadyRLocked() bool {
return q.raftReady
}

func (q *testQuiescer) ownsValidLeaseRLocked(ts hlc.Timestamp) bool {
return q.ownsValidLease
}

func (q *testQuiescer) maybeTransferRaftLeader(
ctx context.Context, status *raft.Status, ts hlc.Timestamp,
) {
// Nothing to do here. We test Raft leadership transfer in
// TestTransferRaftLeadership.
}

func TestShouldReplicaQuiesce(t *testing.T) {
defer leaktest.AfterTest(t)()

const logIndex = 10
const invalidIndex = 11
test := func(expected bool, transform func(q *testQuiescer) *testQuiescer) {
t.Run("", func(t *testing.T) {
// A testQuiescer initialized so that shouldReplicaQuiesce will return
// true. The transform function is intended to perform one mutation to
// this quiescer so that shouldReplicaQuiesce will return false.
q := &testQuiescer{
status: &raft.Status{
ID: 1,
HardState: raftpb.HardState{
Commit: logIndex,
},
SoftState: raft.SoftState{
RaftState: raft.StateLeader,
},
Applied: logIndex,
Progress: map[uint64]raft.Progress{
1: {Match: logIndex},
2: {Match: logIndex},
3: {Match: logIndex},
},
LeadTransferee: 0,
},
lastIndex: logIndex,
raftReady: false,
ownsValidLease: true,
}
q = transform(q)
_, ok := shouldReplicaQuiesce(context.Background(), q, hlc.Timestamp{}, q.numProposals)
if expected != ok {
t.Fatalf("expected %v, but found %v", expected, ok)
}
})
}

test(true, func(q *testQuiescer) *testQuiescer {
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.numProposals = 1
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status = nil
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status.RaftState = raft.StateFollower
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status.RaftState = raft.StateCandidate
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status.LeadTransferee = 1
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status.Commit = invalidIndex
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.status.Applied = invalidIndex
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.lastIndex = invalidIndex
return q
})
for _, i := range []uint64{1, 2, 3} {
test(false, func(q *testQuiescer) *testQuiescer {
q.status.Progress[i] = raft.Progress{Match: invalidIndex}
return q
})
}
test(false, func(q *testQuiescer) *testQuiescer {
delete(q.status.Progress, q.status.ID)
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.ownsValidLease = false
return q
})
test(false, func(q *testQuiescer) *testQuiescer {
q.raftReady = true
return q
})
}

0 comments on commit a1e3a7f

Please sign in to comment.