Skip to content

Commit

Permalink
Epoch-based range leases implementation
Browse files Browse the repository at this point in the history
Introduce new epoch-based range leases. These are designed to use
the same machinery as the expiration-based leases but use epochs from
the node liveness table instead of expirations.

The same Lease protobuf is utilized for both types of leases, but
there's now an optional Epoch. Previously, the lease proto had a "stasis"
timestamp that's now removed and replaced by logic in the replica to
evaluate the state of a lease.

In order to evaluate whether a lease is valid at command apply time
(downstream of Raft), we evaluate the lease upstream of Raft and send
it with every Raft command to be compared to the lease at apply time.

See: https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/range_leases.md

Epoch-based leases can be enabled or disabled with the
`COCKROACH_ENABLE_EPOCH_LEASES` environment variable.

This change fixes a previously possible loophole in lease verification
for expiration-based leases. In this scenario, a node could propose a
command with an older lease, transfer it away, receive a new lease,
and then execute the command.
  • Loading branch information
spencerkimball committed Dec 5, 2016
1 parent 1e84ade commit 31ad27d
Show file tree
Hide file tree
Showing 45 changed files with 1,541 additions and 947 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) {
return "", err
}
ent.Data = nil
return fmt.Sprintf("%s by %v\n%s\n%s\n", &ent, cmd.OriginReplica, cmd.BatchRequest, &cmd), nil
return fmt.Sprintf("%s by %s\n%s\n%s\n", &ent, cmd.OriginLease, cmd.BatchRequest, &cmd), nil
}
return fmt.Sprintf("%s: EMPTY\n", &ent), nil
} else if ent.Type == raftpb.EntryConfChange {
Expand Down
65 changes: 52 additions & 13 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,26 +949,65 @@ func (t Transaction) GetObservedTimestamp(nodeID NodeID) (hlc.Timestamp, bool) {
var _ fmt.Stringer = &Lease{}

func (l Lease) String() string {
start := time.Unix(0, l.Start.WallTime).UTC()
expiration := time.Unix(0, l.Expiration.WallTime).UTC()
return fmt.Sprintf("replica %s %s %s", l.Replica, start, expiration.Sub(start))
}

// Covers returns true if the given timestamp can be served by the Lease.
// This is the case if the timestamp precedes the Lease's stasis period.
// Note that the fact that a lease covers a timestamp is not enough for the
// holder of the lease to be able to serve a read with that timestamp;
// pendingLeaderLeaseRequest.TransferInProgress() should also be consulted to
// account for possible lease transfers.
func (l Lease) Covers(timestamp hlc.Timestamp) bool {
return timestamp.Less(l.StartStasis)
var proposedSuffix string
if l.ProposedTS != nil {
proposedSuffix = fmt.Sprintf(" pro=%s", l.ProposedTS)
}
if l.Type() == LeaseExpiration {
return fmt.Sprintf("repl=%s start=%s exp=%s%s", l.Replica, l.Start, l.Expiration, proposedSuffix)
}
return fmt.Sprintf("repl=%s start=%s epo=%d%s", l.Replica, l.Start, l.Epoch, proposedSuffix)
}

// OwnedBy returns whether the given store is the lease owner.
func (l Lease) OwnedBy(storeID StoreID) bool {
return l.Replica.StoreID == storeID
}

// LeaseType describes the type of lease.
type LeaseType int

const (
// LeaseExpiration allows range operations while the wall clock is
// within the expiration timestamp.
LeaseExpiration LeaseType = iota
// LeaseEpoch allows range operations while the node liveness epoch
// is equal to the lease epoch.
LeaseEpoch
)

// Type returns the lease type.
func (l Lease) Type() LeaseType {
if l.Epoch == nil {
return LeaseExpiration
}
return LeaseEpoch
}

// Equivalent determines whether ol is considered the same lease
// for the purposes of matching leases when executing a command.
// For expiration-based leases, extensions are allowed.
// Ignore proposed timestamps for lease verification; for epoch-
// based leases, the start time of the lease is sufficient to
// avoid using an older lease with same epoch.
func (l Lease) Equivalent(ol Lease) error {
l.ProposedTS, ol.ProposedTS = nil, nil
// If both leases are epoch-based, we must dereference the epochs
// and then set to nil.
if l.Type() == LeaseEpoch && ol.Type() == LeaseEpoch && *l.Epoch == *ol.Epoch {
l.Epoch, ol.Epoch = nil, nil
}
// For expiration-based leases, extensions are considered equivalent.
if l.Type() == LeaseExpiration && ol.Type() == LeaseExpiration &&
l.Expiration.Less(ol.Expiration) {
l.Expiration = ol.Expiration
}
if l == ol {
return nil
}
return errors.Errorf("leases %+v and %+v are not equivalent", l, ol)
}

// AsIntents takes a slice of spans and returns it as a slice of intents for
// the given transaction.
func AsIntents(spans []Span, txn *Transaction) []Intent {
Expand Down
292 changes: 156 additions & 136 deletions pkg/roachpb/data.pb.go

Large diffs are not rendered by default.

31 changes: 8 additions & 23 deletions pkg/roachpb/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,7 @@ message Intent {
}

// Lease contains information about range leases including the
// expiration and lease holder. It defines the two intervals
// [start, start_stasis) and [start_stasis, expiration). The
// former encompasses those timestamps for which the lease is
// active, while the latter is a cooldown period which avoids
// inconsistencies during lease holder changes as explained below.
// expiration and lease holder.
message Lease {
option (gogoproto.goproto_stringer) = false;
option (gogoproto.populate) = true;
Expand All @@ -306,24 +302,6 @@ message Lease {
// must be greater than the last lease expiration or the lease request
// is considered invalid.
optional util.hlc.Timestamp start = 1 [(gogoproto.nullable) = false];
// Before the lease expires, it enters a "stasis period" the length of which
// is usually determined by the lease holder's maximum allowed clock offset.
// During this stasis period, the lease must not be used (but can be extended
// by the owner instead). This prevents a failure of linearizability on a
// single register during lease changes. Without that stasis period, the
// following could occur:
// * a range lease gets committed on the new lease holder (but not the old).
// * client proposes and commits a write on new lease holder (with a timestamp
// just greater than the expiration of the old lease).
// * client tries to read what it wrote, but hits a slow coordinator
// (which assigns a timestamp covered by the old lease).
// * the read is served by the old lease holder (which has not processed the
// change in lease holdership).
// * the client fails to read their own write.
//
// Instead, the old lease holder must refuse to serve the client's command on the
// basis that its timestamp falls within the stasis period.
optional util.hlc.Timestamp start_stasis = 4 [(gogoproto.nullable) = false];

// The expiration is a timestamp at which the lease expires. This means that
// a new lease can be granted for a later timestamp.
Expand All @@ -332,6 +310,9 @@ message Lease {
// The address of the would-be lease holder.
optional ReplicaDescriptor replica = 3 [(gogoproto.nullable) = false];

// The start of the lease stasis period. This field deprecated.
optional util.hlc.Timestamp deprecated_start_stasis = 4 [(gogoproto.nullable) = false];

// The current timestamp when this lease has been proposed. Used after a
// transfer and after a node restart to enforce that a node only uses leases
// proposed after the time of the said transfer or restart. This is nullable
Expand All @@ -340,6 +321,10 @@ message Lease {
// TODO(andrei): Make this non-nullable after the rollout.
optional util.hlc.Timestamp proposed_ts = 5 [(gogoproto.nullable) = true,
(gogoproto.customname) = "ProposedTS"];

// The epoch of the lease holder's node liveness entry. If this value
// is non-zero, the start and expiration values are ignored.
optional int64 epoch = 6 [(gogoproto.nullable) = true];
}

// AbortCacheEntry contains information about a transaction which has
Expand Down
91 changes: 44 additions & 47 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/gogo/protobuf/proto"
)

func makeTS(walltime int64, logical int32) hlc.Timestamp {
Expand Down Expand Up @@ -578,6 +579,49 @@ func TestMakePriorityLimits(t *testing.T) {
}
}

func TestLeaseEquivalence(t *testing.T) {
r1 := ReplicaDescriptor{NodeID: 1, StoreID: 1, ReplicaID: 1}
r2 := ReplicaDescriptor{NodeID: 2, StoreID: 2, ReplicaID: 2}
ts1 := makeTS(1, 1)
ts2 := makeTS(2, 1)
ts3 := makeTS(3, 1)

epoch1 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1)}
epoch2 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(2)}
expire1 := Lease{Replica: r1, Start: ts1, Expiration: ts2}
expire2 := Lease{Replica: r1, Start: ts1, Expiration: ts3}
epoch2TS2 := Lease{Replica: r2, Start: ts2, Epoch: proto.Int64(2)}
expire2TS2 := Lease{Replica: r2, Start: ts2, Expiration: ts3}

proposed1 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1), ProposedTS: &ts1}
proposed2 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(2), ProposedTS: &ts1}
proposed3 := Lease{Replica: r1, Start: ts1, Epoch: proto.Int64(1), ProposedTS: &ts2}

