kv/storage: introduce local timestamps for MVCC versions in MVCCValue #80706

Merged
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/helpers_test.go
@@ -55,3 +55,9 @@ func (tcf *TxnCoordSenderFactory) TestingSetLinearizable(linearizable bool) {
func (tcf *TxnCoordSenderFactory) TestingSetMetrics(metrics TxnMetrics) {
tcf.metrics = metrics
}

// TestingSetCommitWaitFilter allows tests to instrument the beginning of a
// transaction commit wait sleep.
func (tcf *TxnCoordSenderFactory) TestingSetCommitWaitFilter(filter func()) {
tcf.testingKnobs.CommitWaitFilter = filter
}
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
@@ -48,6 +48,10 @@ type ClientTestingKnobs struct {
// DisableCommitSanityCheck allows "setting" the DisableCommitSanityCheck to
// true without actually overriding the variable.
DisableCommitSanityCheck bool

// CommitWaitFilter allows tests to instrument the beginning of a transaction
// commit wait sleep.
CommitWaitFilter func()
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
19 changes: 11 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -587,7 +587,8 @@ func (tc *TxnCoordSender) Send(
// clock to read below that future-time value, violating "monotonic reads".
//
// In practice, most transactions do not need to wait at all, because their
// commit timestamps were pulled from an HLC clock (i.e. are not synthetic)
// commit timestamps were pulled from an HLC clock (either the local clock
// or a remote clock on a node whom the local node has communicated with)
// and so they will be guaranteed to lead the local HLC's clock, assuming
// proper HLC time propagation. Only transactions whose commit timestamps
// were pushed into the future will need to wait, like those who wrote to a
@@ -640,18 +641,20 @@ func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) er
commitTS := tc.mu.txn.WriteTimestamp
readOnly := tc.mu.txn.Sequence == 0
linearizable := tc.linearizable
needWait := commitTS.Synthetic || (linearizable && !readOnly)
if !needWait {
// No need to wait. If !Synthetic then we know the commit timestamp
// leads the local HLC clock, and since that's all we'd need to wait
// for, we can short-circuit.
return nil
}

waitUntil := commitTS
if linearizable && !readOnly {
waitUntil = waitUntil.Add(tc.clock.MaxOffset().Nanoseconds(), 0)
}
if waitUntil.LessEq(tc.clock.Now()) {
// No wait fast-path. This is the common case for most transactions. Only
// transactions who have their commit timestamp bumped into the future will
// need to wait.
return nil
}
if fn := tc.testingKnobs.CommitWaitFilter; fn != nil {
fn()
}

before := tc.clock.PhysicalTime()
est := waitUntil.GoTime().Sub(before)
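
To make the new logic concrete, here is a minimal, self-contained Go sketch of the commit-wait rule shown above; the helper name and the plain time.Time/time.Duration values are illustrative assumptions, not the actual TxnCoordSender code, which operates on HLC timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// commitWaitDuration returns how long a committing transaction should sleep:
// only if its commit timestamp, extended by the maximum clock offset for
// linearizable read-write transactions, still leads the local clock.
func commitWaitDuration(
	commitTS, now time.Time, maxOffset time.Duration, linearizable, readOnly bool,
) time.Duration {
	waitUntil := commitTS
	if linearizable && !readOnly {
		waitUntil = waitUntil.Add(maxOffset)
	}
	if !waitUntil.After(now) {
		// No-wait fast path: the common case, since commit timestamps pulled
		// from an HLC clock do not lead the local clock.
		return 0
	}
	return waitUntil.Sub(now)
}

func main() {
	now := time.Now()
	// A commit timestamp pushed 3ms into the future forces a short wait.
	fmt.Println(commitWaitDuration(now.Add(3*time.Millisecond), now, 500*time.Millisecond, false, false))
	// A commit timestamp at or below the local clock returns immediately.
	fmt.Println(commitWaitDuration(now.Add(-time.Millisecond), now, 500*time.Millisecond, false, false))
}
```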
13 changes: 10 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
@@ -1349,8 +1349,12 @@ func TestTxnCommitWait(t *testing.T) {
//
testFn := func(t *testing.T, linearizable, commit, readOnly, futureTime, deferred bool) {
s, metrics, cleanupFn := setupMetricsTest(t)
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetLinearizable(linearizable)
defer cleanupFn()
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetLinearizable(linearizable)
commitWaitC := make(chan struct{})
s.DB.GetFactory().(*kvcoord.TxnCoordSenderFactory).TestingSetCommitWaitFilter(func() {
close(commitWaitC)
})

// maxClockOffset defines the maximum clock offset between nodes in the
// cluster. When in linearizable mode, all writing transactions must
@@ -1361,7 +1365,7 @@
// gateway they use. This ensures that all causally dependent
// transactions commit with higher timestamps, even if their read and
// writes sets do not conflict with the original transaction's. This, in
// turn, prevents the "causal reverse" anamoly which can be observed by
// turn, prevents the "causal reverse" anomaly which can be observed by
// a third, concurrent transaction. See the following blog post for
// more: https://www.cockroachlabs.com/blog/consistency-model/.
maxClockOffset := s.Clock.MaxOffset()
@@ -1468,8 +1472,10 @@

// Advance the manual clock slowly. If the commit-wait sleep completes
// too early, we'll catch it with the require.Empty. If it completes too
// late, we'll stall when pulling from the channel.
// late, we'll stall when pulling from the channel. Before doing so, wait
// until the transaction has begun its commit-wait.
for expWait > 0 {
<-commitWaitC
require.Empty(t, errC)

adv := futureOffset / 5
@@ -2297,6 +2303,7 @@ func TestTxnRequestTxnTimestamp(t *testing.T) {
return err
}
}
manual.Set(txn.ProvisionalCommitTimestamp().WallTime)
return nil
}); err != nil {
t.Fatal(err)
11 changes: 5 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
@@ -653,12 +653,11 @@ func RunCommitTrigger(
// side-effects beyond those of the intents that it has written.

// The transaction should not have a commit timestamp in the future of present
// time, even it its commit timestamp is synthetic. Such cases should be
// caught in maybeCommitWaitBeforeCommitTrigger before getting here, which
// should sleep for long enough to ensure that the local clock leads the
// commit timestamp. An error here may indicate that the transaction's commit
// timestamp was bumped after it acquired latches.
if txn.WriteTimestamp.Synthetic && rec.Clock().Now().Less(txn.WriteTimestamp) {
// time. Such cases should be caught in maybeCommitWaitBeforeCommitTrigger
// before getting here, which should sleep for long enough to ensure that the
// local clock leads the commit timestamp. An error here may indicate that the
// transaction's commit timestamp was bumped after it acquired latches.
if rec.Clock().Now().Less(txn.WriteTimestamp) {
return result.Result{}, errors.AssertionFailedf("txn %s with %s commit trigger needs "+
"commit wait. Was its timestamp bumped after acquiring latches?", txn, ct.Kind())
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
@@ -89,6 +89,10 @@ func ResolveIntent(
}

update := args.AsLockUpdate()
if update.ClockWhilePending.NodeID != cArgs.EvalCtx.NodeID() {
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
if err != nil {
return result.Result{}, err
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -48,6 +48,10 @@ func ResolveIntentRange(
}

update := args.AsLockUpdate()
if update.ClockWhilePending.NodeID != cArgs.EvalCtx.NodeID() {
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
if err != nil {
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
@@ -163,6 +163,7 @@ type MockEvalCtx struct {
ClusterSettings *cluster.Settings
Desc *roachpb.RangeDescriptor
StoreID roachpb.StoreID
NodeID roachpb.NodeID
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
@@ -208,7 +209,7 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
panic("unimplemented")
return m.MockEvalCtx.NodeID
}
func (m *mockEvalCtxImpl) GetNodeLocality() roachpb.Locality {
panic("unimplemented")
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -155,6 +155,7 @@ func NewManager(cfg Config) Manager {
},
lt: lt,
ltw: &lockTableWaiterImpl{
nodeDesc: cfg.NodeDesc,
st: cfg.Settings,
clock: cfg.Clock,
stopper: cfg.Stopper,
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -722,7 +722,12 @@ func (c *cluster) PushTransaction(
func (c *cluster) ResolveIntent(
ctx context.Context, intent roachpb.LockUpdate, _ intentresolver.ResolveOptions,
) *roachpb.Error {
log.Eventf(ctx, "resolving intent %s for txn %s with %s status", intent.Key, intent.Txn.ID.Short(), intent.Status)
var obsStr string
if obs := intent.ClockWhilePending; obs != (roachpb.ObservedTimestamp{}) {
obsStr = fmt.Sprintf(" and clock observation {%d %v}", obs.NodeID, obs.Timestamp)
}
log.Eventf(ctx, "resolving intent %s for txn %s with %s status%s",
intent.Key, intent.Txn.ID.Short(), intent.Status, obsStr)
c.m.OnLockUpdated(ctx, &intent)
return nil
}
118 changes: 112 additions & 6 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -97,11 +97,12 @@ var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting(

// lockTableWaiterImpl is an implementation of lockTableWaiter.
type lockTableWaiterImpl struct {
st *cluster.Settings
clock *hlc.Clock
stopper *stop.Stopper
ir IntentResolver
lt lockTable
nodeDesc *roachpb.NodeDescriptor
st *cluster.Settings
clock *hlc.Clock
stopper *stop.Stopper
ir IntentResolver
lt lockTable

// When set, WriteIntentError are propagated instead of pushing
// conflicting transactions.
@@ -458,6 +459,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// Construct the request header and determine which form of push to use.
h := w.pushHeader(req)
var pushType roachpb.PushTxnType
var beforePushObs roachpb.ObservedTimestamp
switch req.WaitPolicy {
case lock.WaitPolicy_Block:
// This wait policy signifies that the request wants to wait until the
@@ -469,6 +471,24 @@
switch ws.guardAccess {
case spanset.SpanReadOnly:
pushType = roachpb.PUSH_TIMESTAMP
beforePushObs = roachpb.ObservedTimestamp{
NodeID: w.nodeDesc.NodeID,
Timestamp: w.clock.NowAsClockTimestamp(),
}
// TODO(nvanbenschoten): because information about the local_timestamp
// leading the MVCC timestamp of an intent is lost, we also need to push
// the intent up to the top of the transaction's local uncertainty limit
// on this node. This logic currently lives in pushHeader, but we could
// simplify it when removing synthetic timestamps and then move it out
// here.
//
// We could also explore adding a preserve_local_timestamp flag to
// MVCCValue that would explicitly store the local timestamp even in
// cases where it would normally be omitted. This could be set during
// intent resolution when a push observation is provided. Or we could
// not persist this, but still preserve the local timestamp when
// adjusting the intent, accepting that the intent would then no longer
// round-trip and would lose the local timestamp if rewritten later.
log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp)

case spanset.SpanReadWrite:
@@ -532,12 +552,98 @@
// We always poison due to limitations of the API: not poisoning equals
// clearing the AbortSpan, and if our pushee transaction first got pushed
// for timestamp (by us), then (by someone else) aborted and poisoned, and
// then we run the below code, we're clearing the AbortSpan illegaly.
// then we run the below code, we're clearing the AbortSpan illegally.
// Furthermore, even if our pushType is not PUSH_ABORT, we may have ended up
// with the responsibility to abort the intents (for example if we find the
// transaction aborted). To do better here, we need per-intent information
// on whether we need to poison.
resolve := roachpb.MakeLockUpdate(pusheeTxn, roachpb.Span{Key: ws.key})
if pusheeTxn.Status == roachpb.PENDING {
// The pushee was still PENDING at the time that the push observed its
Inline review comment (Contributor):

@nvanbenschoten: To confirm that I understand the idea here, we're saying "we want to move the local timestamp of an intent as close as possible to its mvcc commit timestamp". Is that fair to say?

In other words, with this commit we're leveraging the fact that before every Push, we have the opportunity to look at the intent leaseholder's local clock and update the intent's local timestamp if its txn is found to be PENDING (in order to move the intent out of the pusher's uncertainty window, since the intent's txn could not have causally preceded the pusher).

If the above sounds good to you, what I don't understand is what would happen if we didn't have this "optimization". I recall from our meeting that you'd alluded to this being more than just an optimization. Without this optimization, a reader might redundantly block on a txn that commits way later and doesn't causally precede the reader, yes?

Reply from the PR author (Member):

@aayushshah15 and I talked about this in person and we're on the same page now. Summarizing the discussion below.

> In other words, with this commit we're leveraging the fact that before every Push, we have the opportunity to look at the intent leaseholder's local clock and update the intent's local timestamp if its txn is found to be PENDING (in order to move the intent out of the pusher's uncertainty window, since the intent's txn could not have causally preceded the pusher).

Yes, this is correct.

> I recall from our meeting that you'd alluded to this being more than just an optimization. Without this optimization, a reader might redundantly block on a txn that commits way later and doesn't causally precede the reader, yes?

This is also mostly correct. Without this, a high-priority pusher that pushes the timestamp of another transaction would still see the pushee's intent as uncertain when it returned to read because the intent would retain its local timestamp. It would then ratchet its read timestamp to that of the pushee and end up in the same situation when it attempted to read again. This would continue indefinitely. In effect, this would allow a high-priority reader to block on a lower-priority writer, a form of priority inversion.

// transaction record. It is safe to use the clock observation we gathered
// before initiating the push during intent resolution, as we know that this
// observation must have been made before the pushee committed (implicitly
// or explicitly) and acknowledged its client, assuming it does commit at
// some point.
//
// This observation can be used to forward the local timestamp of the intent
// when intent resolution forwards its version timestamp. This is important,
// as it prevents the pusher, who has an even earlier observed timestamp
// from this node, from considering this intent to be uncertain after the
// resolution succeeds and the pusher returns to read.
//
// For example, consider a reader with a read timestamp of 10, a global
// uncertainty limit of 25, and a local uncertainty limit (thanks to an
// observed timestamp) of 15. The reader conflicts with an intent that has a
// version timestamp and local timestamp of 8. The reader observes the local
// clock at 16 before pushing and then succeeds in pushing the intent's
// holder txn to timestamp 11 (read_timestamp + 1). If the reader were to
// resolve the intent to timestamp 11 but leave its local timestamp at 8
// then the reader would consider the value "uncertain" upon re-evaluation.
// However, if the reader also updates the value's local timestamp to 16
// during intent resolution then it will not consider the value to be
// "uncertain".
//
// Unfortunately, this does not quite work as written, as the MVCC key
// encoding logic normalizes (as an optimization) keys with
// local timestamp >= mvcc timestamp
// to
// local timestamp == mvcc timestamp
// To work around this, the pusher must also push the mvcc timestamp of the
// intent above its own local uncertainty limit. In the example above, this
// would mean pushing the intent's holder txn to timestamp 16 as well. The
// logic that handles this is in pushHeader.
//
// Note that it would be incorrect to update the intent's local timestamp if
// the pushee was found to be committed (implicitly or explicitly), as the
// pushee may have already acknowledged its client by the time the clock
// observation was taken and the value should be considered uncertain. Doing
// so could allow the pusher to serve a stale read.
//
// For example, if we used the observation after the push found a committed
// pushee, we would be susceptible to a stale read that looks like:
// 1. txn1 writes intent on key k @ ts 10, on node N
// 2. txn1 commits @ ts 15, acks client
// 3. txn1's async intent resolution of key k stalls
// 4. txn2 begins after txn1 with read timestamp @ 11
// 5. txn2 collects observed timestamp @ 12 from node N
// 6. txn2 encounters intent on key k, observes clock @ ts 13, pushes, finds
// committed record, resolves intent with observation. Committed version
// now has mvcc timestamp @ 15 and local timestamp @ 13
// 7. txn2 reads @ 11 with local uncertainty limit @ 12, fails to observe
// key k's new version. Stale read!
//
// More subtly, it would also be incorrect to update the intent's local
// timestamp using an observation captured _after_ the push completed, even
// if it had found a PENDING record. This is because this ordering makes no
// guarantee that the clock observation is captured before the pushee
// commits and acknowledges its client. This could not lead to the pusher
// serving a stale read, but it could lead to other transactions serving
// stale reads.
//
// For example, if we captured the observation after the push completed, we
// would be susceptible to a stale read that looks like:
// 1. txn1 writes intent on key k @ ts 10, on node N
// 2. txn2 (concurrent with txn1, so no risk of stale read itself) encounters
// intent on key k, pushes, finds pending record and pushes to timestamp 14
// 3. txn1 commits @ ts 15, acks client
// 4. txn1's async intent resolution of key k stalls
// 5. txn3 begins after txn1 with read timestamp @ 11
// 6. txn3 collects observed timestamp @ 12 from node N
// 7. txn2 observes clock @ 13 _after_ push, resolves intent (still pending)
// with observation. Intent now has mvcc timestamp @ 14 and local
// timestamp @ 13
// 8. txn3 reads @ 11 with local uncertainty limit @ 12, fails to observe
// key k's intent so it does not resolve it to committed. Stale read!
//
// There is some inherent raciness here, because the lease may move between
// when we push and when the reader later reads. In such cases, the reader's
// local uncertainty limit may exceed the intent's local timestamp during
// the subsequent read and it may need to push again. However, we expect to
// eventually succeed in reading, either after lease movement subsides or
// after the reader's read timestamp surpasses its global uncertainty limit.
resolve.ClockWhilePending = beforePushObs
}
opts := intentresolver.ResolveOptions{Poison: true}
return w.ir.ResolveIntent(ctx, resolve, opts)
}
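
To ground the worked example in the comment above (and the review thread's point about priority inversion), here is a minimal, self-contained sketch of an uncertainty check; the isUncertain function and the plain int64 nanosecond timestamps are illustrative assumptions, not CockroachDB's actual uncertainty-interval code.

```go
package main

import "fmt"

// isUncertain reports whether a value above the reader's timestamp still falls
// inside its uncertainty window: its local timestamp must be at or below the
// reader's local uncertainty limit (derived from an observed timestamp) and its
// MVCC timestamp at or below the global uncertainty limit.
func isUncertain(readTS, globalLimit, localLimit, mvccTS, localTS int64) bool {
	if mvccTS <= readTS {
		return false // visible normally, no uncertainty restart needed
	}
	if localLimit != 0 && localTS > localLimit {
		return false // written after the reader's clock observation on this node
	}
	return mvccTS <= globalLimit
}

func main() {
	// Numbers from the comment above: reader at ts 10, global limit 25, local
	// limit 15; intent originally at version and local timestamp 8.
	// Pushing only the version timestamp to 11 while leaving the local
	// timestamp at 8 keeps the value uncertain, so the reader restarts again.
	fmt.Println(isUncertain(10, 25, 15, 11, 8)) // true
	// Pushing both timestamps to 16 (the pre-push clock observation) moves the
	// value out of the reader's uncertainty window.
	fmt.Println(isUncertain(10, 25, 15, 16, 16)) // false
}
```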
15 changes: 10 additions & 5 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -115,11 +115,12 @@ func setupLockTableWaiterTest() (
signal: make(chan struct{}, 1),
}
w := &lockTableWaiterImpl{
st: st,
clock: hlc.NewClock(manual.UnixNano, time.Nanosecond),
stopper: stop.NewStopper(),
ir: ir,
lt: &mockLockTable{},
nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},
st: st,
clock: hlc.NewClock(manual.UnixNano, time.Nanosecond),
stopper: stop.NewStopper(),
ir: ir,
lt: &mockLockTable{},
}
return w, ir, guard, manual
}
@@ -367,6 +368,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -550,6 +552,7 @@ func testErrorWaitPush(
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -736,6 +739,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -856,6 +860,7 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) {
require.Equal(t, keyA, intents[0].Key)
require.Equal(t, pusheeTxn.ID, intents[0].Txn.ID)
require.Equal(t, roachpb.ABORTED, intents[0].Status)
require.Zero(t, intents[0].ClockWhilePending)
return err1
}
err := w.WaitOn(ctx, req, g)
@@ -226,7 +226,7 @@ debug-advance-clock ts=123
on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status and clock observation {1 246.000000000,0}
[2] sequence req5: lock wait-queue event: done waiting
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req5: acquiring latches