Skip to content

Commit

Permalink
kv: check timestamp cache updates against active lease
Browse files Browse the repository at this point in the history
This commit improves an existing (race-only) assertion to not only check
timestamp cache updates against the current HLC clock, but to also do so
against the lease that the request performing the update is evaluating
under. This ensures that timestamp cache updates are not lost during
non-cooperative lease change.
  • Loading branch information
nvanbenschoten committed Feb 12, 2021
1 parent 23263a3 commit 218a5a3
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 61 deletions.
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/kvserverpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ go_library(
embed = [":kvserverpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb",
visibility = ["//visibility:public"],
deps = ["//pkg/roachpb"],
deps = [
"//pkg/roachpb",
"//pkg/util/hlc",
],
)

proto_library(
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/kvserverpb/lease_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

package kvserverpb

import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// IsValid returns whether the lease was valid at the time that the
// lease status was computed.
Expand All @@ -22,3 +25,15 @@ func (st LeaseStatus) IsValid() bool {
func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool {
return st.Lease.OwnedBy(storeID)
}

// Expiration returns the expiration of the lease.
func (st LeaseStatus) Expiration() hlc.Timestamp {
switch st.Lease.Type() {
case roachpb.LeaseExpiration:
return st.Lease.GetExpiration()
case roachpb.LeaseEpoch:
return st.Liveness.Expiration.ToTimestamp()
default:
panic("unexpected")
}
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *Replica) evalAndPropose(
ctx context.Context,
ba *roachpb.BatchRequest,
g *concurrency.Guard,
lease *roachpb.Lease,
st kvserverpb.LeaseStatus,
lul hlc.Timestamp,
) (chan proposalResult, func(), int64, *roachpb.Error) {
idKey := makeIDKey()
Expand All @@ -81,13 +81,13 @@ func (r *Replica) evalAndPropose(
// If the request hit a server-side concurrency retry error, immediately
// proagate the error. Don't assume ownership of the concurrency guard.
if isConcurrencyRetryError(pErr) {
pErr = maybeAttachLease(pErr, lease)
pErr = maybeAttachLease(pErr, &st.Lease)
return nil, nil, 0, pErr
}

// Attach the endCmds to the proposal and assume responsibility for
// releasing the concurrency guard if the proposal makes it to Raft.
proposal.ec = endCmds{repl: r, g: g}
proposal.ec = endCmds{repl: r, g: g, st: st}

// Pull out proposal channel to return. proposal.doneCh may be set to
// nil if it is signaled in this function.
Expand Down Expand Up @@ -143,7 +143,7 @@ func (r *Replica) evalAndPropose(
}

// Attach information about the proposer to the command.
proposal.command.ProposerLeaseSequence = lease.Sequence
proposal.command.ProposerLeaseSequence = st.Lease.Sequence

// Once a command is written to the raft log, it must be loaded into memory
// and replayed on all replicas. If a command is too big, stop it here. If
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *Replica) executeReadOnlyBatch(
}

// Otherwise, update the timestamp cache and release the concurrency guard.
ec, g := endCmds{repl: r, g: g}, nil
ec, g := endCmds{repl: r, g: g, st: st}, nil
ec.done(ctx, ba, br, pErr)

// Semi-synchronously process any intents that need resolving here in
Expand Down
12 changes: 8 additions & 4 deletions pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func (r *Replica) collectSpans(
type endCmds struct {
repl *Replica
g *concurrency.Guard
st kvserverpb.LeaseStatus
}

// move moves the endCmds into the return value, clearing and making a call to
Expand All @@ -726,10 +727,13 @@ func (ec *endCmds) done(
}
defer ec.move() // clear

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache
// are processed.
ec.repl.updateTimestampCache(ctx, ba, br, pErr)
// Update the timestamp cache. Each request within the batch is considered
// in turn; only those marked as affecting the cache are processed. However,
// only do so if the request is consistent and was operating on the
// leaseholder under a valid range lease.
if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID {
ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr)
}

// Release the latches acquired by the request and exit lock wait-queues.
// Must be done AFTER the timestamp cache is updated. ec.g is only set when
Expand Down
42 changes: 23 additions & 19 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,11 +612,12 @@ func TestReplicaContains(t *testing.T) {
}

func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
ctx := context.Background()
ba := roachpb.BatchRequest{}
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
exLease, _ := r.GetLease()
ch, _, _, pErr := r.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := r.CurrentLeaseStatus(ctx)
ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
Expand Down Expand Up @@ -1393,6 +1394,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())

Expand All @@ -1412,11 +1414,11 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
StoreID: 2,
},
}
exLease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
ba := roachpb.BatchRequest{}
ba.Timestamp = tc.repl.store.Clock().Now()
ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr == nil {
// Next if the command was committed, wait for the range to apply it.
// TODO(bdarnell): refactor to a more conventional error-handling pattern.
Expand Down Expand Up @@ -7865,7 +7867,6 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
repl := tc.repl

tc.repl.mu.Lock()
lease := *repl.mu.state.Lease
abandoned := make(map[int64]struct{}) // protected by repl.mu
tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok {
Expand All @@ -7886,7 +7887,8 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
st := repl.CurrentLeaseStatus(ctx)
ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -7942,7 +7944,6 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
}
return false, nil
}
lease := *tc.repl.mu.state.Lease
tc.repl.mu.Unlock()

const num = 10
Expand All @@ -7956,7 +7957,8 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
Key: roachpb.Key(fmt.Sprintf("k%d", i)),
},
})
ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
st := tc.repl.CurrentLeaseStatus(ctx)
ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -9287,7 +9289,8 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
}

exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand All @@ -9306,6 +9309,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
func TestProposeWithAsyncConsensus(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
tc := testContext{}
tsc := TestStoreConfig(nil)

Expand All @@ -9320,7 +9324,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
}

stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, tsc)
repl := tc.repl

Expand All @@ -9332,8 +9336,8 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
ba.AsyncConsensus = true

atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := tc.repl.CurrentLeaseStatus(ctx)
ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -9396,8 +9400,8 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
ba.Timestamp = tc.Clock().Now()

atomic.StoreInt32(&filterActive, 1)
exLease, _ := repl.GetLease()
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
st := repl.CurrentLeaseStatus(ctx)
_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand All @@ -9415,7 +9419,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
ba2.Timestamp = tc.Clock().Now()

var pErr *roachpb.Error
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &exLease, hlc.Timestamp{})
ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -12507,7 +12511,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
}
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
txnID = txn.ID
ba := roachpb.BatchRequest{
Expand All @@ -12529,7 +12533,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
tc.repl.RaftLock()
sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
tracedCtx := tracing.ContextWithSpan(ctx, sp)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand Down Expand Up @@ -12599,7 +12603,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
cfg.AmbientCtx.Tracer = tracer
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
st := tc.repl.CurrentLeaseStatus(ctx)
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
ba := roachpb.BatchRequest{
Header: roachpb.Header{
Expand All @@ -12621,7 +12625,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
// Go out of our way to enable recording so that expensive logging is enabled
// for this context.
sp.SetVerbose(true)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
if pErr != nil {
t.Fatal(pErr)
}
Expand Down
73 changes: 44 additions & 29 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ package kvserver

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand All @@ -38,19 +37,54 @@ func setTimestampCacheLowWaterMark(
}
}

// addToTSCacheChecked adds the specified timestamp to the timestamp cache
// covering the range of keys from start to end. Before doing so, the function
// performs a few assertions to check for proper use of the timestamp cache.
func (r *Replica) addToTSCacheChecked(
ctx context.Context,
st *kvserverpb.LeaseStatus,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
start, end roachpb.Key,
ts hlc.Timestamp,
txnID uuid.UUID,
) {
// All updates to the timestamp cache must be performed below the expiration
// time of the leaseholder. This ensures correctness if the lease expires
// and is acquired by a new replica that begins serving writes immediately
// to the same keys at the next lease's start time.
if exp := st.Expiration(); exp.LessEq(ts) {
log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with lease expiration %v. The timestamp "+
"cache update could be lost of a non-cooperative lease change.", ts, ba, br, pErr, exp)
}
// All updates the to timestamp cache with non-synthetic timestamps must be
// performed at or below the current time. This is no longer strictly
// required for correctness as lease transfers now read the timestamp cache
// directly instead of using the local HLC clock as a proxy for its high
// water-mark, but it serves as a good proxy for proper handling of HLC
// clock updates and, by extension, observed timestamps.
if !ts.Synthetic && st.Now.ToTimestamp().Less(ts) {
log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
"Non-synthetic timestamps should always lag the local hlc clock.", ts, ba, br, pErr, st.Now)
}
r.store.tsCache.Add(start, end, ts, txnID)
}

// updateTimestampCache updates the timestamp cache in order to set a low water
// mark for the timestamp at which mutations to keys overlapping the provided
// request can write, such that they don't re-write history.
func (r *Replica) updateTimestampCache(
ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
ctx context.Context,
st *kvserverpb.LeaseStatus,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) {
if ba.ReadConsistency != roachpb.CONSISTENT {
// Inconsistent reads are excluded from the timestamp cache.
return
}
addToTSCache := r.store.tsCache.Add
if util.RaceEnabled {
addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr)
addToTSCache := func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
r.addToTSCacheChecked(ctx, st, ba, br, pErr, start, end, ts, txnID)
}
// Update the timestamp cache using the timestamp at which the batch
// was executed. Note this may have moved forward from ba.Timestamp,
Expand Down Expand Up @@ -213,25 +247,6 @@ func (r *Replica) updateTimestampCache(
}
}

// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the
// cache is at or below the specified time.
func checkedTSCacheUpdate(
now hlc.Timestamp,
tc tscache.Cache,
ba *roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID) {
return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
if now.Less(ts) && !ts.Synthetic {
panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
"The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now))
}
tc.Add(start, end, ts, txnID)
}
}

// txnsPushedDueToClosedTimestamp is a telemetry counter for the number of
// batch requests which have been pushed due to the closed timestamp.
var batchesPushedDueToClosedTimestamp telemetry.Counter
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ func (r *Replica) executeWriteBatch(
// Checking the context just before proposing can help avoid ambiguous errors.
if err := ctx.Err(); err != nil {
log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing"))
}

// If the command is proposed to Raft, ownership of and responsibility for
// the concurrency guard will be assumed by Raft, so provide the guard to
// evalAndPropose.
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease, localUncertaintyLimit)
ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit)
if pErr != nil {
if maxLeaseIndex != 0 {
log.Fatalf(
Expand Down

0 comments on commit 218a5a3

Please sign in to comment.