kv: introduce and plumb intent resolution "pending observations"
This commit introduces a new `ClockWhilePending` field to intent
resolution requests. This field is an optional clock observation from
the intent's leaseholder node that was captured at some point before the
intent's transaction was pushed and found to be PENDING. The clock
observation is used to forward the intent's local timestamp during
intent resolution. If the clock observation was captured from a
different node than the node which evaluates the ResolveIntent request,
it will be ignored and the intent's local timestamp will not be changed.

The field is not yet read, but is written and will be read by a later
commit.
nvanbenschoten committed May 9, 2022
1 parent f52ecd6 commit 186e9a8
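
The new field is only written in this commit. As a rough, hypothetical illustration of how a later change might consume it during resolution, the standalone sketch below forwards an intent's local timestamp to the pre-push clock observation. All names here (Timestamp, forwardLocalTimestamp) are simplified stand-ins invented for the sketch, not the real hlc/roachpb machinery.

// forward_local_timestamp_sketch.go
//
// Hypothetical sketch, not part of this commit: how intent resolution might
// use ClockWhilePending once the field is read. Timestamp is a stand-in for
// hlc.Timestamp / hlc.ClockTimestamp.
package main

import "fmt"

type Timestamp struct {
	WallTime int64
	Logical  int32
}

// Less reports whether t orders strictly before o.
func (t Timestamp) Less(o Timestamp) bool {
	return t.WallTime < o.WallTime || (t.WallTime == o.WallTime && t.Logical < o.Logical)
}

// forwardLocalTimestamp returns the local timestamp an intent would carry
// after resolution, given an optional clock observation captured before the
// push found the transaction PENDING. A zero observation (for example, one
// discarded because it came from a different node) leaves it unchanged.
func forwardLocalTimestamp(localTS, clockWhilePending Timestamp) Timestamp {
	if clockWhilePending == (Timestamp{}) {
		return localTS
	}
	if localTS.Less(clockWhilePending) {
		return clockWhilePending
	}
	return localTS
}

func main() {
	// Intent written with local timestamp 8; the pusher observed the
	// leaseholder's clock at 16 before the push found the txn PENDING.
	fmt.Println(forwardLocalTimestamp(Timestamp{WallTime: 8}, Timestamp{WallTime: 16})) // {16 0}
}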
Showing 14 changed files with 199 additions and 41 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent.go
@@ -89,6 +89,10 @@ func ResolveIntent(
}

update := args.AsLockUpdate()
if update.ClockWhilePending.NodeID != cArgs.EvalCtx.NodeID() {
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
ok, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update)
if err != nil {
return result.Result{}, err
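
For illustration, a minimal standalone sketch of the node check added above, using simplified local stand-in types (NodeID, ObservedTimestamp, LockUpdate, and maybeDropObservation belong to the sketch, not to the real roachpb/batcheval packages):

// clock_while_pending_guard_sketch.go
//
// Illustrative only: mirrors the check added to ResolveIntent and
// ResolveIntentRange, with simplified local types.
package main

import "fmt"

type NodeID int32

type ObservedTimestamp struct {
	NodeID   NodeID
	WallTime int64 // stand-in for an hlc.ClockTimestamp
}

type LockUpdate struct {
	ClockWhilePending ObservedTimestamp
}

// maybeDropObservation clears the observation unless it was captured on the
// node evaluating the ResolveIntent request. A clock observation is only
// meaningful on the node whose clock produced it, so a mismatched observation
// is ignored and the intent's local timestamp is left unchanged.
func maybeDropObservation(update *LockUpdate, evalNodeID NodeID) {
	if update.ClockWhilePending.NodeID != evalNodeID {
		update.ClockWhilePending = ObservedTimestamp{}
	}
}

func main() {
	u := LockUpdate{ClockWhilePending: ObservedTimestamp{NodeID: 2, WallTime: 100}}
	maybeDropObservation(&u, 1)      // evaluating on node 1, observation from node 2
	fmt.Println(u.ClockWhilePending) // {0 0}: ignored
}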
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
@@ -48,6 +48,10 @@ func ResolveIntentRange(
}

update := args.AsLockUpdate()
if update.ClockWhilePending.NodeID != cArgs.EvalCtx.NodeID() {
// The observation was from the wrong node. Ignore.
update.ClockWhilePending = roachpb.ObservedTimestamp{}
}
numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange(
ctx, readWriter, ms, update, h.MaxSpanRequestKeys)
if err != nil {
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/batcheval/eval_context.go
@@ -163,6 +163,7 @@ type MockEvalCtx struct {
ClusterSettings *cluster.Settings
Desc *roachpb.RangeDescriptor
StoreID roachpb.StoreID
NodeID roachpb.NodeID
Clock *hlc.Clock
Stats enginepb.MVCCStats
QPS float64
@@ -208,7 +209,7 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
panic("unimplemented")
return m.MockEvalCtx.NodeID
}
func (m *mockEvalCtxImpl) GetNodeLocality() roachpb.Locality {
panic("unimplemented")
1 change: 1 addition & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -155,6 +155,7 @@ func NewManager(cfg Config) Manager {
},
lt: lt,
ltw: &lockTableWaiterImpl{
nodeDesc: cfg.NodeDesc,
st: cfg.Settings,
clock: cfg.Clock,
stopper: cfg.Stopper,
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -722,7 +722,12 @@ func (c *cluster) PushTransaction(
func (c *cluster) ResolveIntent(
ctx context.Context, intent roachpb.LockUpdate, _ intentresolver.ResolveOptions,
) *roachpb.Error {
log.Eventf(ctx, "resolving intent %s for txn %s with %s status", intent.Key, intent.Txn.ID.Short(), intent.Status)
var obsStr string
if obs := intent.ClockWhilePending; obs != (roachpb.ObservedTimestamp{}) {
obsStr = fmt.Sprintf(" and clock observation {%d %v}", obs.NodeID, obs.Timestamp)
}
log.Eventf(ctx, "resolving intent %s for txn %s with %s status%s",
intent.Key, intent.Txn.ID.Short(), intent.Status, obsStr)
c.m.OnLockUpdated(ctx, &intent)
return nil
}
118 changes: 112 additions & 6 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
@@ -97,11 +97,12 @@ var LockTableDeadlockDetectionPushDelay = settings.RegisterDurationSetting(

// lockTableWaiterImpl is an implementation of lockTableWaiter.
type lockTableWaiterImpl struct {
st *cluster.Settings
clock *hlc.Clock
stopper *stop.Stopper
ir IntentResolver
lt lockTable
nodeDesc *roachpb.NodeDescriptor
st *cluster.Settings
clock *hlc.Clock
stopper *stop.Stopper
ir IntentResolver
lt lockTable

// When set, WriteIntentErrors are propagated instead of pushing
// conflicting transactions.
@@ -458,6 +459,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// Construct the request header and determine which form of push to use.
h := w.pushHeader(req)
var pushType roachpb.PushTxnType
var beforePushObs roachpb.ObservedTimestamp
switch req.WaitPolicy {
case lock.WaitPolicy_Block:
// This wait policy signifies that the request wants to wait until the
@@ -469,6 +471,24 @@
switch ws.guardAccess {
case spanset.SpanReadOnly:
pushType = roachpb.PUSH_TIMESTAMP
beforePushObs = roachpb.ObservedTimestamp{
NodeID: w.nodeDesc.NodeID,
Timestamp: w.clock.NowAsClockTimestamp(),
}
// TODO(nvanbenschoten): because information about the local_timestamp
// leading the MVCC timestamp of an intent is lost, we also need to push
// the intent up to the top of the transaction's local uncertainty limit
// on this node. This logic currently lives in pushHeader, but we could
// simplify it when removing synthetic timestamps and then move it out
// here.
//
// We could also explore adding a preserve_local_timestamp flag to
// MVCCValue that would explicitly store the local timestamp even in
// cases where it would normally be omitted. This could be set during
// intent resolution when a push observation is provided. Or we could
// not persist this, but still preserve the local timestamp when
// adjusting the intent, accepting that the intent would then no longer
// round-trip and would lose the local timestamp if rewritten later.
log.VEventf(ctx, 2, "pushing timestamp of txn %s above %s", ws.txn.ID.Short(), h.Timestamp)

case spanset.SpanReadWrite:
@@ -532,12 +552,98 @@
// We always poison due to limitations of the API: not poisoning equals
// clearing the AbortSpan, and if our pushee transaction first got pushed
// for timestamp (by us), then (by someone else) aborted and poisoned, and
// then we run the below code, we're clearing the AbortSpan illegaly.
// then we run the below code, we're clearing the AbortSpan illegally.
// Furthermore, even if our pushType is not PUSH_ABORT, we may have ended up
// with the responsibility to abort the intents (for example if we find the
// transaction aborted). To do better here, we need per-intent information
// on whether we need to poison.
resolve := roachpb.MakeLockUpdate(pusheeTxn, roachpb.Span{Key: ws.key})
if pusheeTxn.Status == roachpb.PENDING {
// The pushee was still PENDING at the time that the push observed its
// transaction record. It is safe to use the clock observation we gathered
// before initiating the push during intent resolution, as we know that this
// observation must have been made before the pushee committed (implicitly
// or explicitly) and acknowledged its client, assuming it does commit at
// some point.
//
// This observation can be used to forward the local timestamp of the intent
// when intent resolution forwards its version timestamp. This is important,
// as it prevents the pusher, who has an even earlier observed timestamp
// from this node, from considering this intent to be uncertain after the
// resolution succeeds and the pusher returns to read.
//
// For example, consider a reader with a read timestamp of 10, a global
// uncertainty limit of 25, and a local uncertainty limit (thanks to an
// observed timestamp) of 15. The reader conflicts with an intent that has a
// version timestamp and local timestamp of 8. The reader observes the local
// clock at 16 before pushing and then succeeds in pushing the intent's
// holder txn to timestamp 11 (read_timestamp + 1). If the reader were to
// resolve the intent to timestamp 11 but leave its local timestamp at 8
// then the reader would consider the value "uncertain" upon re-evaluation.
// However, if the reader also updates the value's local timestamp to 16
// during intent resolution then it will not consider the value to be
// "uncertain".
//
// Unfortunately, this does not quite work as written, as the MVCC key
// encoding logic normalizes (as an optimization) keys with
// local timestamp >= mvcc timestamp
// to
// local timestamp == mvcc timestamp
// To work around this, the pusher must also push the mvcc timestamp of the
// intent above its own local uncertainty limit. In the example above, this
// would mean pushing the intent's holder txn to timestamp 16 as well. The
// logic that handles this is in pushHeader.
//
// Note that it would be incorrect to update the intent's local timestamp if
// the pushee was found to be committed (implicitly or explicitly), as the
// pushee may have already acknowledged its client by the time the clock
// observation was taken and the value should be considered uncertain. Doing
// so could allow the pusher to serve a stale read.
//
// For example, if we used the observation after the push found a committed
// pushee, we would be susceptible to a stale read that looks like:
// 1. txn1 writes intent on key k @ ts 10, on node N
// 2. txn1 commits @ ts 15, acks client
// 3. txn1's async intent resolution of key k stalls
// 4. txn2 begins after txn1 with read timestamp @ 11
// 5. txn2 collects observed timestamp @ 12 from node N
// 6. txn2 encounters intent on key k, observes clock @ ts 13, pushes, finds
// committed record, resolves intent with observation. Committed version
// now has mvcc timestamp @ 15 and local timestamp @ 13
// 7. txn2 reads @ 11 with local uncertainty limit @ 12, fails to observe
// key k's new version. Stale read!
//
// More subtly, it would also be incorrect to update the intent's local
// timestamp using an observation captured _after_ the push completed, even
// if it had found a PENDING record. This is because this ordering makes no
// guarantee that the clock observation is captured before the pushee
// commits and acknowledges its client. This could not lead to the pusher
// serving a stale read, but it could lead to other transactions serving
// stale reads.
//
// For example, if we captured the observation after the push completed, we
// would be susceptible to a stale read that looks like:
// 1. txn1 writes intent on key k @ ts 10, on node N
// 2. txn2 (concurrent with txn1, so no risk of stale read itself) encounters
// intent on key k, pushes, finds pending record and pushes to timestamp 14
// 3. txn1 commits @ ts 15, acks client
// 4. txn1's async intent resolution of key k stalls
// 5. txn3 begins after txn1 with read timestamp @ 11
// 6. txn3 collects observed timestamp @ 12 from node N
// 7. txn2 observes clock @ 13 _after_ push, resolves intent (still pending)
// with observation. Intent now has mvcc timestamp @ 14 and local
// timestamp @ 13
// 8. txn3 reads @ 11 with local uncertainty limit @ 12, fails to observe
// key k's intent so it does not resolve it to committed. Stale read!
//
// There is some inherent raciness here, because the lease may move between
// when we push and when the reader later reads. In such cases, the reader's
// local uncertainty limit may exceed the intent's local timestamp during
// the subsequent read and it may need to push again. However, we expect to
// eventually succeed in reading, either after lease movement subsides or
// after the reader's read timestamp surpasses its global uncertainty limit.
resolve.ClockWhilePending = beforePushObs
}
opts := intentresolver.ResolveOptions{Poison: true}
return w.ir.ResolveIntent(ctx, resolve, opts)
}
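
To make the numeric example in the pushLockTxn comment above concrete, here is a small self-contained sketch of the uncertainty check it relies on. It uses plain integers for timestamps and a deliberately simplified rule (a value above the read timestamp is uncertain when its MVCC timestamp is within the global limit and its local timestamp is within the local limit), so treat it as an illustration of the scenario rather than the real storage-layer logic.

// uncertainty_example_sketch.go
//
// Sketch of the example from the pushLockTxn comment: reader at ts 10 with
// local uncertainty limit 15 and global uncertainty limit 25, conflicting
// intent originally at mvcc/local ts 8.
package main

import "fmt"

type value struct {
	mvccTS  int64 // version timestamp
	localTS int64 // local (clock) timestamp
}

// uncertain applies a simplified form of the uncertainty rule: a value newer
// than the read timestamp is uncertain if it is within the global limit and
// its local timestamp does not exceed the local limit.
func uncertain(v value, readTS, localLimit, globalLimit int64) bool {
	if v.mvccTS <= readTS || v.mvccTS > globalLimit {
		return false
	}
	return v.localTS <= localLimit
}

func main() {
	readTS, localLimit, globalLimit := int64(10), int64(15), int64(25)

	// Intent pushed to ts 11 but local timestamp left at 8: the reader would
	// still consider the value uncertain upon re-evaluation.
	fmt.Println(uncertain(value{mvccTS: 11, localTS: 8}, readTS, localLimit, globalLimit)) // true

	// Intent resolved with the pre-push observation: local timestamp forwarded
	// to 16 and (per the normalization note above) the mvcc timestamp pushed
	// to 16 as well. No longer uncertain.
	fmt.Println(uncertain(value{mvccTS: 16, localTS: 16}, readTS, localLimit, globalLimit)) // false
}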
15 changes: 10 additions & 5 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -115,11 +115,12 @@ func setupLockTableWaiterTest() (
signal: make(chan struct{}, 1),
}
w := &lockTableWaiterImpl{
st: st,
clock: hlc.NewClock(manual.UnixNano, time.Nanosecond),
stopper: stop.NewStopper(),
ir: ir,
lt: &mockLockTable{},
nodeDesc: &roachpb.NodeDescriptor{NodeID: 1},
st: st,
clock: hlc.NewClock(manual.UnixNano, time.Nanosecond),
stopper: stop.NewStopper(),
ir: ir,
lt: &mockLockTable{},
}
return w, ir, guard, manual
}
@@ -367,6 +368,7 @@ func testWaitPush(t *testing.T, k waitKind, makeReq func() Request, expPushTS hl
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -550,6 +552,7 @@ func testErrorWaitPush(
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -736,6 +739,7 @@ func testWaitPushWithTimeout(t *testing.T, k waitKind, makeReq func() Request) {
require.Equal(t, keyA, intent.Key)
require.Equal(t, pusheeTxn.ID, intent.Txn.ID)
require.Equal(t, roachpb.ABORTED, intent.Status)
require.Zero(t, intent.ClockWhilePending)
g.state = waitingState{kind: doneWaiting}
g.notify()
return nil
@@ -856,6 +860,7 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) {
require.Equal(t, keyA, intents[0].Key)
require.Equal(t, pusheeTxn.ID, intents[0].Txn.ID)
require.Equal(t, roachpb.ABORTED, intents[0].Status)
require.Zero(t, intents[0].ClockWhilePending)
return err1
}
err := w.WaitOn(ctx, req, g)
@@ -226,7 +226,7 @@ debug-advance-clock ts=123
on-txn-updated txn=txn2 status=pending ts=18,1
----
[-] update txn: increasing timestamp of txn2
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status
[2] sequence req5: resolving intent "k" for txn 00000002 with PENDING status and clock observation {1 246.000000000,0}
[2] sequence req5: lock wait-queue event: done waiting
[2] sequence req5: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s
[2] sequence req5: acquiring latches
@@ -47,7 +47,7 @@ sequence req=req1
on-txn-updated txn=txn1 status=pending ts=15,2
----
[-] update txn: increasing timestamp of txn1
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 123.000000000,1}
[3] sequence req1: lock wait-queue event: done waiting
[3] sequence req1: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence req1: acquiring latches
@@ -113,7 +113,7 @@ sequence req=req1
on-txn-updated txn=txn1 status=pending ts=135,1
----
[-] update txn: increasing timestamp of txn1
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 135.000000000,1}
[3] sequence req1: lock wait-queue event: done waiting
[3] sequence req1: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence req1: acquiring latches
@@ -182,7 +182,7 @@ sequence req=req1
on-txn-updated txn=txn1 status=pending ts=150,2?
----
[-] update txn: increasing timestamp of txn1
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status
[3] sequence req1: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 135.000000000,2}
[3] sequence req1: lock wait-queue event: done waiting
[3] sequence req1: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence req1: acquiring latches
@@ -70,7 +70,7 @@ local: num=0
on-txn-updated txn=txn1 status=pending ts=12,2
----
[-] update txn: increasing timestamp of txn1
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 123.000000000,1}
[2] sequence req2: lock wait-queue event: done waiting
[2] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[2] sequence req2: acquiring latches
@@ -191,7 +191,7 @@ local: num=0
on-txn-updated txn=txn1 status=pending ts=12,2
----
[-] update txn: increasing timestamp of txn1
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 123.000000000,3}
[2] sequence req2: lock wait-queue event: done waiting
[2] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[2] sequence req2: acquiring latches
@@ -321,7 +321,7 @@ local: num=0
on-txn-updated txn=txn1 status=pending ts=12,2
----
[-] update txn: increasing timestamp of txn1
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status
[2] sequence req2: resolving intent "k" for txn 00000001 with PENDING status and clock observation {1 123.000000000,5}
[2] sequence req2: lock wait-queue event: done waiting
[2] sequence req2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[2] sequence req2: acquiring latches
24 changes: 13 additions & 11 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
@@ -869,21 +869,23 @@ func (ir *IntentResolver) ResolveIntents(
var batcher *requestbatcher.RequestBatcher
if len(intent.EndKey) == 0 {
req = &roachpb.ResolveIntentRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
IgnoredSeqNums: intent.IgnoredSeqNums,
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
IgnoredSeqNums: intent.IgnoredSeqNums,
ClockWhilePending: intent.ClockWhilePending,
}
batcher = ir.irBatcher
} else {
req = &roachpb.ResolveIntentRangeRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
MinTimestamp: opts.MinTimestamp,
IgnoredSeqNums: intent.IgnoredSeqNums,
RequestHeader: roachpb.RequestHeaderFromSpan(intent.Span),
IntentTxn: intent.Txn,
Status: intent.Status,
Poison: opts.Poison,
MinTimestamp: opts.MinTimestamp,
IgnoredSeqNums: intent.IgnoredSeqNums,
ClockWhilePending: intent.ClockWhilePending,
}
batcher = ir.irRangeBatcher
}
18 changes: 10 additions & 8 deletions pkg/roachpb/api.go
@@ -1627,21 +1627,23 @@ func (acrr *AdminChangeReplicasRequest) Changes() []ReplicationChange {
// intent request.
func (rir *ResolveIntentRequest) AsLockUpdate() LockUpdate {
return LockUpdate{
Span: rir.Span(),
Txn: rir.IntentTxn,
Status: rir.Status,
IgnoredSeqNums: rir.IgnoredSeqNums,
Span: rir.Span(),
Txn: rir.IntentTxn,
Status: rir.Status,
IgnoredSeqNums: rir.IgnoredSeqNums,
ClockWhilePending: rir.ClockWhilePending,
}
}

// AsLockUpdate creates a lock update message corresponding to the given resolve
// intent range request.
func (rirr *ResolveIntentRangeRequest) AsLockUpdate() LockUpdate {
return LockUpdate{
Span: rirr.Span(),
Txn: rirr.IntentTxn,
Status: rirr.Status,
IgnoredSeqNums: rirr.IgnoredSeqNums,
Span: rirr.Span(),
Txn: rirr.IntentTxn,
Status: rirr.Status,
IgnoredSeqNums: rirr.IgnoredSeqNums,
ClockWhilePending: rirr.ClockWhilePending,
}
}
