Epoch-based range leases implementation
Introduce new epoch-based range leases. These are designed to use
the same machinery as the expiration-based leases but use epochs from
the node liveness table instead of expirations.

The same Lease protobuf is utilized for both types of leases, but
there's now an optional Epoch. Previously, the lease proto had a "stasis"
timestamp that's now removed and replaced by logic in the replica to
evaluate the state of a lease.

In order to evaluate whether a lease is valid at command apply time
(downstream of Raft), we evaluate the lease upstream of Raft and send
it with every Raft command to be compared to the lease at apply time.

See: https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/range_leases.md
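To make the two lease flavors concrete, here is a minimal, self-contained Go sketch of the validity rule the message describes: an expiration-based lease is valid while the clock is below its expiration, and an epoch-based lease is valid while the holder's node liveness epoch is unchanged. The Lease struct, Valid method, and liveEpoch argument are simplified stand-ins for illustration, not the actual roachpb types.

    package main

    import (
        "fmt"
        "time"
    )

    // Lease mirrors the shape of the reworked Lease proto: Epoch == 0 means an
    // expiration-based lease; a non-zero Epoch ties the lease to the holder's
    // entry in the node liveness table. (Simplified stand-in, not roachpb.Lease.)
    type Lease struct {
        Start      time.Time
        Expiration time.Time // consulted only when Epoch == 0
        Epoch      int64     // consulted only when Epoch != 0
    }

    // Valid reports whether the lease can serve requests at time now, given the
    // holder's current liveness epoch. Hypothetical logic matching the commit
    // description: expiration leases check the clock; epoch leases check that
    // the liveness epoch has not been incremented.
    func (l Lease) Valid(now time.Time, liveEpoch int64) bool {
        if l.Epoch == 0 {
            return now.Before(l.Expiration)
        }
        return l.Epoch == liveEpoch
    }

    func main() {
        now := time.Now()
        exp := Lease{Start: now, Expiration: now.Add(9 * time.Second)}
        epo := Lease{Start: now, Epoch: 3}

        fmt.Println(exp.Valid(now, 0))                // true: clock below expiration
        fmt.Println(epo.Valid(now.Add(time.Hour), 3)) // true: epoch current, wall time irrelevant
        fmt.Println(epo.Valid(now, 4))                // false: holder's liveness epoch moved on
    }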
spencerkimball committed Oct 31, 2016
1 parent 4f704be · commit f5ddf81
Showing 40 changed files with 1,151 additions and 968 deletions.
3 changes: 2 additions & 1 deletion pkg/cli/debug.go
@@ -429,7 +429,8 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) {
 			return "", err
 		}
 		ent.Data = nil
-		return fmt.Sprintf("%s by %v\n%s\n%s\n", &ent, cmd.OriginReplica, &cmd.Cmd, &cmd), nil
+		return fmt.Sprintf("%s by %v (lease %s)\n%s\n%s\n",
+			&ent, cmd.ProposerReplica, cmd.ProposerLease, &cmd.Cmd, &cmd), nil
 	}
 	return fmt.Sprintf("%s: EMPTY\n", &ent), nil
 } else if ent.Type == raftpb.EntryConfChange {
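This debug-output change reflects the mechanism from the commit message: every Raft command now records the proposer's replica and lease (ProposerReplica, ProposerLease), which is what the apply-time lease verification compares against.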
39 changes: 27 additions & 12 deletions pkg/roachpb/data.go
@@ -954,25 +954,40 @@ var _ fmt.Stringer = &Lease{}
 
 func (l Lease) String() string {
 	start := time.Unix(0, l.Start.WallTime).UTC()
-	expiration := time.Unix(0, l.Expiration.WallTime).UTC()
-	return fmt.Sprintf("replica %s %s %s", l.Replica, start, expiration.Sub(start))
-}
-
-// Covers returns true if the given timestamp can be served by the Lease.
-// This is the case if the timestamp precedes the Lease's stasis period.
-// Note that the fact that a lease covers a timestamp is not enough for the
-// holder of the lease to be able to serve a read with that timestamp;
-// pendingLeaderLeaseRequest.TransferInProgress() should also be consulted to
-// account for possible lease transfers.
-func (l Lease) Covers(timestamp hlc.Timestamp) bool {
-	return timestamp.Less(l.StartStasis)
+	if l.Epoch == 0 {
+		expiration := time.Unix(0, l.Expiration.WallTime).UTC()
+		return fmt.Sprintf("replica %s start=%s duration=%s", l.Replica, start, expiration.Sub(start))
+	}
+	return fmt.Sprintf("replica %s epoch=%d start=%s", l.Replica, l.Epoch, start)
 }
 
 // OwnedBy returns whether the given store is the lease owner.
 func (l Lease) OwnedBy(storeID StoreID) bool {
 	return l.Replica.StoreID == storeID
 }
 
+// Empty returns whether the lease is uninitialized.
+func (l Lease) Empty() bool {
+	return l.Replica.StoreID == 0
+}
+
+// Verify determines whether ol is either the exact same lease,
+// or else is an extension of the same expiration-based lease.
+func (l Lease) Verify(ol Lease) error {
+	if l == ol {
+		return nil
+	}
+	// The one exception is an expiration-based lease which has
+	// been extended.
+	if l.Epoch == 0 && ol.Epoch == 0 && l.Expiration.Less(ol.Expiration) {
+		l.Expiration = ol.Expiration
+		if l == ol {
+			return nil
+		}
+	}
+	return errors.Errorf("leases %+v and %+v do not verify", l, ol)
+}
+
 // AsIntents takes a slice of spans and returns it as a slice of intents for
 // the given transaction.
 func AsIntents(spans []Span, txn *Transaction) []Intent {
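To see what the extension exception buys, here is a minimal, self-contained sketch of the same comparison rule using a simplified local lease type (a hypothetical stand-in, not the actual roachpb.Lease; timestamps reduced to int64 wall times). A proposed lease must match the applied-time lease exactly, except that an expiration-based lease may have been extended in between.

    package main

    import (
        "errors"
        "fmt"
    )

    // lease is a simplified, comparable stand-in for roachpb.Lease.
    type lease struct {
        replica    int32 // store ID of the holder
        start      int64 // wall time, nanos
        expiration int64 // wall time, nanos; 0 for epoch-based leases
        epoch      int64 // 0 for expiration-based leases
    }

    // verify mirrors the rule added in data.go: exact equality, with a carve-out
    // for a forward extension of the same expiration-based lease.
    func (l lease) verify(ol lease) error {
        if l == ol {
            return nil
        }
        if l.epoch == 0 && ol.epoch == 0 && l.expiration < ol.expiration {
            l.expiration = ol.expiration // l is a copy; safe to patch locally
            if l == ol {
                return nil
            }
        }
        return errors.New("leases do not verify")
    }

    func main() {
        a := lease{replica: 1, start: 1, expiration: 10}
        b := lease{replica: 1, start: 1, expiration: 20} // same lease, extended
        c := lease{replica: 2, start: 5, expiration: 20} // different holder

        fmt.Println(a.verify(b)) // <nil>: extension of the same lease
        fmt.Println(b.verify(a)) // error: extensions only move forward
        fmt.Println(a.verify(c)) // error: different replica
    }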
280 changes: 123 additions & 157 deletions pkg/roachpb/data.pb.go

Large diffs are not rendered by default.

30 changes: 7 additions & 23 deletions pkg/roachpb/data.proto
@@ -293,11 +293,7 @@ message Intent {
 }
 
 // Lease contains information about range leases including the
-// expiration and lease holder. It defines the two intervals
-// [start, start_stasis) and [start_stasis, expiration). The
-// former encompasses those timestamps for which the lease is
-// active, while the latter is a cooldown period which avoids
-// inconsistencies during lease holder changes as explained below.
+// expiration and lease holder.
 message Lease {
   option (gogoproto.goproto_stringer) = false;
   option (gogoproto.populate) = true;
@@ -306,31 +302,19 @@ message Lease {
   // must be greater than the last lease expiration or the lease request
   // is considered invalid.
   optional util.hlc.Timestamp start = 1 [(gogoproto.nullable) = false];
-  // Before the lease expires, it enters a "stasis period" the length of which
-  // is usually determined by the lease holder's maximum allowed clock offset.
-  // During this stasis period, the lease must not be used (but can be extended
-  // by the owner instead). This prevents a failure of linearizability on a
-  // single register during lease changes. Without that stasis period, the
-  // following could occur:
-  // * a range lease gets committed on the new lease holder (but not the old).
-  // * client proposes and commits a write on new lease holder (with a timestamp
-  //   just greater than the expiration of the old lease).
-  // * client tries to read what it wrote, but hits a slow coordinator
-  //   (which assigns a timestamp covered by the old lease).
-  // * the read is served by the old lease holder (which has not processed the
-  //   change in lease holdership).
-  // * the client fails to read their own write.
-  //
-  // Instead, the old lease holder must refuse to serve the client's command on the
-  // basis that its timestamp falls within the stasis period.
-  optional util.hlc.Timestamp start_stasis = 4 [(gogoproto.nullable) = false];
-
   // The expiration is a timestamp at which the lease expires. This means that
   // a new lease can be granted for a later timestamp.
   optional util.hlc.Timestamp expiration = 2 [(gogoproto.nullable) = false];
 
   // The address of the would-be lease holder.
   optional ReplicaDescriptor replica = 3 [(gogoproto.nullable) = false];
+
+  reserved 4;
+
+  // The epoch of the lease holder's node liveness entry. If this value
+  // is non-zero, the start and expiration values are ignored.
+  optional int64 epoch = 5 [(gogoproto.nullable) = false];
 }
 
 // AbortCacheEntry contains information about a transaction which has
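Note the reserved 4; line: removing start_stasis frees protobuf field number 4, and reserving it prevents a future field from reusing that tag, which could otherwise cause previously encoded leases to decode incorrectly.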
83 changes: 36 additions & 47 deletions pkg/roachpb/data_test.go
@@ -573,6 +573,42 @@ func TestMakePriorityLimits(t *testing.T) {
 	}
 }
 
+func TestLeaseVerify(t *testing.T) {
+	r1 := ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
+	r2 := ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
+	ts1 := makeTS(1, 1)
+	ts2 := makeTS(2, 1)
+	ts3 := makeTS(3, 1)
+
+	epoch1 := Lease{Replica: r1, Start: ts1, Epoch: 1}
+	epoch2 := Lease{Replica: r1, Start: ts1, Epoch: 2}
+	expire1 := Lease{Replica: r1, Start: ts1, Expiration: ts2}
+	expire2 := Lease{Replica: r1, Start: ts1, Expiration: ts3}
+	epoch2_ts2 := Lease{Replica: r2, Start: ts2, Epoch: 2}
+	expire2_ts2 := Lease{Replica: r2, Start: ts2, Expiration: ts3}
+
+	testCases := []struct {
+		l, ol      Lease
+		expSuccess bool
+	}{
+		{epoch1, epoch1, true},        // same epoch lease
+		{expire1, expire1, true},      // same expiration lease
+		{epoch1, epoch2, false},       // different epoch leases
+		{epoch1, epoch2_ts2, false},   // different epoch leases
+		{expire1, expire2_ts2, false}, // different expiration leases
+		{expire1, expire2, true},      // same expiration lease, extended
+		{expire2, expire1, false},     // same expiration lease, extended but backwards
+		{epoch1, expire1, false},      // epoch and expiration leases
+		{expire1, epoch1, false},      // expiration and epoch leases
+	}
+
+	for i, tc := range testCases {
+		if err := tc.l.Verify(tc.ol); tc.expSuccess != (err == nil) {
+			t.Errorf("%d: expected success? %t; got %s", i, tc.expSuccess, err)
+		}
+	}
+}
+
 func TestSpanOverlaps(t *testing.T) {
 	sA := Span{Key: []byte("a")}
 	sD := Span{Key: []byte("d")}
@@ -717,53 +753,6 @@ func TestRSpanIntersect(t *testing.T) {
 	}
 }
 
-func TestLeaseCovers(t *testing.T) {
-	mk := func(ds ...int64) (sl []hlc.Timestamp) {
-		for _, d := range ds {
-			sl = append(sl, hlc.ZeroTimestamp.Add(d, 0))
-		}
-		return sl
-	}
-
-	ts10 := mk(10)[0]
-	ts1K := mk(1000)[0]
-
-	for i, test := range []struct {
-		lease   Lease
-		in, out []hlc.Timestamp
-	}{
-		{
-			lease: Lease{
-				StartStasis: mk(1)[0],
-				Expiration:  ts1K,
-			},
-			in:  mk(0),
-			out: mk(1, 100, 500, 999, 1000),
-		},
-		{
-			lease: Lease{
-				Start:       ts10,
-				StartStasis: mk(500)[0],
-				Expiration:  ts1K,
-			},
-			out: mk(500, 999, 1000, 1001, 2000),
-			// Note that the lease covers timestamps before its start timestamp.
-			in: mk(0, 9, 10, 300, 499),
-		},
-	} {
-		for _, ts := range test.in {
-			if !test.lease.Covers(ts) {
-				t.Errorf("%d: should contain %s", i, ts)
-			}
-		}
-		for _, ts := range test.out {
-			if test.lease.Covers(ts) {
-				t.Errorf("%d: must not contain %s", i, ts)
-			}
-		}
-	}
-}
-
 func BenchmarkValueSetBytes(b *testing.B) {
 	v := Value{}
 	bytes := make([]byte, 16)
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -696,6 +696,7 @@ func (s *Server) doDrain(modes []serverpb.DrainMode, setTo bool) ([]serverpb.Dra
 	case mode == serverpb.DrainMode_CLIENT:
 		err = s.pgServer.SetDraining(setTo)
 	case mode == serverpb.DrainMode_LEASES:
+		s.nodeLiveness.PauseHeartbeat(setTo)
		err = s.node.SetDraining(setTo)
 	default:
 		err = errors.Errorf("unknown drain mode: %v (%d)", mode, mode)
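Pausing the node liveness heartbeat ties draining into the new design: once a draining node stops heartbeating its liveness record, other nodes can increment its epoch, which invalidates any epoch-based leases it still holds and lets them move elsewhere. This reading is implied by the epoch-based design rather than spelled out in the commit message.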
2 changes: 1 addition & 1 deletion pkg/server/status_test.go
@@ -442,7 +442,7 @@ func TestRangesResponse(t *testing.T) {
 		if len(ri.State.Desc.Replicas) != 1 || ri.State.Desc.Replicas[0] != expReplica {
 			t.Errorf("unexpected replica list %+v", ri.State.Desc.Replicas)
 		}
-		if ri.State.Lease == nil {
+		if ri.State.Lease.Empty() {
 			t.Error("expected a nontrivial Lease")
 		}
 		if ri.State.LastIndex == 0 {
4 changes: 2 additions & 2 deletions pkg/storage/below_raft_protos_test.go
@@ -88,8 +88,8 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
 	},
 	reflect.TypeOf(&roachpb.Lease{}): {
 		populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedLease(r, false) },
-		emptySum:             10006158318270644799,
-		populatedSum:         717371977055084394,
+		emptySum:             15357864913567733633,
+		populatedSum:         10752200836664780297,
 	},
 	reflect.TypeOf(&roachpb.RaftTruncatedState{}): {
 		populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedRaftTruncatedState(r, false) },
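The golden checksums change because the Lease proto's wire encoding changed (field 4 removed, field 5 added). These fixtures appear to exist to force exactly this kind of explicit update: protos used below Raft are part of replicated state, so any change to their encoding must be deliberate.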
79 changes: 32 additions & 47 deletions pkg/storage/client_raft_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/gossiputil"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -697,7 +698,7 @@ func TestRefreshPendingCommands(t *testing.T) {
 		{DisableRefreshReasonSnapshotApplied: true, DisableRefreshReasonTicks: true},
 		{DisableRefreshReasonNewLeader: true, DisableRefreshReasonSnapshotApplied: true},
 	}
-	for _, c := range testCases {
+	for tcIdx, c := range testCases {
 		func() {
 			sc := storage.TestStoreConfig()
 			sc.TestingKnobs = c
@@ -749,14 +750,15 @@ func TestRefreshPendingCommands(t *testing.T) {
 			mtc.stopStore(0)
 			mtc.restartStore(0)
 
-			// Expire existing leases (i.e. move the clock forward). This allows node
-			// 3 to grab the lease later in the test.
-			mtc.expireLeases()
+			// Expire existing leases (i.e. move the clock forward, but don't
+			// increment epochs). This allows node 3 to grab the lease later
+			// in the test.
+			mtc.expireLeasesAndIncrementEpochs(false)
 			// Drain leases from nodes 0 and 1 to prevent them from grabbing any new
 			// leases.
 			for i := 0; i < 2; i++ {
 				if err := mtc.stores[i].DrainLeases(true); err != nil {
-					t.Fatal(err)
+					t.Fatalf("test case %d, store %d: %v", tcIdx, i, err)
 				}
 			}
@@ -1001,50 +1003,33 @@ func TestStoreRangeDownReplicate(t *testing.T) {
 	desc := replica.Desc()
 	mtc.replicateRange(desc.RangeID, 3, 4)
 
-	maxTimeout := time.After(10 * time.Second)
-	succeeded := false
-	i := 0
-	for !succeeded {
-		select {
-		case <-maxTimeout:
-			t.Fatalf("Failed to achieve proper replication within 10 seconds")
-		case <-time.After(10 * time.Millisecond):
-			rangeDesc := getRangeMetadata(rightKeyAddr, mtc, t)
-			if count := len(rangeDesc.Replicas); count < 3 {
-				t.Fatalf("Removed too many replicas; expected at least 3 replicas, found %d", count)
-			} else if count == 3 {
-				succeeded = true
-				break
-			}
-
-			// Cycle the lease to the next replica (on the next store) if that
-			// replica still exists. This avoids the condition in which we try
-			// to continuously remove the replica on a store when
-			// down-replicating while it also still holds the lease.
-			for {
-				i++
-				if i >= len(mtc.stores) {
-					i = 0
-				}
-				rep := mtc.stores[i].LookupReplica(rightKeyAddr, nil)
-				if rep != nil {
-					mtc.expireLeases()
-					// Force the read command request a new lease.
-					getArgs := getArgs(rightKey)
-					if _, err := client.SendWrapped(context.Background(), mtc.distSenders[i], &getArgs); err != nil {
-						t.Fatal(err)
-					}
-					mtc.stores[i].ForceReplicationScanAndProcess()
-					break
-				}
-			}
-		}
-	}
-
-	// Expire range leases one more time, so that any remaining resolutions can
-	// get a range lease.
-	// TODO(bdarnell): understand why some tests need this.
-	mtc.expireLeases()
+	// Initialize the gossip network.
+	storeDescs := make([]*roachpb.StoreDescriptor, 0, len(mtc.stores))
+	for _, s := range mtc.stores {
+		desc, err := s.Descriptor()
+		if err != nil {
+			t.Fatal(err)
+		}
+		storeDescs = append(storeDescs, desc)
+	}
+	for _, g := range mtc.gossips {
+		gossiputil.NewStoreGossiper(g).GossipStores(storeDescs, t)
+	}
+
+	util.SucceedsSoon(t, func() error {
+		rangeDesc := getRangeMetadata(rightKeyAddr, mtc, t)
+		if count := len(rangeDesc.Replicas); count < 3 {
+			t.Fatalf("Removed too many replicas; expected at least 3 replicas, found %d", count)
+		} else if count == 3 {
+			return nil
+		}
+		// Force scan & replication queue for each store to make sure the
+		// lease holder is given ample opportunity to down-replicate.
+		for i := 0; i < len(mtc.stores); i++ {
+			mtc.stores[i].ForceReplicationScanAndProcess()
+		}
+		return errors.Errorf("range has > 3 replicas: %+v", rangeDesc)
+	})
 }
 
 // TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
@@ -2662,7 +2647,7 @@ func TestRangeQuiescence(t *testing.T) {
 	mtc.Start(t, 3)
 	defer mtc.Stop()
 
-	stopNodeLivenessHeartbeats(mtc)
+	pauseNodeLivenessHeartbeats(mtc, true)
 
 	// Replica range 1 to all 3 nodes.
 	mtc.replicateRange(1, 1, 2)
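The rewritten TestStoreRangeDownReplicate swaps a hand-rolled timeout/poll loop for util.SucceedsSoon, which retries a closure until it returns nil or a deadline passes. A minimal sketch of that retry pattern in plain Go; succeedsSoon here is a hypothetical stand-in, not CockroachDB's util.SucceedsSoon:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // succeedsSoon retries fn with a short sleep between attempts until it
    // returns nil or the deadline expires. Hypothetical stand-in for
    // util.SucceedsSoon (which also uses exponential backoff).
    func succeedsSoon(deadline time.Duration, fn func() error) error {
        var err error
        for start := time.Now(); time.Since(start) < deadline; {
            if err = fn(); err == nil {
                return nil
            }
            time.Sleep(10 * time.Millisecond)
        }
        return fmt.Errorf("condition not met within %s: %w", deadline, err)
    }

    func main() {
        attempts := 0
        err := succeedsSoon(time.Second, func() error {
            attempts++
            if attempts < 5 {
                return errors.New("not yet")
            }
            return nil // condition reached
        })
        fmt.Println(err, "after", attempts, "attempts") // <nil> after 5 attempts
    }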