testCases := []struct {
l, ol Lease
expSuccess bool
}{
{epoch1, epoch1, true}, // same epoch lease
{expire1, expire1, true}, // same expiration lease
{epoch1, epoch2, false}, // different epoch leases
{epoch1, epoch2TS2, false}, // different epoch leases
{expire1, expire2TS2, false}, // different expiration leases
{expire1, expire2, true}, // same expiration lease, extended
{expire2, expire1, false}, // same expiration lease, extended but backwards
{epoch1, expire1, false}, // epoch and expiration leases
{expire1, epoch1, false}, // expiration and epoch leases
{proposed1, proposed1, true}, // exact leases with identical timestamps
{proposed1, proposed2, false}, // same proposed timestamps, but diff epochs
{proposed1, proposed3, true}, // different proposed timestamps, same lease
}

for i, tc := range testCases {
if err := tc.l.Equivalent(tc.ol); tc.expSuccess != (err == nil) {
t.Errorf("%d: expected success? %t; got %s", i, tc.expSuccess, err)
}
}
}

func TestSpanOverlaps(t *testing.T) {
sA := Span{Key: []byte("a")}
sD := Span{Key: []byte("d")}
Expand Down Expand Up @@ -722,53 +766,6 @@ func TestRSpanIntersect(t *testing.T) {
}
}

