From 218a5a38dd1d3d6e33e17258391da8e94061cf96 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 19 Jan 2021 14:52:47 -0500
Subject: [PATCH] kv: check timestamp cache updates against active lease

This commit improves an existing (race-only) assertion so that it checks
timestamp cache updates not only against the current HLC clock, but also
against the lease that the request performing the update evaluated under.
This ensures that timestamp cache updates are not lost during a
non-cooperative lease change.
---
 pkg/kv/kvserver/kvserverpb/BUILD.bazel     |  5 +-
 pkg/kv/kvserver/kvserverpb/lease_status.go | 17 ++++-
 pkg/kv/kvserver/replica_raft.go            |  8 +--
 pkg/kv/kvserver/replica_read.go            |  2 +-
 pkg/kv/kvserver/replica_send.go            | 12 ++--
 pkg/kv/kvserver/replica_test.go            | 42 +++++++------
 pkg/kv/kvserver/replica_tscache.go         | 73 +++++++++++++---------
 pkg/kv/kvserver/replica_write.go           |  4 +-
 8 files changed, 102 insertions(+), 61 deletions(-)

diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel
index b2871d40b64a..b767e4bfd4cb 100644
--- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel
+++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel
@@ -12,7 +12,10 @@ go_library(
     embed = [":kvserverpb_go_proto"],
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb",
     visibility = ["//visibility:public"],
-    deps = ["//pkg/roachpb"],
+    deps = [
+        "//pkg/roachpb",
+        "//pkg/util/hlc",
+    ],
 )
 
 proto_library(
diff --git a/pkg/kv/kvserver/kvserverpb/lease_status.go b/pkg/kv/kvserver/kvserverpb/lease_status.go
index 1bbac36973ec..5734d5c4e386 100644
--- a/pkg/kv/kvserver/kvserverpb/lease_status.go
+++ b/pkg/kv/kvserver/kvserverpb/lease_status.go
@@ -10,7 +10,10 @@
 
 package kvserverpb
 
-import roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
+import (
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+)
 
 // IsValid returns whether the lease was valid at the time that the
 // lease status was computed.
@@ -22,3 +25,15 @@ func (st LeaseStatus) IsValid() bool {
 func (st LeaseStatus) OwnedBy(storeID roachpb.StoreID) bool {
 	return st.Lease.OwnedBy(storeID)
 }
+
+// Expiration returns the expiration of the lease.
+func (st LeaseStatus) Expiration() hlc.Timestamp {
+	switch st.Lease.Type() {
+	case roachpb.LeaseExpiration:
+		return st.Lease.GetExpiration()
+	case roachpb.LeaseEpoch:
+		return st.Liveness.Expiration.ToTimestamp()
+	default:
+		panic("unexpected")
+	}
+}
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index a0c2f4a46035..09d5d86bb504 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -71,7 +71,7 @@ func (r *Replica) evalAndPropose(
 	ctx context.Context,
 	ba *roachpb.BatchRequest,
 	g *concurrency.Guard,
-	lease *roachpb.Lease,
+	st kvserverpb.LeaseStatus,
 	lul hlc.Timestamp,
 ) (chan proposalResult, func(), int64, *roachpb.Error) {
 	idKey := makeIDKey()
@@ -81,13 +81,13 @@
 	// If the request hit a server-side concurrency retry error, immediately
 	// propagate the error. Don't assume ownership of the concurrency guard.
 	if isConcurrencyRetryError(pErr) {
-		pErr = maybeAttachLease(pErr, lease)
+		pErr = maybeAttachLease(pErr, &st.Lease)
 		return nil, nil, 0, pErr
 	}
 
 	// Attach the endCmds to the proposal and assume responsibility for
 	// releasing the concurrency guard if the proposal makes it to Raft.
-	proposal.ec = endCmds{repl: r, g: g}
+	proposal.ec = endCmds{repl: r, g: g, st: st}
 
 	// Pull out proposal channel to return. proposal.doneCh may be set to
 	// nil if it is signaled in this function.
@@ -143,7 +143,7 @@ func (r *Replica) evalAndPropose(
 	}
 
 	// Attach information about the proposer to the command.
-	proposal.command.ProposerLeaseSequence = lease.Sequence
+	proposal.command.ProposerLeaseSequence = st.Lease.Sequence
 
 	// Once a command is written to the raft log, it must be loaded into memory
 	// and replayed on all replicas. If a command is too big, stop it here. If
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 533c35377136..5f614a4db004 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -92,7 +92,7 @@ func (r *Replica) executeReadOnlyBatch(
 	}
 
 	// Otherwise, update the timestamp cache and release the concurrency guard.
-	ec, g := endCmds{repl: r, g: g}, nil
+	ec, g := endCmds{repl: r, g: g, st: st}, nil
 	ec.done(ctx, ba, br, pErr)
 
 	// Semi-synchronously process any intents that need resolving here in
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 515909ae4b74..2df84c4e9a19 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -702,6 +702,7 @@ func (r *Replica) collectSpans(
 type endCmds struct {
 	repl *Replica
 	g    *concurrency.Guard
+	st   kvserverpb.LeaseStatus
 }
 
 // move moves the endCmds into the return value, clearing and making a call to
@@ -726,10 +727,13 @@ func (ec *endCmds) done(
 	}
 	defer ec.move() // clear
 
-	// Update the timestamp cache if the request is not being re-evaluated. Each
-	// request is considered in turn; only those marked as affecting the cache
-	// are processed.
-	ec.repl.updateTimestampCache(ctx, ba, br, pErr)
+	// Update the timestamp cache. Each request within the batch is considered
+	// in turn; only those marked as affecting the cache are processed. However,
+	// only do so if the request is consistent and was operating on the
+	// leaseholder under a valid range lease.
+	if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID {
+		ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr)
+	}
 
 	// Release the latches acquired by the request and exit lock wait-queues.
 	// Must be done AFTER the timestamp cache is updated. ec.g is only set when
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 1b058c1a47e2..d3c202d1aa3e 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -612,11 +612,12 @@ func TestReplicaContains(t *testing.T) {
 }
 
 func sendLeaseRequest(r *Replica, l *roachpb.Lease) error {
+	ctx := context.Background()
 	ba := roachpb.BatchRequest{}
 	ba.Timestamp = r.store.Clock().Now()
 	ba.Add(&roachpb.RequestLeaseRequest{Lease: *l})
-	exLease, _ := r.GetLease()
-	ch, _, _, pErr := r.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
+	st := r.CurrentLeaseStatus(ctx)
+	ch, _, _, pErr := r.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor this to a more conventional error-handling pattern.
@@ -1393,6 +1394,7 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+	ctx := context.Background()
 
 	stopper := stop.NewStopper()
 	defer stopper.Stop(context.Background())
@@ -1412,11 +1414,11 @@ func TestReplicaLeaseRejectUnknownRaftNodeID(t *testing.T) {
 			StoreID: 2,
 		},
 	}
-	exLease, _ := tc.repl.GetLease()
+	st := tc.repl.CurrentLeaseStatus(ctx)
 	ba := roachpb.BatchRequest{}
 	ba.Timestamp = tc.repl.store.Clock().Now()
 	ba.Add(&roachpb.RequestLeaseRequest{Lease: *lease})
-	ch, _, _, pErr := tc.repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
+	ch, _, _, pErr := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr == nil {
 		// Next if the command was committed, wait for the range to apply it.
 		// TODO(bdarnell): refactor to a more conventional error-handling pattern.
@@ -7865,7 +7867,6 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 	repl := tc.repl
 
 	tc.repl.mu.Lock()
-	lease := *repl.mu.state.Lease
 	abandoned := make(map[int64]struct{}) // protected by repl.mu
 	tc.repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) {
 		if _, ok := abandoned[int64(p.command.MaxLeaseIndex)]; ok {
@@ -7886,7 +7887,8 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) {
 				Key: roachpb.Key(fmt.Sprintf("k%d", i)),
 			},
 		})
-		ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
+		st := repl.CurrentLeaseStatus(ctx)
+		ch, _, idx, err := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -7942,7 +7944,6 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 		}
 		return false, nil
 	}
