kv: check timestamp cache updates against active lease
This commit improves an existing (race-only) assertion so that timestamp
cache updates are checked not only against the current HLC clock but also
against the lease that the request performing the update evaluated under.
This ensures that timestamp cache updates are not lost across a
non-cooperative lease change.
nvanbenschoten authored and andreimatei committed Feb 12, 2021
1 parent 14d5eef commit 616ce3c
Showing 8 changed files with 102 additions and 61 deletions.
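For illustration, the invariant this assertion enforces can be stated in isolation: every timestamp cache update must sit below the expiration of the lease the request evaluated under, and a non-synthetic update must not lead the local clock. The following is a simplified, self-contained sketch of that check, not the CockroachDB implementation; checkTSCacheUpdate, leaseExpiration, and localClock are illustrative stand-ins for the real hlc/tscache/LeaseStatus machinery.

```go
package main

import (
	"fmt"
	"time"
)

// checkTSCacheUpdate is a simplified stand-in for the assertion added in
// replica_tscache.go below. It enforces two bounds on a timestamp cache update:
//
//  1. The update must be below the expiration of the lease the request
//     evaluated under, so it cannot be lost across a non-cooperative lease
//     change in which a new leaseholder begins serving writes at the next
//     lease's start time.
//  2. A non-synthetic update must not lead the local clock, as a proxy for
//     correct handling of HLC clock updates.
func checkTSCacheUpdate(ts, leaseExpiration, localClock time.Time, synthetic bool) error {
	if !ts.Before(leaseExpiration) {
		return fmt.Errorf("unsafe update: %v is not below lease expiration %v", ts, leaseExpiration)
	}
	if !synthetic && ts.After(localClock) {
		return fmt.Errorf("unsafe update: non-synthetic %v leads local clock %v", ts, localClock)
	}
	return nil
}

func main() {
	now := time.Now()
	exp := now.Add(5 * time.Second)
	fmt.Println(checkTSCacheUpdate(now, exp, now, false))                  // <nil>: safe
	fmt.Println(checkTSCacheUpdate(exp.Add(time.Second), exp, now, false)) // unsafe: beyond lease expiration
}
```

In the actual change below, the same two bounds appear in Replica.addToTSCacheChecked, with the lease expiration coming from the new LeaseStatus.Expiration helper.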
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/kvserverpb/BUILD.bazel
@@ -12,7 +12,10 @@ go_library(
embed = [":kvserverpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb",
visibility = ["//visibility:public"],
deps = ["//pkg/roachpb"],
deps = [
"//pkg/roachpb",
"//pkg/util/hlc",
],
)

proto_library(
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/kvserverpb/lease_status.go
@@ -10,7 +10,10 @@

package kvserverpb

import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// IsValid returns whether the lease was valid at the time that the
// lease status was computed.
@@ -22,3 +25,15 @@ func (st LeaseStatus) IsValid() bool {
func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool {
return st.Lease.OwnedBy(storeID)
}

// Expiration returns the expiration of the lease.
func (st LeaseStatus) Expiration() hlc.Timestamp {
switch st.Lease.Type() {
case roachpb.LeaseExpiration:
return st.Lease.GetExpiration()
case roachpb.LeaseEpoch:
return st.Liveness.Expiration.ToTimestamp()
default:
panic("unexpected")
}
}
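A note on the helper above: an expiration-based lease carries its own expiration timestamp, while an epoch-based lease is only as good as the leaseholder's node-liveness record, so its effective expiration is the liveness record's expiration. A minimal sketch of the same dispatch, with illustrative types standing in for the real roachpb/kvserverpb API:

```go
package main

import (
	"fmt"
	"time"
)

type leaseType int

const (
	expirationLease leaseType = iota // lease carries its own expiration timestamp
	epochLease                       // lease is valid while the holder's liveness epoch is live
)

// leaseStatus is an illustrative stand-in for kvserverpb.LeaseStatus.
type leaseStatus struct {
	typ                leaseType
	leaseExpiration    time.Time // used for expiration-based leases
	livenessExpiration time.Time // used for epoch-based leases
}

// expiration mirrors the shape of LeaseStatus.Expiration: the time past which
// the lease, as captured in this status, is no longer known to be held.
func (st leaseStatus) expiration() time.Time {
	switch st.typ {
	case expirationLease:
		return st.leaseExpiration
	case epochLease:
		return st.livenessExpiration
	default:
		panic("unexpected lease type")
	}
}

func main() {
	st := leaseStatus{typ: epochLease, livenessExpiration: time.Now().Add(9 * time.Second)}
	fmt.Println(st.expiration())
}
```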
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
@@ -71,7 +71,7 @@ func (r *Replica) evalAndPropose(
ctx context.Context,
ba *roachpb.BatchRequest,
g *concurrency.Guard,
lease *roachpb.Lease,
st kvserverpb.LeaseStatus,
lul hlc.Timestamp,
) (chan proposalResult, func(), int64, *roachpb.Error) {
idKey := makeIDKey()
@@ -81,13 +81,13 @@ func (r *Replica) evalAndPropose(
// If the request hit a server-side concurrency retry error, immediately
// propagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
pErr = maybeAttachLease(pErr, lease)
pErr = maybeAttachLease(pErr, &st.Lease)
return nil, nil, 0, pErr
}

// Attach the endCmds to the proposal and assume responsibility for
// releasing the concurrency guard if the proposal makes it to Raft.
proposal.ec = endCmds{repl: r, g: g}
proposal.ec = endCmds{repl: r, g: g, st: st}

// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
@@ -143,7 +143,7 @@ func (r *Replica) evalAndPropose(
}

// Attach information about the proposer to the command.
proposal.command.ProposerLeaseSequence = lease.Sequence
proposal.command.ProposerLeaseSequence = st.Lease.Sequence

// Once a command is written to the raft log, it must be loaded into memory
// and replayed on all replicas. If a command is too big, stop it here. If
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
@@ -92,7 +92,7 @@ func (r *Replica) executeReadOnlyBatch(
}

// Otherwise, update the timestamp cache and release the concurrency guard.
ec, g := endCmds{repl: r, g: g}, nil
ec, g := endCmds{repl: r, g: g, st: st}, nil
ec.done(ctx, ba, br, pErr)

// Semi-synchronously process any intents that need resolving here in
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replica_send.go
@@ -702,6 +702,7 @@ func (r *Replica) collectSpans(
type endCmds struct {
repl *Replica
g *concurrency.Guard
st kvserverpb.LeaseStatus
}

// move moves the endCmds into the return value, clearing and making a call to
@@ -726,10 +727,13 @@ func (ec *endCmds) done(
}
defer ec.move() // clear

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache
// are processed.
ec.repl.updateTimestampCache(ctx, ba, br, pErr)
// Update the timestamp cache. Each request within the batch is considered
// in turn; only those marked as affecting the cache are processed. However,
// only do so if the request is consistent and was operating on the
// leaseholder under a valid range lease.
if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID {
ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr)
}

// Release the latches acquired by the request and exit lock wait-queues.
// Must be done AFTER the timestamp cache is updated. ec.g is only set when
42 changes: 23 additions & 19 deletions pkg/kv/kvserver/replica_test.go
@@ -612,11 +612,12 @@ func TestReplicaContains(t *testing.T) {
}

func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
ctx := context.Background()
ba := roachpb.BatchRequest{}
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
exLease, _ := r.GetLease()
ch, _, _, pErr := r.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := r.CurrentLeaseStatus(ctx)
ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
@@ -1393,6 +1394,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

@@ -1412,11 +1414,11 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
StoreID: 2,
},
}
exLease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
ba := roachpb.BatchRequest{}
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor to a more conventional error-handling pattern.
@@ -7865,7 +7867,6 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
repl := tc.repl

tc.repl.mu.Lock()
lease := *repl.mu.state.Lease
abandoned := make(map[int64]struct{}) // protected by repl.mu
tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok {
@@ -7886,7 +7887,8 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
st := repl.CurrentLeaseStatus(ctx)
ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if err != nil {
t.Fatal(err)
}
@@ -7942,7 +7944,6 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
}
return false, nil
}
lease := *tc.repl.mu.state.Lease
tc.repl.mu.Unlock()

const num = 10
@@ -7956,7 +7957,8 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
st := tc.repl.CurrentLeaseStatus(ctx)
ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if err != nil {
t.Fatal(err)
}
@@ -9287,7 +9289,8 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
}

exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
@@ -9306,6 +9309,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
func TestProposeWithAsyncConsensus(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tc := testContext{}
tsc := TestStoreConfig(nil)

@@ -9320,7 +9324,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
}

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, tsc)
repl := tc.repl

@@ -9332,8 +9336,8 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
ba.AsyncConsensus = true

atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := tc.repl.CurrentLeaseStatus(ctx)
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
@@ -9396,8 +9400,8 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
ba.Timestamp = tc.Clock().Now()

atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := repl.CurrentLeaseStatus(ctx)
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
@@ -9415,7 +9419,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
ba2.Timestamp = tc.Clock().Now()

var pErr *roachpb.Error
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &exLease, hlc.Timestamp{})
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
@@ -12507,7 +12511,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
}
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
txnID = txn.ID
ba := roachpb.BatchRequest{
@@ -12529,7 +12533,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
tc.repl.RaftLock()
sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
tracedCtx := tracing.ContextWithSpan(ctx, sp)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
@@ -12599,7 +12603,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
cfg.AmbientCtx.Tracer = tracer
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
ba := roachpb.BatchRequest{
Header: roachpb.Header{
@@ -12621,7 +12625,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
// Go out of our way to enable recording so that expensive logging is enabled
// for this context.
sp.SetVerbose(true)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
73 changes: 44 additions & 29 deletions pkg/kv/kvserver/replica_tscache.go
@@ -12,14 +12,13 @@ package kvserver

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -38,19 +37,54 @@ func setTimestampCacheLowWaterMark(
}
}

// addToTSCacheChecked adds the specified timestamp to the timestamp cache
// covering the range of keys from start to end. Before doing so, the function
// performs a few assertions to check for proper use of the timestamp cache.
func (r *Replica) addToTSCacheChecked(
ctx context.Context,
st *kvserverpb.LeaseStatus,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
start, end roachpb.Key,
ts hlc.Timestamp,
txnID uuid.UUID,
) {
// All updates to the timestamp cache must be performed below the expiration
// time of the leaseholder. This ensures correctness if the lease expires
// and is acquired by a new replica that begins serving writes immediately
// to the same keys at the next lease's start time.
if exp := st.Expiration(); exp.LessEq(ts) {
log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with lease expiration %v. The timestamp "+
"cache update could be lost of a non-cooperative lease change.", ts, ba, br, pErr, exp)
}
// All updates to the timestamp cache with non-synthetic timestamps must be
// performed at or below the current time. This is no longer strictly
// required for correctness as lease transfers now read the timestamp cache
// directly instead of using the local HLC clock as a proxy for its high
// water-mark, but it serves as a good proxy for proper handling of HLC
// clock updates and, by extension, observed timestamps.
if !ts.Synthetic && st.Now.ToTimestamp().Less(ts) {
log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
"Non-synthetic timestamps should always lag the local hlc clock.", ts, ba, br, pErr, st.Now)
}
r.store.tsCache.Add(start, end, ts, txnID)
}

// updateTimestampCache updates the timestamp cache in order to set a low water
// mark for the timestamp at which mutations to keys overlapping the provided
// request can write, such that they don't re-write history.
func (r *Replica) updateTimestampCache(
ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
ctx context.Context,
st *kvserverpb.LeaseStatus,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) {
if ba.ReadConsistency != roachpb.CONSISTENT {
// Inconsistent reads are excluded from the timestamp cache.
return
}
addToTSCache := r.store.tsCache.Add
if util.RaceEnabled {
addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr)
addToTSCache := func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
r.addToTSCacheChecked(ctx, st, ba, br, pErr, start, end, ts, txnID)
}
// Update the timestamp cache using the timestamp at which the batch
// was executed. Note this may have moved forward from ba.Timestamp,
@@ -213,25 +247,6 @@ func (r *Replica) updateTimestampCache(
}
}

// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the
// cache is at or below the specified time.
func checkedTSCacheUpdate(
now hlc.Timestamp,
tc tscache.Cache,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID) {
return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
if now.Less(ts) && !ts.Synthetic {
panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
"The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now))
}
tc.Add(start, end, ts, txnID)
}
}

// txnsPushedDueToClosedTimestamp is a telemetry counter for the number of
// batch requests which have been pushed due to the closed timestamp.
var batchesPushedDueToClosedTimestamp telemetry.Counter
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
@@ -114,13 +114,13 @@ func (r *Replica) executeWriteBatch(
// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing"))
}

// If the command is proposed to Raft, ownership of and responsibility for
// the concurrency guard will be assumed by Raft, so provide the guard to
// evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease, localUncertaintyLimit)
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit)
if pErr != nil {
if maxLeaseIndex != 0 {
log.Fatalf(