func TestLeaseCovers(t *testing.T) {
mk := func(ds ...int64) (sl []hlc.Timestamp) {
for _, d := range ds {
sl = append(sl, hlc.ZeroTimestamp.Add(d, 0))
}
return sl
}

ts10 := mk(10)[0]
ts1K := mk(1000)[0]

for i, test := range []struct {
lease Lease
in, out []hlc.Timestamp
}{
{
lease: Lease{
StartStasis: mk(1)[0],
Expiration: ts1K,
},
in: mk(0),
out: mk(1, 100, 500, 999, 1000),
},
{
lease: Lease{
Start: ts10,
StartStasis: mk(500)[0],
Expiration: ts1K,
},
out: mk(500, 999, 1000, 1001, 2000),
// Note that the lease covers timestamps before its start timestamp.
in: mk(0, 9, 10, 300, 499),
},
} {
for _, ts := range test.in {
if !test.lease.Covers(ts) {
t.Errorf("%d: should contain %s", i, ts)
}
}
for _, ts := range test.out {
if test.lease.Covers(ts) {
t.Errorf("%d: must not contain %s", i, ts)
}
}
}
}

func BenchmarkValueSetBytes(b *testing.B) {
v := Value{}
bytes := make([]byte, 16)
Expand Down
7 changes: 6 additions & 1 deletion pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ func (e *NotLeaseHolderError) Error() string {
}

func (e *NotLeaseHolderError) message(_ *Error) string {
return fmt.Sprintf("range %d: replica %s not lease holder; %s is", e.RangeID, e.Replica, e.LeaseHolder)
if e.LeaseHolder == nil {
return fmt.Sprintf("range %d: replica %s not lease holder; lease holder unknown", e.RangeID, e.Replica)
} else if e.Lease != nil {
return fmt.Sprintf("range %d: replica %s not lease holder; current lease is %s", e.RangeID, e.Replica, e.Lease)
}
return fmt.Sprintf("range %d: replica %s not lease holder; replica %s is", e.RangeID, e.Replica, *e.LeaseHolder)
}

var _ ErrorDetailInterface = &NotLeaseHolderError{}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ func (s *Server) doDrain(modes []serverpb.DrainMode, setTo bool) ([]serverpb.Dra
case mode == serverpb.DrainMode_CLIENT:
err = s.pgServer.SetDraining(setTo)
case mode == serverpb.DrainMode_LEASES:
s.nodeLiveness.PauseHeartbeat(setTo)
err = s.node.SetDraining(setTo)
default:
err = errors.Errorf("unknown drain mode: %v (%d)", mode, mode)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
reflect.TypeOf(&roachpb.Lease{}): {
populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedLease(r, false) },
emptySum: 10006158318270644799,
populatedSum: 17421216026521129287,
populatedSum: 1304511461063751549,
},
reflect.TypeOf(&roachpb.RaftTruncatedState{}): {
populatedConstructor: func(r *rand.Rand) proto.Message { return roachpb.NewPopulatedRaftTruncatedState(r, false) },
Expand Down
15 changes: 8 additions & 7 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func TestRefreshPendingCommands(t *testing.T) {
{DisableRefreshReasonNewLeader: true, DisableRefreshReasonSnapshotApplied: true},
}
for _, c := range testCases {
func() {
t.Run("", func(t *testing.T) {
sc := storage.TestStoreConfig(nil)
sc.TestingKnobs = c
// Disable periodic gossip tasks which can move the range 1 lease
Expand Down Expand Up @@ -968,14 +968,15 @@ func TestRefreshPendingCommands(t *testing.T) {
mtc.stopStore(0)
mtc.restartStore(0)

// Expire existing leases (i.e. move the clock forward). This allows node
// 3 to grab the lease later in the test.
mtc.expireLeases()
// Expire existing leases (i.e. move the clock forward, but don't
// increment epochs). This allows node 3 to grab the lease later
// in the test.
mtc.expireLeasesWithoutIncrementingEpochs()
// Drain leases from nodes 0 and 1 to prevent them from grabbing any new
// leases.
for i := 0; i < 2; i++ {
if err := mtc.stores[i].DrainLeases(true); err != nil {
t.Fatal(err)
t.Fatalf("store %d: %v", i, err)
}
}

Expand All @@ -993,7 +994,7 @@ func TestRefreshPendingCommands(t *testing.T) {
}

mtc.waitForValues(roachpb.Key("a"), []int64{15, 15, 15})
}()
})
}
}

Expand Down Expand Up @@ -3013,7 +3014,7 @@ func TestRangeQuiescence(t *testing.T) {
defer mtc.Stop()
mtc.Start(t, 3)

stopNodeLivenessHeartbeats(mtc)
pauseNodeLivenessHeartbeats(mtc, true)

// Replica range 1 to all 3 nodes.
mtc.replicateRange(1, 1, 2)
Expand Down
Loading

0 comments on commit 31ad27d

Please sign in to comment.