-	lease := *tc.repl.mu.state.Lease
 	tc.repl.mu.Unlock()
 
 	const num = 10
@@ -7956,7 +7957,8 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
 				Key: roachpb.Key(fmt.Sprintf("k%d", i)),
 			},
 		})
-		ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
+		st := tc.repl.CurrentLeaseStatus(ctx)
+		ch, _, idx, err := tc.repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -9287,7 +9289,8 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
 	}
 
 	exLease, _ := repl.GetLease()
-	ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
+	st := kvserverpb.LeaseStatus{Lease: exLease, State: kvserverpb.LeaseState_VALID}
+	ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9306,6 +9309,7 @@ func TestErrorInRaftApplicationClearsIntents(t *testing.T) {
 func TestProposeWithAsyncConsensus(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
+	ctx := context.Background()
 
 	tc := testContext{}
 	tsc := TestStoreConfig(nil)
@@ -9320,7 +9324,7 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	}
 
 	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
+	defer stopper.Stop(ctx)
 	tc.StartWithStoreConfig(t, stopper, tsc)
 	repl := tc.repl
 
@@ -9332,8 +9336,8 @@ func TestProposeWithAsyncConsensus(t *testing.T) {
 	ba.AsyncConsensus = true
 
 	atomic.StoreInt32(&filterActive, 1)
-	exLease, _ := repl.GetLease()
-	ch, _, _, pErr := repl.evalAndPropose(context.Background(), &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
+	st := tc.repl.CurrentLeaseStatus(ctx)
+	ch, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9396,8 +9400,8 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 	ba.Timestamp = tc.Clock().Now()
 
 	atomic.StoreInt32(&filterActive, 1)
-	exLease, _ := repl.GetLease()
-	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), &exLease, hlc.Timestamp{})
+	st := repl.CurrentLeaseStatus(ctx)
+	_, _, _, pErr := repl.evalAndPropose(ctx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -9415,7 +9419,7 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) {
 	ba2.Timestamp = tc.Clock().Now()
 
 	var pErr *roachpb.Error
-	ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), &exLease, hlc.Timestamp{})
+	ch, _, _, pErr = repl.evalAndPropose(ctx, &ba2, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -12507,7 +12511,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
 	}
 	tc.StartWithStoreConfig(t, stopper, cfg)
 	key := roachpb.Key("a")
