storage: clean up colocation of leaseholder and raft leader
When the Raft leader is not colocated with the leaseholder, we transfer
the Raft leadership to the leaseholder, as long as the leaseholder has
not fallen behind on applying log entries.

There are two important places where we want to perform this check:

  1. when we apply a new lease, in case the lease has changed hands; and
  2. on every tick, to catch cases where we were unable to transfer Raft
     leadership immediately because the leaseholder had fallen behind.

Instead of duplicating this logic in both places, share a helper
function. Also hoist the on-tick check out of shouldReplicaQuiesce.
Though that function is called on every tick, it was an extremely
non-obvious place to be checking for leadership/leaseholder colocation.

Release note: None
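
For orientation, here is a minimal, self-contained sketch of the colocation check that the shared helper performs, condensed from maybeTransferRaftLeadershipLocked in the replica.go diff below. The types and function names in this sketch are illustrative only, not CockroachDB's:

```go
package main

import "fmt"

// progress mirrors the one field of Raft's per-follower progress the check
// cares about: the highest log index known to be replicated on that replica.
type progress struct{ Match uint64 }

// raftStatus is a stand-in for the subset of raft.Status used by the check.
type raftStatus struct {
	IsLeader bool
	Commit   uint64
	Progress map[uint64]progress // replica ID -> replication progress
}

// shouldTransferLeadership reports whether the Raft leader should hand
// leadership to the leaseholder: only when the leaseholder is a different
// replica, this node is still the leader, and the leaseholder has caught up
// to the commit index (or this node is draining and wants to shed leadership
// regardless).
func shouldTransferLeadership(st raftStatus, selfID, leaseholderID uint64, draining bool) bool {
	if !st.IsLeader || leaseholderID == selfID {
		return false
	}
	pr, ok := st.Progress[leaseholderID]
	return (ok && pr.Match >= st.Commit) || draining
}

func main() {
	st := raftStatus{
		IsLeader: true,
		Commit:   100,
		Progress: map[uint64]progress{2: {Match: 100}, 3: {Match: 90}},
	}
	fmt.Println(shouldTransferLeadership(st, 1, 2, false)) // true: leaseholder caught up
	fmt.Println(shouldTransferLeadership(st, 1, 3, false)) // false: leaseholder still behind
}
```

Gating the transfer on the leaseholder's progress is what avoids the windows of zero throughput mentioned in the removed code below: leadership never lands on a replica that cannot yet apply the latest commands.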
benesch committed Oct 28, 2018
1 parent e9e3ff2 commit c23d984
Showing 4 changed files with 54 additions and 74 deletions.
54 changes: 34 additions & 20 deletions pkg/storage/replica.go
@@ -4529,6 +4529,8 @@ func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {
 // tick the Raft group, returning true if the raft group exists and is
 // unquiesced; false otherwise.
 func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
+ctx := r.AnnotateCtx(context.TODO())
+
 r.unreachablesMu.Lock()
 remotes := r.unreachablesMu.remotes
 r.unreachablesMu.remotes = nil
@@ -4551,10 +4553,12 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
 if r.mu.quiescent {
 return false, nil
 }
-if r.maybeQuiesceLocked(livenessMap) {
+if r.maybeQuiesceLocked(ctx, livenessMap) {
 return false, nil
 }
 
+r.maybeTransferRaftLeadershipLocked(ctx)
+
 r.mu.ticks++
 r.mu.internalRaftGroup.Tick()
 
@@ -4631,8 +4635,7 @@ func (r *Replica) tick(livenessMap IsLiveMap) (bool, error) {
 // would quiesce. The fallout from this situation are undesirable raft
 // elections which will cause throughput hiccups to the range, but not
 // correctness issues.
-func (r *Replica) maybeQuiesceLocked(livenessMap IsLiveMap) bool {
-ctx := r.AnnotateCtx(context.TODO())
+func (r *Replica) maybeQuiesceLocked(ctx context.Context, livenessMap IsLiveMap) bool {
 status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
 if !ok {
 return false
@@ -4646,28 +4649,45 @@ type quiescer interface {
 raftLastIndexLocked() (uint64, error)
 hasRaftReadyRLocked() bool
 ownsValidLeaseRLocked(ts hlc.Timestamp) bool
-maybeTransferRaftLeader(ctx context.Context, status *raft.Status, ts hlc.Timestamp)
 }
 
 func (r *Replica) hasRaftReadyRLocked() bool {
 return r.mu.internalRaftGroup.HasReady()
 }
 
-// TODO(tschottdorf): there's also maybeTransferRaftLeadership. Only one should exist.
-func (r *Replica) maybeTransferRaftLeader(
-ctx context.Context, status *raft.Status, now hlc.Timestamp,
-) {
-l := *r.mu.state.Lease
-if !r.isLeaseValidRLocked(l, now) {
+func (r *Replica) maybeTransferRaftLeadership(ctx context.Context) {
+r.mu.Lock()
+r.maybeTransferRaftLeadershipLocked(ctx)
+r.mu.Unlock()
+}
+
+// maybeTransferRaftLeadershipLocked attempts to transfer the leadership away
+// from this node to the leaseholder, if this node is the current raft leader
+// but not the leaseholder. We don't attempt to transfer leadership if the
+// leaseholder is behind on applying the log.
+//
+// We like it when leases and raft leadership are collocated because that
+// facilitates quick command application (requests generally need to make it to
+// both the lease holder and the raft leader before being applied by other
+// replicas).
+func (r *Replica) maybeTransferRaftLeadershipLocked(ctx context.Context) {
+if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
 return
 }
-if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
+lease := *r.mu.state.Lease
+if lease.OwnedBy(r.StoreID()) || !r.isLeaseValidRLocked(lease, r.Clock().Now()) {
 return
 }
-if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
-log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
+raftStatus := r.raftStatusRLocked()
+if raftStatus == nil || raftStatus.RaftState != raft.StateLeader {
+return
+}
+lhReplicaID := uint64(lease.Replica.ReplicaID)
+lhProgress, ok := raftStatus.Progress[lhReplicaID]
+if (ok && lhProgress.Match >= raftStatus.Commit) || r.mu.draining {
+log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", lhReplicaID)
 r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
-r.mu.internalRaftGroup.TransferLeader(uint64(l.Replica.ReplicaID))
+r.mu.internalRaftGroup.TransferLeader(lhReplicaID)
 }
 }
 
@@ -4713,12 +4733,6 @@ func shouldReplicaQuiesce(
 if log.V(4) {
 log.Infof(ctx, "not quiescing: not leaseholder")
 }
-// Try to correct leader-not-leaseholder condition, if encountered,
-// assuming the leaseholder is caught up to the commit index.
-//
-// TODO(peter): It is surprising that a method named shouldReplicaQuiesce
-// might initiate transfer of the Raft leadership.
-q.maybeTransferRaftLeader(ctx, status, now)
 return nil, false
 }
 // We need all of Applied, Commit, LastIndex and Progress.Match indexes to be
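
The replica.go changes above follow the usual locked/unlocked pairing: the per-tick path already operates under r.mu, so it calls maybeTransferRaftLeadershipLocked directly, while callers that do not hold the mutex (lease application below, and Store.SetDraining in store.go) go through maybeTransferRaftLeadership, which acquires it. A generic sketch of that convention, with simplified, illustrative names:

```go
package main

import "sync"

// replica is a toy stand-in used only to illustrate the locking convention.
type replica struct {
	mu struct {
		sync.Mutex
		// ...protected Raft and lease state...
	}
}

// maybeTransferLeadership is the entry point for callers that do not hold
// r.mu; it takes the lock and delegates to the *Locked variant.
func (r *replica) maybeTransferLeadership() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.maybeTransferLeadershipLocked()
}

// maybeTransferLeadershipLocked requires r.mu to be held. Callers already
// inside the mutex (like the per-tick path) call it directly, avoiding a
// re-entrant lock acquisition.
func (r *replica) maybeTransferLeadershipLocked() {
	// Inspect the lease and Raft status, then transfer leadership to the
	// leaseholder if it has caught up on the log (see the sketch above).
}

func main() {
	var r replica
	r.maybeTransferLeadership() // external caller: takes the lock itself
}
```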
50 changes: 4 additions & 46 deletions pkg/storage/replica_proposal.go
@@ -26,7 +26,6 @@ import (
 "github.com/kr/pretty"
 "github.com/opentracing/opentracing-go"
 "github.com/pkg/errors"
-"go.etcd.io/etcd/raft"
 "golang.org/x/time/rate"
 
 "github.com/cockroachdb/cockroach/pkg/keys"
@@ -309,17 +308,10 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
 r.txnWaitQueue.Clear(true /* disable */)
 }
 
-if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
-!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
-// If this replica is the raft leader but it is not the new lease holder,
-// then try to transfer the raft leadership to match the lease. We like it
-// when leases and raft leadership are collocated because that facilitates
-// quick command application (requests generally need to make it to both the
-// lease holder and the raft leader before being applied by other replicas).
-// Note that this condition is also checked periodically when computing
-// replica metrics.
-r.maybeTransferRaftLeadership(ctx, newLease.Replica.ReplicaID)
-}
+// If we're the current raft leader, may want to transfer the leadership to
+// the new leaseholder. Note that this condition is also checked periodically
+// when ticking the replica.
+r.maybeTransferRaftLeadership(ctx)
 
 // Notify the store that a lease change occurred and it may need to
 // gossip the updated store descriptor (with updated capacity).
@@ -462,40 +454,6 @@ func addSSTablePreApply(
 return copied
 }
 
-// maybeTransferRaftLeadership attempts to transfer the leadership
-// away from this node to target, if this node is the current raft
-// leader. We don't attempt to transfer leadership if the transferee
-// is behind on applying the log.
-//
-// TODO(tschottdorf): there's also maybeTransferRaftLeader. Only one should exist.
-func (r *Replica) maybeTransferRaftLeadership(ctx context.Context, target roachpb.ReplicaID) {
-err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
-// Only the raft leader can attempt a leadership transfer.
-if status := raftGroup.Status(); status.RaftState == raft.StateLeader {
-// Only attempt this if the target has all the log entries. Although
-// TransferLeader is supposed to do the right thing if the target is not
-// caught up, this check avoids periods of 0 QPS:
-// https://github.com/cockroachdb/cockroach/issues/22573#issuecomment-366106118
-if pr, ok := status.Progress[uint64(target)]; (ok && pr.Match == r.mu.lastIndex) || r.mu.draining {
-log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", target)
-r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
-raftGroup.TransferLeader(uint64(target))
-}
-}
-return true, nil
-})
-if err != nil {
-// An error here indicates that this Replica has been destroyed
-// while lacking the necessary synchronization (or even worse, it
-// fails spuriously - could be a storage error), and so we avoid
-// sweeping that under the rug.
-//
-// TODO(tschottdorf): this error is not handled any more
-// at this level.
-log.Fatal(ctx, roachpb.NewReplicaCorruptionError(err))
-}
-}
-
 func (r *Replica) handleReplicatedEvalResult(
 ctx context.Context,
 rResult storagepb.ReplicatedEvalResult,
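
One detail worth calling out: the two helpers this commit merges did not use identical gates. The removed replica_proposal.go version above required the target to match the leader's last log index, while the removed replica.go version required only that the leaseholder reach the commit index; the consolidated helper keeps the commit-index gate plus the draining escape hatch. A side-by-side sketch of the two predicates (illustrative names, not the actual functions):

```go
package sketch

// oldProposalGate mirrors the predicate in the removed replica_proposal.go
// helper: transfer only if the target has replicated every entry the leader
// has appended (or the node is draining).
func oldProposalGate(match, leaderLastIndex uint64, draining bool) bool {
	return match == leaderLastIndex || draining
}

// newGate mirrors the consolidated predicate in replica.go: transfer once the
// leaseholder has replicated at least up to the commit index, which is all it
// needs in order to apply commands (or the node is draining).
func newGate(match, commit uint64, draining bool) bool {
	return match >= commit || draining
}
```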
7 changes: 0 additions & 7 deletions pkg/storage/replica_test.go
@@ -9795,13 +9795,6 @@ func (q *testQuiescer) ownsValidLeaseRLocked(ts hlc.Timestamp) bool {
 return q.ownsValidLease
 }
 
-func (q *testQuiescer) maybeTransferRaftLeader(
-ctx context.Context, status *raft.Status, ts hlc.Timestamp,
-) {
-// Nothing to do here. We test Raft leadership transfer in
-// TestTransferRaftLeadership.
-}
-
 func TestShouldReplicaQuiesce(t *testing.T) {
 defer leaktest.AfterTest(t)()
 
17 changes: 16 additions & 1 deletion pkg/storage/store.go
@@ -1017,7 +1017,7 @@ func (s *Store) SetDraining(drain bool) {
 
 if needsRaftTransfer {
 r.raftMu.Lock()
-r.maybeTransferRaftLeadership(ctx, drainingLease.Replica.ReplicaID)
+r.maybeTransferRaftLeadership(ctx)
 r.raftMu.Unlock()
 }
 }); err != nil {
@@ -2187,6 +2187,21 @@ func splitPostApply(
 r.mu.Unlock()
 log.Event(ctx, "copied timestamp cache")
 
+// We need to explicitly wake up the Raft group on the right-hand range or
+// else the range could be underreplicated for an indefinite period of time.
+//
+// Specifically, suppose one of the replicas of the left-hand range never
+// applies this split trigger, e.g., because it catches up via a snapshot that
+// advances it past this split. That store won't create the right-hand replica
+// until it receives a Raft message addressed to the right-hand range. But
+// since new replicas start out quiesced, unless we explicitly awaken the
+// Raft group, there might not be any Raft traffic for quite a while.
+if err := rightRng.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
+return true, nil
+}); err != nil {
+log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %s", err)
+}
+
 // Invoke the leasePostApply method to ensure we properly initialize
 // the replica according to whether it holds the lease. This enables
 // the txnWaitQueue.
