Skip to content

Commit

Permalink
storage: permit range leadership lease transfers
Browse files Browse the repository at this point in the history
Allow the leader of a range to propose a LeaderLease request naming
another replica as the leader.

referencing cockroachdb#6929
  • Loading branch information
andreimatei committed Jul 7, 2016
1 parent 36edf3a commit a9ce86c
Show file tree
Hide file tree
Showing 16 changed files with 1,152 additions and 499 deletions.
24 changes: 19 additions & 5 deletions roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,9 @@ func (*TruncateLogRequest) Method() Method { return TruncateLog }
// Method implements the Request interface.
func (*LeaderLeaseRequest) Method() Method { return LeaderLease }

// Method implements the Request interface.
func (*LeaseTransferRequest) Method() Method { return LeaseTransfer }

// Method implements the Request interface.
func (*ComputeChecksumRequest) Method() Method { return ComputeChecksum }

Expand Down Expand Up @@ -618,6 +621,12 @@ func (llr *LeaderLeaseRequest) ShallowCopy() Request {
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (lt *LeaseTransferRequest) ShallowCopy() Request {
shallowCopy := *lt
return &shallowCopy
}

// ShallowCopy implements the Request interface.
func (ccr *ComputeChecksumRequest) ShallowCopy() Request {
shallowCopy := *ccr
Expand Down Expand Up @@ -655,6 +664,7 @@ func (*NoopRequest) createReply() Response { return &NoopResponse{
func (*MergeRequest) createReply() Response { return &MergeResponse{} }
func (*TruncateLogRequest) createReply() Response { return &TruncateLogResponse{} }
func (*LeaderLeaseRequest) createReply() Response { return &LeaderLeaseResponse{} }
func (*LeaseTransferRequest) createReply() Response { return &LeaderLeaseResponse{} }
func (*ComputeChecksumRequest) createReply() Response { return &ComputeChecksumResponse{} }
func (*VerifyChecksumRequest) createReply() Response { return &VerifyChecksumResponse{} }

Expand Down Expand Up @@ -825,8 +835,12 @@ func (*ResolveIntentRangeRequest) flags() int { return isWrite | isRange }
func (*NoopRequest) flags() int { return isRead } // slightly special
func (*MergeRequest) flags() int { return isWrite }
func (*TruncateLogRequest) flags() int { return isWrite }
func (*LeaderLeaseRequest) flags() int { return isWrite }
func (*ComputeChecksumRequest) flags() int { return isWrite }
func (*VerifyChecksumRequest) flags() int { return isWrite }
func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
func (*ChangeFrozenRequest) flags() int { return isWrite | isRange }

// TODO(tschottdorf): consider setting isAlone on LeaderLeaseRequest and
// LeaseTransferRequest.
func (*LeaderLeaseRequest) flags() int { return isWrite }
func (*LeaseTransferRequest) flags() int { return isWrite }
func (*ComputeChecksumRequest) flags() int { return isWrite }
func (*VerifyChecksumRequest) flags() int { return isWrite }
func (*CheckConsistencyRequest) flags() int { return isAdmin | isRange }
func (*ChangeFrozenRequest) flags() int { return isWrite | isRange }
1,014 changes: 624 additions & 390 deletions roachpb/api.pb.go

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,22 @@ message LeaderLeaseRequest {
optional Lease lease = 2[(gogoproto.nullable) = false];
}

// A LeaseTransferRequest represents the arguments to the LeaseTransfer()
// method. It is sent by a replica that currently holds the range lease and
// wants to transfer it away.
// Like a LeaderLeaseRequest, this request has the effect of instituting a new
// lease. The difference is that the new lease is allowed to overlap the
// existing one. It is a separate request because the LeaderLeaseRequest is
// special - it's not subject to the same replay protection restrictions as
// other requests, instead being protected from replays by the fact that leases
// are not generally allowed to overlap. The LeaseTransferRequest is not special
// in this respect (for example, the proposer of this command is checked to have
// been holding the lease when the proposal was made).
message LeaseTransferRequest {
optional Span header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
optional Lease lease = 2 [(gogoproto.nullable) = false];
}

// A LeaderLeaseResponse is the response to a LeaderLease()
// operation.
message LeaderLeaseResponse{
Expand Down Expand Up @@ -667,6 +683,7 @@ message RequestUnion {
optional NoopRequest noop = 25;
optional InitPutRequest init_put = 26;
optional ChangeFrozenRequest change_frozen = 27;
optional LeaseTransferRequest lease_transfer = 28;
}

// A ResponseUnion contains exactly one of the optional responses.
Expand Down
9 changes: 9 additions & 0 deletions roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (ba *BatchRequest) CreateReply() *BatchResponse {
merge int
truncateLog int
leaderLease int
leaseTransfer int
reverseScan int
computeChecksum int
verifyChecksum int
Expand Down Expand Up @@ -285,6 +286,8 @@ func (ba *BatchRequest) CreateReply() *BatchResponse {
counts.truncateLog++
case *LeaderLeaseRequest:
counts.leaderLease++
case *LeaseTransferRequest:
counts.leaseTransfer++
case *ReverseScanRequest:
counts.reverseScan++
case *ComputeChecksumRequest:
Expand Down Expand Up @@ -324,6 +327,7 @@ func (ba *BatchRequest) CreateReply() *BatchResponse {
merge []MergeResponse
truncateLog []TruncateLogResponse
leaderLease []LeaderLeaseResponse
leaseTransfer []LeaderLeaseResponse
reverseScan []ReverseScanResponse
computeChecksum []ComputeChecksumResponse
verifyChecksum []VerifyChecksumResponse
Expand Down Expand Up @@ -439,6 +443,11 @@ func (ba *BatchRequest) CreateReply() *BatchResponse {
bufs.leaderLease = make([]LeaderLeaseResponse, counts.leaderLease)
}
reply, bufs.leaderLease = &bufs.leaderLease[0], bufs.leaderLease[1:]
case *LeaseTransferRequest:
if bufs.leaseTransfer == nil {
bufs.leaseTransfer = make([]LeaderLeaseResponse, counts.leaseTransfer)
}
reply, bufs.leaseTransfer = &bufs.leaseTransfer[0], bufs.leaseTransfer[1:]
case *ReverseScanRequest:
if bufs.reverseScan == nil {
bufs.reverseScan = make([]ReverseScanResponse, counts.reverseScan)
Expand Down
4 changes: 4 additions & 0 deletions roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,10 @@ func (l Lease) String() string {

// Covers returns true if the given timestamp can be served by the Lease.
// This is the case if the timestamp precedes the Lease's stasis period.
// Note that the fact that a lease convers a timestamp is not enough for the
// holder of the lease to be able to serve a read with that timestamp;
// pendingLeaderLeaseRequest.TransferInProgress() should also be consulted to
// account for possible lease transfers.
func (l Lease) Covers(timestamp hlc.Timestamp) bool {
return timestamp.Less(l.StartStasis)
}
Expand Down
2 changes: 2 additions & 0 deletions roachpb/method.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (
TruncateLog
// LeaderLease requests a leader lease for a replica.
LeaderLease
// LeaseTransfer transfers the range lease from a lease holder to a new one.
LeaseTransfer
// ComputeChecksum starts a checksum computation over a replica snapshot.
ComputeChecksum
// VerifyChecksum verifies the checksum computed through an earlier
Expand Down
4 changes: 2 additions & 2 deletions roachpb/method_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

178 changes: 178 additions & 0 deletions storage/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"math"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -457,3 +458,180 @@ func TestRangeLookupUseReverse(t *testing.T) {
}
}
}

func TestRangeLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := storage.TestStoreContext()
mtc := &multiTestContext{}
mtc.storeContext = &ctx
var filterMu sync.Mutex
var filter func(filterArgs storagebase.FilterArgs) *roachpb.Error
mtc.storeContext.TestingKnobs.TestingCommandFilter =
func(filterArgs storagebase.FilterArgs) *roachpb.Error {
filterMu.Lock()
filterCopy := filter
filterMu.Unlock()
if filterCopy != nil {
return filterCopy(filterArgs)
}
return nil
}
var waitForTransferBlocked atomic.Value
waitForTransferBlocked.Store(false)
transferBlocked := make(chan struct{})
mtc.storeContext.TestingKnobs.LeaseTransferBlockedOnExtensionEvent = func(
_ roachpb.ReplicaDescriptor) {
if waitForTransferBlocked.Load().(bool) {
transferBlocked <- struct{}{}
waitForTransferBlocked.Store(false)
}
}
mtc.Start(t, 2)
defer mtc.Stop()

// First, do a write; we'll use it to determine when the dust has settled.
leftKey := roachpb.Key("a")
incArgs := incrementArgs(leftKey, 1)
if _, pErr := client.SendWrapped(mtc.distSenders[0], nil, &incArgs); pErr != nil {
t.Fatal(pErr)
}

// Get the left range's ID.
rangeID := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil).RangeID

// Replicate the left range onto node 1.
mtc.replicateRange(rangeID, 1)

replica0 := mtc.stores[0].LookupReplica(roachpb.RKey("a"), nil)
replica1 := mtc.stores[1].LookupReplica(roachpb.RKey("a"), nil)
gArgs := getArgs(leftKey)
replicaDesc, err := replica0.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
// Check that replica0 can serve reads OK.
if _, pErr := client.SendWrappedWith(
mtc.senders[0], nil, roachpb.Header{Replica: replicaDesc}, &gArgs); pErr != nil {
t.Fatal(pErr)
}

// Move leadership to store 1.
var newLeaderDesc roachpb.ReplicaDescriptor
util.SucceedsSoon(t, func() error {
var err error
newLeaderDesc, err = replica1.GetReplicaDescriptor()
return err
})
// Capture the replica1's lease; we'll need its timing info so we can extend
// it below.
var replica1Lease roachpb.Lease
filterMu.Lock()
filter = func(filterArgs storagebase.FilterArgs) *roachpb.Error {
if filterArgs.Sid != mtc.stores[0].Ident.StoreID {
return nil
}
ltReq, ok := filterArgs.Req.(*roachpb.LeaseTransferRequest)
if !ok {
return nil
}
replica1Lease = ltReq.Lease
return nil
}
filterMu.Unlock()
if pErr := replica0.TransferLease(newLeaderDesc); pErr != nil {
t.Fatal(pErr)
}

// Check that replica0 doesn't serve reads any more.
replicaDesc, err = replica0.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
_, pErr := client.SendWrappedWith(
mtc.senders[0], nil, roachpb.Header{Replica: replicaDesc}, &gArgs)
notLeaderError, ok := pErr.GetDetail().(*roachpb.NotLeaderError)
if !ok {
t.Fatalf("Expected %T, got %s", &roachpb.NotLeaderError{}, pErr)
}
if *(notLeaderError.Leader) != newLeaderDesc {
t.Fatalf("Expected leader %+v, got %+v",
newLeaderDesc, notLeaderError.Leader)
}

// Check that replica1 now has the lease (or gets it soon).
util.SucceedsSoon(t, func() error {
if _, pErr := client.SendWrappedWith(
mtc.senders[1], nil, roachpb.Header{Replica: replicaDesc}, &gArgs); pErr != nil {
return pErr.GoError()
}
return nil
})

// Make replica1 extend its lease and transfer the lease immediately after
// that. Test that the transfer still happens (it'll wait until the extension
// is done).
extensionSem := make(chan struct{})
filterMu.Lock()
filter = func(filterArgs storagebase.FilterArgs) *roachpb.Error {
if filterArgs.Sid != mtc.stores[1].Ident.StoreID {
return nil
}
llReq, ok := filterArgs.Req.(*roachpb.LeaderLeaseRequest)
if !ok {
return nil
}
if llReq.Lease.Replica == newLeaderDesc {
// Notify the main thread that the extension is in progress and wait for
// the signal to proceed.
filterMu.Lock()
filter = nil
filterMu.Unlock()
extensionSem <- struct{}{}
<-extensionSem
}
return nil
}
filterMu.Unlock()
// Initiate an extension.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
shouldRenewTS := replica1Lease.StartStasis.Add(-1, 0)
mtc.manualClock.Set(shouldRenewTS.WallTime + 1)
if _, pErr := client.SendWrappedWith(
mtc.senders[1], nil,
roachpb.Header{Replica: replicaDesc}, &gArgs); pErr != nil {
panic(pErr)
}
}()

<-extensionSem
waitForTransferBlocked.Store(true)
// Initiate a transfer.
wg.Add(1)
go func() {
defer wg.Done()
// Transfer back from replica1 to replica0.
if pErr := replica1.TransferLease(replicaDesc); pErr != nil {
panic(pErr)
}
}()
// Wait for the transfer to be blocked by the extension.
<-transferBlocked
// Now unblock the extension.
extensionSem <- struct{}{}
// Check that the transfer to replica1 eventually happens.
util.SucceedsSoon(t, func() error {
if _, pErr := client.SendWrappedWith(
mtc.senders[0], nil,
roachpb.Header{Replica: replicaDesc}, &gArgs); pErr != nil {
return pErr.GoError()
}
return nil
})
filterMu.Lock()
filter = nil
filterMu.Unlock()
wg.Wait()
}
8 changes: 8 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ import (
// Check that Stores implements the RangeDescriptorDB interface.
var _ kv.RangeDescriptorDB = &storage.Stores{}

// rg1 returns a wrapping sender that changes all requests to range 0 to
// requests to range 1.
// This function is DEPRECATED. Send your requests to the right range by
// properly initializing the request header.
func rg1(s *storage.Store) client.Sender {
return client.Wrap(s, func(ba roachpb.BatchRequest) roachpb.BatchRequest {
if ba.RangeID == 0 {
Expand Down Expand Up @@ -713,6 +717,10 @@ func (m *multiTestContext) replicateRange(rangeID roachpb.RangeID, dests ...int)
m.t.Fatal(err)
}

if dest >= len(m.stores) {
m.t.Fatalf("store index %d out of range; there's only %d of them", dest, len(m.stores))
}

if err := rep.ChangeReplicas(roachpb.ADD_REPLICA,
roachpb.ReplicaDescriptor{
NodeID: m.stores[dest].Ident.NodeID,
Expand Down
Loading

0 comments on commit a9ce86c

Please sign in to comment.