-	lease, _ := tc.repl.GetLease()
+	st := tc.repl.CurrentLeaseStatus(ctx)
 	txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
 	txnID = txn.ID
 	ba := roachpb.BatchRequest{
@@ -12529,7 +12533,7 @@ func TestProposalNotAcknowledgedOrReproposedAfterApplication(t *testing.T) {
 	tc.repl.RaftLock()
 	sp := cfg.AmbientCtx.Tracer.StartSpan("replica send", tracing.WithForceRealSpan())
 	tracedCtx := tracing.ContextWithSpan(ctx, sp)
-	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
+	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
@@ -12599,7 +12603,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
 	cfg.AmbientCtx.Tracer = tracer
 	tc.StartWithStoreConfig(t, stopper, cfg)
 	key := roachpb.Key("a")
-	lease, _ := tc.repl.GetLease()
+	st := tc.repl.CurrentLeaseStatus(ctx)
 	txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
 	ba := roachpb.BatchRequest{
 		Header: roachpb.Header{
@@ -12621,7 +12625,7 @@ func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
 	// Go out of our way to enable recording so that expensive logging is enabled
 	// for this context.
 	sp.SetVerbose(true)
-	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), &lease, hlc.Timestamp{})
+	ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, &ba, allSpansGuard(), st, hlc.Timestamp{})
 	if pErr != nil {
 		t.Fatal(pErr)
 	}
diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go
index 2b3827ef4790..56b2dcdc876d 100644
--- a/pkg/kv/kvserver/replica_tscache.go
+++ b/pkg/kv/kvserver/replica_tscache.go
@@ -12,14 +12,13 @@ package kvserver
 
 import (
 	"context"
-	"fmt"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
-	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -38,19 +37,54 @@ func setTimestampCacheLowWaterMark(
 	}
 }
 
+// addToTSCacheChecked adds the specified timestamp to the timestamp cache
+// covering the range of keys from start to end. Before doing so, the function
+// performs a few assertions to check for proper use of the timestamp cache.
+func (r *Replica) addToTSCacheChecked(
+	ctx context.Context,
+	st *kvserverpb.LeaseStatus,
+	ba *roachpb.BatchRequest,
+	br *roachpb.BatchResponse,
+	pErr *roachpb.Error,
+	start, end roachpb.Key,
+	ts hlc.Timestamp,
+	txnID uuid.UUID,
+) {
+	// All updates to the timestamp cache must be performed below the expiration
+	// time of the leaseholder. This ensures correctness if the lease expires
+	// and is acquired by a new replica that begins serving writes immediately
+	// to the same keys at the next lease's start time.
+	if exp := st.Expiration(); exp.LessEq(ts) {
+		log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
+			"cache after evaluating %v (resp=%v; err=%v) with lease expiration %v. The timestamp "+
+			"cache update could be lost on a non-cooperative lease change.", ts, ba, br, pErr, exp)
+	}
+	// All updates to the timestamp cache with non-synthetic timestamps must be
+	// performed at or below the current time. This is no longer strictly
+	// required for correctness as lease transfers now read the timestamp cache
+	// directly instead of using the local HLC clock as a proxy for its high
+	// water mark, but it serves as a good proxy for proper handling of HLC
+	// clock updates and, by extension, observed timestamps.
+	if !ts.Synthetic && st.Now.ToTimestamp().Less(ts) {
+		log.Fatalf(ctx, "Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
+			"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
+			"Non-synthetic timestamps should always lag the local hlc clock.", ts, ba, br, pErr, st.Now)
+	}
+	r.store.tsCache.Add(start, end, ts, txnID)
+}
+
 // updateTimestampCache updates the timestamp cache in order to set a low water
 // mark for the timestamp at which mutations to keys overlapping the provided
 // request can write, such that they don't re-write history.
 func (r *Replica) updateTimestampCache(
-	ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error,
+	ctx context.Context,
+	st *kvserverpb.LeaseStatus,
+	ba *roachpb.BatchRequest,
+	br *roachpb.BatchResponse,
+	pErr *roachpb.Error,
 ) {
-	if ba.ReadConsistency != roachpb.CONSISTENT {
-		// Inconsistent reads are excluded from the timestamp cache.
-		return
-	}
-	addToTSCache := r.store.tsCache.Add
-	if util.RaceEnabled {
-		addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr)
+	addToTSCache := func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
+		r.addToTSCacheChecked(ctx, st, ba, br, pErr, start, end, ts, txnID)
 	}
 	// Update the timestamp cache using the timestamp at which the batch
 	// was executed. Note this may have moved forward from ba.Timestamp,
