-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathcmd_query_intent.go
169 lines (157 loc) · 6.39 KB
/
cmd_query_intent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package batcheval
import (
"context"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// init registers QueryIntent as a read-only command, pairing the request
// type with declareKeysQueryIntent and the QueryIntent function defined in
// this file.
func init() {
RegisterReadOnlyCommand(kvpb.QueryIntent, declareKeysQueryIntent, QueryIntent)
}
// declareKeysQueryIntent declares the latches a QueryIntent request holds
// while it evaluates. It adds two non-MVCC read latches to latchSpans: one
// over the request's own span, and one over the per-transaction replicated
// shared locks latching key of the queried transaction on this range. The
// header, lock span set, and duration parameters are unused, and the function
// always returns nil.
//
// req must be a *kvpb.QueryIntentRequest; any other type panics on the type
// assertion.
func declareKeysQueryIntent(
	rs ImmutableRangeState,
	_ *kvpb.Header,
	req kvpb.Request,
	latchSpans *spanset.SpanSet,
	_ *lockspanset.LockSpanSet,
	_ time.Duration,
) error {
	// QueryIntent requests acquire a non-MVCC latch in order to read the queried
	// lock, if one exists, regardless of the time it was written at. This
	// isolates them from in-flight intent writes and exclusive lock acquisitions.
	latchSpans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span())
	// They also acquire a read latch on the per-transaction local key that all
	// replicated shared lock acquisitions acquire latches on. This isolates them
	// from any in-flight shared locking requests that they may be trying to
	// query. QueryIntent is registered as a read-only command and only reads
	// lock state, so read access is what is declared here.
	// NOTE(review): this presumes shared lock acquisitions declare write access
	// on this key, so that a read latch conflicts with them — confirm against
	// the locking requests' key declarations.
	//
	// TODO(arul): add a test.
	//
	// TODO(XXX): Do we really need this? We're saying that we don't need to
	// prevent replicated locks from landing after the query intent request
	// because pipelined replicated locks can never be part of a batch that's
	// being committed in parallel. As such, there can't be any in-flight requests
	// (that haven't evaluated yet) that acquire replicated locks and pipeline
	// them. Does that mean this latching isn't required?
	txnID := req.(*kvpb.QueryIntentRequest).Txn.ID
	latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{
		Key: keys.ReplicatedSharedLocksTransactionLatchingKey(rs.GetRangeID(), txnID),
	})
	return nil
}
// QueryIntent reports whether the transaction described in the request holds
// the queried lock on the request's key, recording the answer in the
// response's FoundIntent and FoundUnpushedIntent fields. If the intent is
// missing, the request prevents the intent from ever being written at the
// specified timestamp (but the actual prevention happens during the timestamp
// cache update).
//
// An error is returned when:
//   - the request runs inside a transaction other than the one being queried
//     (ErrTransactionUnsupported);
//   - the request's write timestamp is below the queried transaction's write
//     timestamp (assertion failure);
//   - reading the lock state from storage fails; or
//   - no matching lock is found and the request's ErrorIfMissing field is set
//     (an intent missing error).
func QueryIntent(
	ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp kvpb.Response,
) (result.Result, error) {
	req := cArgs.Args.(*kvpb.QueryIntentRequest)
	hdr := cArgs.Header
	res := resp.(*kvpb.QueryIntentResponse)

	// Querying one transaction's intent from within a different transaction is
	// unsupported, so reject such requests up front.
	if hdr.Txn != nil && hdr.Txn.ID != req.Txn.ID {
		return result.Result{}, ErrTransactionUnsupported
	}
	// Past the guard above, a transactional request can only be querying an
	// intent that belongs to its own transaction.
	queryingOwnTxn := hdr.Txn != nil

	if hdr.WriteTimestamp().Less(req.Txn.WriteTimestamp) {
		// This condition must hold for the timestamp cache update in
		// Replica.updateTimestampCache to be safe.
		return result.Result{}, errors.AssertionFailedf("QueryIntent request timestamp %s less than txn WriteTimestamp %s",
			hdr.Timestamp, req.Txn.WriteTimestamp)
	}

	switch req.LockStrength {
	case lock.Intent, lock.None:
		// Intents have special handling because there's an associated timestamp
		// component with them.
		//
		// TODO(arul): We should be able to remove the lock.None case once
		// compatibility with 24.1 is no longer an issue.

		// Read from the lock table to see if an intent exists.
		intent, err := storage.GetIntent(ctx, reader, req.Key, storage.BatchEvalReadCategory)
		if err != nil {
			return result.Result{}, err
		}
		res.FoundIntent, res.FoundUnpushedIntent = false, false

		switch {
		case intent == nil:
			log.VEventf(ctx, 2, "found no intent")

		// See comment on QueryIntentRequest.Txn for an explanation of this
		// comparison.
		// TODO(nvanbenschoten): Now that we have a full intent history,
		// we can look at the exact sequence! That won't serve as much more
		// than an assertion that QueryIntent is being used correctly.
		case req.Txn.ID != intent.Txn.ID ||
			req.Txn.Epoch != intent.Txn.Epoch ||
			req.Txn.Sequence > intent.Txn.Sequence:
			log.VEventf(ctx, 2, "intent mismatch requires - %v == %v and %v == %v and %v <= %v",
				req.Txn.ID, intent.Txn.ID, req.Txn.Epoch, intent.Txn.Epoch, req.Txn.Sequence, intent.Txn.Sequence)

		default:
			// A matching intent exists. Now check whether it was pushed past its
			// expected timestamp.
			res.FoundIntent = true
			expectedTS := req.Txn.WriteTimestamp
			if queryingOwnTxn {
				// When querying an intent for its own transaction, forward the
				// timestamp we compare against to the provisional commit
				// timestamp in the batch header.
				expectedTS.Forward(hdr.Txn.WriteTimestamp)
			}
			if intent.Txn.WriteTimestamp.LessEq(expectedTS) {
				res.FoundUnpushedIntent = true
			} else {
				log.VEventf(ctx, 2, "found pushed intent")
				// If the request was querying an intent in its own transaction,
				// update the response transaction.
				// TODO(nvanbenschoten): if this is necessary for correctness, say so.
				// And then add a test to demonstrate that.
				if queryingOwnTxn {
					res.Txn = hdr.Txn.Clone()
					res.Txn.WriteTimestamp.Forward(intent.Txn.WriteTimestamp)
				}
			}
		}

		if !res.FoundIntent && req.ErrorIfMissing {
			return result.Result{}, kvpb.NewIntentMissingError(req.Key, intent)
		}

	default:
		// Every other lock strength is checked with storage.VerifyLock. A
		// verified lock is reported as both found and unpushed.
		found, err := storage.VerifyLock(
			ctx, reader, &req.Txn, req.LockStrength, req.Key, req.IgnoredSeqNums,
		)
		if err != nil {
			return result.Result{}, err
		}
		if found {
			res.FoundIntent = true
			res.FoundUnpushedIntent = true
		}
		if !res.FoundIntent && req.ErrorIfMissing {
			return result.Result{}, kvpb.NewIntentMissingError(req.Key, nil /* intent */)
		}
	}
	return result.Result{}, nil
}