@@ -213,25 +247,6 @@ func (r *Replica) updateTimestampCache(
 	}
 }
 
-// checkedTSCacheUpdate wraps tscache.Cache and asserts that any update to the
-// cache is at or below the specified time.
-func checkedTSCacheUpdate(
-	now hlc.Timestamp,
-	tc tscache.Cache,
-	ba *roachpb.BatchRequest,
-	br *roachpb.BatchResponse,
-	pErr *roachpb.Error,
-) func(roachpb.Key, roachpb.Key, hlc.Timestamp, uuid.UUID) {
-	return func(start, end roachpb.Key, ts hlc.Timestamp, txnID uuid.UUID) {
-		if now.Less(ts) && !ts.Synthetic {
-			panic(fmt.Sprintf("Unsafe timestamp cache update! Cannot add timestamp %s to timestamp "+
-				"cache after evaluating %v (resp=%v; err=%v) with local hlc clock at timestamp %s. "+
-				"The timestamp cache update could be lost on a lease transfer.", ts, ba, br, pErr, now))
-		}
-		tc.Add(start, end, ts, txnID)
-	}
-}
-
 // txnsPushedDueToClosedTimestamp is a telemetry counter for the number of
 // batch requests which have been pushed due to the closed timestamp.
 var batchesPushedDueToClosedTimestamp telemetry.Counter
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 9609a2a74fb4..b02db0ae3af0 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -114,13 +114,13 @@ func (r *Replica) executeWriteBatch(
 	// Checking the context just before proposing can help avoid ambiguous errors.
 	if err := ctx.Err(); err != nil {
 		log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary())
-		return nil, g, roachpb.NewError(errors.Wrap(err, "aborted before proposing"))
+		return nil, g, roachpb.NewError(errors.Wrapf(err, "aborted before proposing"))
	}
 
 	// If the command is proposed to Raft, ownership of and responsibility for
 	// the concurrency guard will be assumed by Raft, so provide the guard to
 	// evalAndPropose.
-	ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, &st.Lease, localUncertaintyLimit)
+	ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, ba, g, st, localUncertaintyLimit)
 	if pErr != nil {
 		if maxLeaseIndex != 0 {
 			log.Fatalf(