-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
replica_evaluate.go
696 lines (649 loc) · 25.8 KB
/
replica_evaluate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserver
import (
"bytes"
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
)
// optimizePuts inspects the leading run of Put, ConditionalPut and InitPut
// requests in origReqs. When that run is at least optimizePutThreshold
// requests long, a single iterator seek over [minKey, maxKey] of the run is
// used to locate the first key that already exists in the span. Every request
// of the run whose key sorts strictly before that existing key (or every
// request of the run, if the span holds no key at all) is marked Blind, so
// that the MVCC write does not need an iterator to read an existing value.
//
// The run ends at the first request that is not one of the three put types
// or, unless distinctSpans is set, at the first request that repeats a key
// seen earlier in the run.
//
// origReqs and the requests it references are never modified. The returned
// slice is either origReqs itself (nothing was optimized) or a copy of it in
// which the requests of the run that were marked blind have been replaced by
// shallow copies.
func optimizePuts(
	reader storage.Reader, origReqs []kvpb.RequestUnion, distinctSpans bool,
) []kvpb.RequestUnion {
	// Bounds of the keys written by the run; both stay nil until the first
	// key has been recorded.
	var minKey, maxKey roachpb.Key
	// seen is only allocated when the caller cannot promise distinct spans,
	// in which case it is used to cut the run short at the first repeated key.
	var seen map[string]struct{}
	if !distinctSpans {
		seen = make(map[string]struct{}, len(origReqs))
	}

	// recordKey widens [minKey, maxKey] to include key. It reports false,
	// without touching the bounds, if key was already recorded.
	recordKey := func(key roachpb.Key) bool {
		if seen != nil {
			if _, dup := seen[string(key)]; dup {
				return false
			}
			seen[string(key)] = struct{}{}
		}
		if minKey == nil || bytes.Compare(key, minKey) < 0 {
			minKey = key
		}
		if maxKey == nil || bytes.Compare(key, maxKey) > 0 {
			maxKey = key
		}
		return true
	}

	// extendsRun reports whether ru is a put-style request on a key that has
	// not been recorded yet, recording the key as a side effect.
	extendsRun := func(ru kvpb.RequestUnion) bool {
		switch req := ru.GetInner().(type) {
		case *kvpb.PutRequest:
			return recordKey(req.Key)
		case *kvpb.ConditionalPutRequest:
			return recordKey(req.Key)
		case *kvpb.InitPutRequest:
			return recordKey(req.Key)
		}
		return false
	}

	// runLen is the length of the optimizable prefix, i.e. the index of the
	// first request that cannot be part of the run.
	runLen := 0
	for runLen < len(origReqs) && extendsRun(origReqs[runLen]) {
		runLen++
	}
	if runLen < optimizePutThreshold {
		// Too short a run to justify the seek below.
		return origReqs
	}

	// The iterator is used to find the part of the key span that is empty.
	// Intents do not need to be seen for this purpose since they also have
	// provisional values, which the iterator does see.
	iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		KeyTypes: storage.IterKeyTypePointsAndRanges,
		// UpperBound is exclusive and maxKey must be part of the scan, so the
		// bound is the key immediately following maxKey.
		UpperBound: maxKey.Next(),
	})
	defer iter.Close()

	// Locate the first existing key at or after minKey.
	iter.SeekGE(storage.MakeMVCCMetadataKey(minKey))
	valid, err := iter.Valid()
	if err != nil {
		// TODO(bdarnell): return an error here instead of silently
		// running without the optimization?
		log.Errorf(context.TODO(), "Seek returned error; disabling blind-put optimization: %+v", err)
		return origReqs
	}
	// firstExisting stays nil when no key exists within [minKey, maxKey].
	var firstExisting roachpb.Key
	if valid && bytes.Compare(iter.UnsafeKey().Key, maxKey) <= 0 {
		firstExisting = iter.UnsafeKey().Key.Clone()
	}

	// Work on a copy of the slice and replace the requests that write below
	// the first existing key with blind shallow copies.
	reqs := append([]kvpb.RequestUnion(nil), origReqs...)
	for i := 0; i < runLen; i++ {
		inner := reqs[i].GetInner()
		if firstExisting != nil && bytes.Compare(firstExisting, inner.Header().Key) <= 0 {
			// An existing key sorts at or before this request's key, so the
			// request is left as a regular (non-blind) write.
			continue
		}
		switch t := inner.(type) {
		case *kvpb.PutRequest:
			blind := *t
			blind.Blind = true
			reqs[i].MustSetInner(&blind)
		case *kvpb.ConditionalPutRequest:
			blind := *t
			blind.Blind = true
			reqs[i].MustSetInner(&blind)
		case *kvpb.InitPutRequest:
			blind := *t
			blind.Blind = true
			reqs[i].MustSetInner(&blind)
		default:
			log.Fatalf(context.TODO(), "unexpected non-put request: %s", t)
		}
	}
	return reqs
}
// evaluateBatch evaluates a batch request by splitting it up into its
// individual commands, passing them to evaluateCommand in order, and
// combining the results.
//
// Evaluation works on its own copy of the batch header (and on a clone of the
// transaction, if there is one). That copy is threaded from one command to
// the next and is finally used to populate the response.
//
// On success, the batch response and the merged Result are returned. On
// failure the returned response is nil. The returned Result is empty when the
// failure occurs before the current command's result was merged (aborted
// transaction, error injected by a testing filter) and is the Result merged
// so far otherwise. A WriteTooOldError does not stop evaluation; it is
// remembered and, once all commands have run, either returned as an error or
// swallowed in favor of the WriteTooOld flag on the response transaction.
func evaluateBatch(
ctx context.Context,
idKey kvserverbase.CmdIDKey,
readWriter storage.ReadWriter,
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
ba *kvpb.BatchRequest,
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
evalPath batchEvalPath,
) (_ *kvpb.BatchResponse, _ result.Result, retErr *kvpb.Error) {
defer func() {
// Ensure that errors don't carry the WriteTooOld flag set. The client
// handles non-error responses with the WriteTooOld flag set, and errors
// with this flag set confuse it.
if retErr != nil && retErr.GetTxn() != nil {
retErr.GetTxn().WriteTooOld = false
}
}()
// NB: Don't mutate BatchRequest directly.
baReqs := ba.Requests
// baHeader is the header passed to evaluate each command in the batch.
// After each command, it is updated with the result and used again for
// the next command. At the end of evaluation, it is used to populate the
// response header (timestamp, txn, etc).
baHeader := ba.Header
br := ba.CreateReply()
// Optimize any contiguous sequences of put and conditional put ops. This is
// only attempted on the read-write evaluation path and only when the batch
// is large enough to possibly contain a qualifying run.
if len(baReqs) >= optimizePutThreshold && evalPath == readWrite {
baReqs = optimizePuts(readWriter, baReqs, baHeader.DistinctSpans)
}
// Create a clone of the transaction to store the new txn state produced on
// the return/error path.
if baHeader.Txn != nil {
baHeader.Txn = baHeader.Txn.Clone()
// Check whether this transaction has been aborted, if applicable. This
// applies to reads and writes once a transaction has begun to
// acquire locks (see #2231 for more about why we check for aborted
// transactions on reads). Note that 1PC transactions have had their
// transaction field cleared by this point so we do not execute this
// check in that case.
if baHeader.Txn.IsLocking() {
// We don't check the abort span for a couple of special requests:
// - if the request is asking to abort the transaction, then don't check the
// AbortSpan; we don't want the request to be rejected if the transaction
// has already been aborted.
// - heartbeats don't check the abort span. If the txn is aborted, they'll
// return an aborted proto in their otherwise successful response.
// TODO(nvanbenschoten): Let's remove heartbeats from this allowlist when
// we rationalize the TODO in txnHeartbeater.heartbeat.
if !ba.IsSingleAbortTxnRequest() && !ba.IsSingleHeartbeatTxnRequest() {
if pErr := checkIfTxnAborted(ctx, rec, readWriter, *baHeader.Txn); pErr != nil {
return nil, result.Result{}, pErr
}
}
}
}
// mergedResult accumulates the Result of every command evaluated so far.
var mergedResult result.Result
// WriteTooOldErrors have particular handling. When a request encounters the
// error, we'd like to lay down an intent in order to avoid writers being
// starved. So, for blind writes, we swallow the error and instead we set the
// WriteTooOld flag on the response. For non-blind writes (e.g. CPut), we
// can't do that and so we just return the WriteTooOldError - see note on
// IsRead() stanza below. Upon receiving either a WriteTooOldError or a
// response with the WriteTooOld flag set, the client will attempt to bump
// the txn's read timestamp through a refresh. If successful, the client
// will retry this batch (in both cases).
//
// In any case, evaluation of the current batch always continues after a
// WriteTooOldError in order to find out if there are more conflicts and to
// choose a final write timestamp.
var writeTooOldState struct {
// err is the first WriteTooOldError encountered; its ActualTimestamp is
// forwarded by any WriteTooOldError encountered later in the batch.
err *kvpb.WriteTooOldError
// cantDeferWTOE is set when a WriteTooOldError cannot be deferred past the
// end of the current batch.
cantDeferWTOE bool
}
// Only collect the scan stats if the tracing is enabled. ss stays nil
// otherwise.
var ss *kvpb.ScanStats
if sp := tracing.SpanFromContext(ctx); sp.RecordingType() != tracingpb.RecordingOff {
ss = &kvpb.ScanStats{}
defer func() {
if ss.NumGets != 0 || ss.NumScans != 0 || ss.NumReverseScans != 0 {
// Only record non-empty ScanStats.
sp.RecordStructured(ss)
}
}()
}
// TODO(tbg): if we introduced an "executor" helper here that could carry state
// across the slots in the batch while we execute them, this code could come
// out a lot less ad-hoc.
for index, union := range baReqs {
// Execute the command.
args := union.GetInner()
if baHeader.Txn != nil {
// Set the Request's sequence number on the TxnMeta for this
// request. The MVCC layer (currently) uses TxnMeta to
// pass input arguments, such as the seqnum at which a
// request operates.
baHeader.Txn.Sequence = args.Header().Sequence
}
// If a unittest filter was installed, check for an injected error; otherwise, continue.
if filter := rec.EvalKnobs().TestingEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version,
Hdr: baHeader,
}
if pErr := filter(filterArgs); pErr != nil {
if pErr.GetTxn() == nil {
pErr.SetTxn(baHeader.Txn)
}
log.Infof(ctx, "test injecting error: %s", pErr)
return nil, result.Result{}, pErr
}
}
// The response slot at the same index as the request receives the reply.
reply := br.Responses[index].GetInner()
// NB: ctx is reassigned here, so the annotated context is also the one
// used for the remaining commands of the batch.
if seq := args.Header().KVNemesisSeq.Get(); seq != 0 {
ctx = kvnemesisutil.WithSeq(ctx, seq)
}
// Note that `reply` is populated even when an error is returned: it
// may carry a response transaction and in the case of WriteTooOldError
// (which is sometimes deferred) it is fully populated.
curResult, err := evaluateCommand(
ctx, readWriter, rec, ms, ss, baHeader, args, reply, g, st, ui, evalPath,
)
// If a unittest post-evaluation filter was installed, give it a chance to
// inspect the evaluation error and to inject an error of its own.
if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Hdr: baHeader,
Err: err,
}
if pErr := filter(filterArgs); pErr != nil {
if pErr.GetTxn() == nil {
pErr.SetTxn(baHeader.Txn)
}
log.Infof(ctx, "test injecting error: %s", pErr)
return nil, result.Result{}, pErr
}
}
// If this request is transactional, we now have potentially two
// transactions floating around: the one on baHeader.Txn (always
// there in a txn) and possibly a newer version in `reply.Header().Txn`.
// Absorb the update into baHeader.Txn and write back a nil Transaction
// to the header to ensure that there is only one version going forward.
if headerCopy := reply.Header(); baHeader.Txn != nil && headerCopy.Txn != nil {
baHeader.Txn.Update(headerCopy.Txn)
headerCopy.Txn = nil
reply.SetHeader(headerCopy)
}
if err != nil {
// If an EndTxn wants to restart because of a write too old, we
// might have a better error to return to the client.
//
// NB: the retErr declared below shadows the named return value for the
// extent of this if/else-if statement.
if retErr := (*kvpb.TransactionRetryError)(nil); errors.As(err, &retErr) &&
retErr.Reason == kvpb.RETRY_WRITE_TOO_OLD &&
args.Method() == kvpb.EndTxn && writeTooOldState.err != nil {
err = writeTooOldState.err
// Don't defer this error. We could perhaps rely on the client observing
// the WriteTooOld flag and retry the batch, but we choose not to.
writeTooOldState.cantDeferWTOE = true
} else if wtoErr := (*kvpb.WriteTooOldError)(nil); errors.As(err, &wtoErr) {
// We got a WriteTooOldError. We continue on to run all
// commands in the batch in order to determine the highest
// timestamp for more efficient retries. If the batch is
// transactional, we continue to lay down intents so that
// other concurrent overlapping transactions are forced
// through intent resolution and the chances of this batch
// succeeding when it will be retried are increased.
if writeTooOldState.err != nil {
writeTooOldState.err.ActualTimestamp.Forward(
wtoErr.ActualTimestamp)
} else {
writeTooOldState.err = wtoErr
}
// For read-write requests that observe key-value state, we don't have
// the option of leaving an intent behind when they encounter a
// WriteTooOldError, so we have to return an error instead of a response
// with the WriteTooOld flag set (which would also leave intents
// behind). These requests need to be re-evaluated at the bumped
// timestamp in order for their results to be valid. The current
// evaluation resulted in a result that could well be different from
// what the request would return if it were evaluated at the bumped
// timestamp, which would cause the request to be rejected if it were
// sent again with the same sequence number after a refresh.
//
// Similarly, for read-only requests that encounter a WriteTooOldError,
// we don't have the option of returning a response with the WriteTooOld
// flag set because a response is not even generated in tandem with the
// WriteTooOldError. We could fix this and then allow WriteTooOldErrors
// to be deferred in these cases, but doing so would buy more into the
// extremely error-prone approach of returning responses and errors
// together throughout the MVCC read path. Doing so is not desirable as
// it has repeatedly caused bugs in the past. Instead, we'd like to get
// rid of this pattern entirely and instead address the TODO below.
//
// TODO(andrei): What we really want to do here is either speculatively
// evaluate the request at the bumped timestamp and return that
// speculative result, or leave behind an unreplicated lock that won't
// prevent the request from evaluating again at the same sequence number
// but at a bumped timestamp.
if !kvpb.IsBlindWrite(args) {
writeTooOldState.cantDeferWTOE = true
}
if baHeader.Txn != nil {
log.VEventf(ctx, 2, "setting WriteTooOld because of key: %s. wts: %s -> %s",
args.Header().Key, baHeader.Txn.WriteTimestamp, wtoErr.ActualTimestamp)
baHeader.Txn.WriteTimestamp.Forward(wtoErr.ActualTimestamp)
baHeader.Txn.WriteTooOld = true
} else {
// For non-transactional requests, there's nowhere to defer the error
// to. And the request has to fail because non-transactional batches
// should read and write at the same timestamp.
writeTooOldState.cantDeferWTOE = true
}
// Clear error; we're done processing the WTOE for now and we'll return
// to considering it below after we've evaluated all requests.
err = nil
}
}
// Even on error, we need to propagate the result of evaluation.
//
// TODO(tbg): find out if that's true and why and improve the comment.
//
// NB: the err declared in the if statement below is scoped to that
// statement; the check that follows it looks at the evaluation error again.
if err := mergedResult.MergeAndDestroy(curResult); err != nil {
log.Fatalf(
ctx,
"unable to absorb Result: %s\ndiff(new, old): %s",
err, pretty.Diff(curResult, mergedResult),
)
}
if err != nil {
pErr := kvpb.NewErrorWithTxn(err, baHeader.Txn)
// Initialize the error index.
pErr.SetErrorIndex(int32(index))
return nil, mergedResult, pErr
}
// If the last request was carried out with a limit, subtract the number
// of results from the limit going forward. Exhausting the limit results
// in a limit of -1. This makes sure that we still execute the rest of
// the batch, but with limit-aware operations returning no data.
h := reply.Header()
if limit, retResults := baHeader.MaxSpanRequestKeys, h.NumKeys; limit != 0 && retResults > 0 {
if retResults < limit {
baHeader.MaxSpanRequestKeys -= retResults
} else {
// The limit was either exceeded (which is allowed in some cases
// like when WholeRowsOfSize is set and AllowEmpty is false) or
// was exactly used up, so drop to -1 instead of zero (which
// would mean "no limit").
baHeader.MaxSpanRequestKeys = -1
}
}
// Same as for MaxSpanRequestKeys above, keep track of the limit and make
// sure to fall through to -1 instead of hitting zero (which means no
// limit). We have to check the ResumeReason as well, since e.g. a Scan
// response may not include the value that pushed it across the limit.
if baHeader.TargetBytes > 0 {
if h.ResumeReason == kvpb.RESUME_BYTE_LIMIT {
baHeader.TargetBytes = -1
} else if baHeader.TargetBytes > h.NumBytes {
baHeader.TargetBytes -= h.NumBytes
} else {
baHeader.TargetBytes = -1
}
}
}
// If we made it here, there was no error during evaluation, with the exception of
// a deferred WTOE. If it can't be deferred - return it now; otherwise it is swallowed.
// Note that we don't attach an Index to the returned Error.
//
// TODO(tbg): we could attach the index of the first WriteTooOldError seen, but does
// that buy us anything?
if writeTooOldState.cantDeferWTOE {
// NB: we can't do any error wrapping here yet due to compatibility with 20.2 nodes;
// there needs to be an ErrorDetail here.
return nil, mergedResult, kvpb.NewErrorWithTxn(writeTooOldState.err, baHeader.Txn)
}
// The batch evaluation will not return an error (i.e. either everything went
// fine or we're deferring a WriteTooOldError by having bumped
// baHeader.Txn.WriteTimestamp).
// Update the batch response timestamp field to the timestamp at which the
// batch's reads were evaluated.
if baHeader.Txn != nil {
// If transactional, send out the final transaction entry with the reply.
br.Txn = baHeader.Txn
// Note that br.Txn.ReadTimestamp might be higher than baHeader.Timestamp if
// we had an EndTxn that decided that it can refresh to something higher
// than baHeader.Timestamp because there were no refresh spans.
if br.Txn.ReadTimestamp.Less(baHeader.Timestamp) {
log.Fatalf(ctx, "br.Txn.ReadTimestamp < ba.Timestamp (%s < %s). ba: %s",
br.Txn.ReadTimestamp, baHeader.Timestamp, ba)
}
br.Timestamp = br.Txn.ReadTimestamp
} else {
br.Timestamp = baHeader.Timestamp
}
return br, mergedResult, nil
}
// evaluateCommand looks up the command registered for the method of args and
// runs its evaluation function against readWriter, letting it fill in reply.
// A command's read-write evaluation function is used when it has one; its
// read-only evaluation function is used otherwise.
//
// The returned Result may be partially valid even if an error is returned.
// If no command is registered for the method, nothing is evaluated and an
// empty Result is returned together with an error.
func evaluateCommand(
	ctx context.Context,
	readWriter storage.ReadWriter,
	rec batcheval.EvalContext,
	ms *enginepb.MVCCStats,
	ss *kvpb.ScanStats,
	h kvpb.Header,
	args kvpb.Request,
	reply kvpb.Response,
	g *concurrency.Guard,
	st *kvserverpb.LeaseStatus,
	ui uncertainty.Interval,
	evalPath batchEvalPath,
) (result.Result, error) {
	cmd, found := batcheval.LookupCommand(args.Method())
	if !found {
		return result.Result{}, errors.Errorf("unrecognized command %s", args.Method())
	}

	// Without a lease status, the command is handed the zero clock timestamp.
	var now hlc.ClockTimestamp
	if st != nil {
		now = st.Now
	}
	cArgs := batcheval.CommandArgs{
		EvalCtx:               rec,
		Header:                h,
		Args:                  args,
		Now:                   now,
		Stats:                 ms,
		ScanStats:             ss,
		Concurrency:           g,
		Uncertainty:           ui,
		DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents,
	}

	var res result.Result
	var evalErr error
	if cmd.EvalRW == nil {
		res, evalErr = cmd.EvalRO(ctx, readWriter, cArgs, reply)
	} else {
		res, evalErr = cmd.EvalRW(ctx, readWriter, cArgs, reply)
	}

	if log.ExpensiveLogEnabled(ctx, 2) {
		// clip bounds a string to maxLen bytes, marking the cut with "...".
		const maxLen = 256
		clip := func(s string) string {
			if len(s) <= maxLen {
				return s
			}
			return s[:maxLen-3] + "..."
		}
		var resp string
		if reply.Size() <= 1024 /* 1kb */ {
			resp = clip(reply.String())
		} else {
			// Avoid printing the entire response before truncating; the
			// response header alone is logged for large replies.
			header := reply.Header()
			resp = clip(header.String())
		}
		log.VEventf(ctx, 2, "evaluated %s command %s, txn=%v : resp=%s, err=%v",
			args.Method(), clip(args.String()), h.Txn, resp, evalErr)
	}
	return res, evalErr
}
// canDoServersideRetry looks at the error produced by evaluating ba (or, when
// there is no error, at the WriteTooOld flag in br.Txn) and decides whether
// the batch can be evaluated again at a higher timestamp without involving
// the client. Such a retry is sometimes possible for retriable errors that
// ask for a higher timestamp: for transactional requests, it is possible if
// the transaction had not performed any prior reads that need refreshing.
//
// This function is called both below and above latching, which is indicated
// by the concurrency guard argument. A non-nil guard means that the caller is
// holding latches and cannot adjust its timestamp beyond the limits of what
// those latches protect. A nil guard means that the caller is not holding
// latches and can adjust its timestamp more freely, because it will
// re-acquire latches at whatever timestamp the batch is bumped to.
//
// deadline, if not empty, specifies the highest timestamp (exclusive) at
// which the request can be evaluated. It must not be specified for a
// transactional request; a transaction's deadline comes from its EndTxn
// request.
//
// If true is returned, ba and ba.Txn will have been updated with the new
// timestamp.
func canDoServersideRetry(
	ctx context.Context,
	pErr *kvpb.Error,
	ba *kvpb.BatchRequest,
	br *kvpb.BatchResponse,
	g *concurrency.Guard,
	deadline hlc.Timestamp,
) bool {
	// Transactional preconditions, and the transaction's own deadline.
	if ba.Txn != nil {
		if !ba.CanForwardReadTimestamp {
			return false
		}
		if !deadline.IsEmpty() {
			log.Fatal(ctx, "deadline passed for transactional request")
		}
		if etArg, ok := ba.GetArg(kvpb.EndTxn); ok {
			deadline = etArg.(*kvpb.EndTxnRequest).Deadline
		}
	}

	// Work out the timestamp the batch would have to be retried at.
	var newTimestamp hlc.Timestamp
	switch {
	case ba.Txn != nil && pErr != nil:
		// Transactional request that failed: the error decides whether a
		// refresh is possible and, if so, to which timestamp.
		ok, refreshTS := kvpb.TransactionRefreshTimestamp(pErr)
		if !ok {
			return false
		}
		newTimestamp = refreshTS

	case ba.Txn != nil:
		// Transactional request without an error: the caller is expected to
		// have seen the WriteTooOld flag on the response transaction.
		if !br.Txn.WriteTooOld {
			log.Fatalf(ctx, "expected the WriteTooOld flag to be set")
		}
		newTimestamp = br.Txn.WriteTimestamp

	default:
		// Non-transactional request: only called with an error, and only
		// two error kinds carry a timestamp to retry at.
		if pErr == nil {
			log.Fatalf(ctx, "canDoServersideRetry called for non-txn request without error")
		}
		switch tErr := pErr.GetDetail().(type) {
		case *kvpb.WriteTooOldError:
			newTimestamp = tErr.RetryTimestamp()
		case *kvpb.ReadWithinUncertaintyIntervalError:
			newTimestamp = tErr.RetryTimestamp()
		default:
			return false
		}
	}

	if batcheval.IsEndTxnExceedingDeadline(newTimestamp, deadline) {
		return false
	}
	return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}
// canReadOnlyRequestDropLatchesBeforeEval reports whether the batch request
// may try to resolve its conflicts upfront (by scanning just the lock table
// first), bump the ts cache, release its latches and only then proceed with
// evaluation. The optimization is limited to batches that hold latches, read
// with CONSISTENT read consistency, are not evaluated optimistically, are
// not locking, do not use the SkipLocked wait policy, and consist solely of
// Export, Get, Scan and ReverseScan requests.
//
// It panics on a ReadConsistency, EvalKind or WaitPolicy value that it does
// not know about.
func canReadOnlyRequestDropLatchesBeforeEval(ba *kvpb.BatchRequest, g *concurrency.Guard) bool {
	// NB: A nil guard indicates that the caller is not holding latches.
	if g == nil {
		return false
	}

	switch rc := ba.Header.ReadConsistency; rc {
	case kvpb.INCONSISTENT, kvpb.READ_UNCOMMITTED:
		// TODO(aayush): INCONSISTENT and READ_UNCOMMITTED reads do not care
		// about resolving lock conflicts at all. Yet, they can still drop
		// latches early and evaluate once they've pinned their pebble engine
		// state. We should consider supporting this by letting these kinds of
		// requests drop latches early while also skipping the initial
		// validation step of scanning the lock table.
		return false
	case kvpb.CONSISTENT:
		// Eligible so far.
	default:
		panic(fmt.Sprintf("unexpected ReadConsistency: %s", rc))
	}

	switch kind := g.EvalKind; kind {
	case concurrency.OptimisticEval:
		// Requests going through the optimistic path are not allowed to drop
		// their latches before evaluation since we do not know upfront the
		// extent to which they will end up reading, and thus we cannot
		// determine how much of the timestamp cache to update.
		return false
	case concurrency.PessimisticEval, concurrency.PessimisticAfterFailedOptimisticEval:
		// Eligible so far.
	default:
		panic(fmt.Sprintf("unexpected EvalKind: %v", kind))
	}

	// Only non-locking reads are eligible. Requests that need to lock the
	// keys that they end up reading must stay isolated from other conflicting
	// requests while they execute, so they cannot release their latches
	// before evaluation.
	if ba.IsLocking() {
		return false
	}

	switch wp := ba.WaitPolicy; wp {
	case lock.WaitPolicy_SkipLocked:
		// SkipLocked requests should only bump the timestamp cache over the
		// keys that they actually ended up reading, and not the keys they
		// ended up skipping over. Thus, they are not allowed to drop their
		// latches before evaluation.
		return false
	case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
		// Eligible so far.
	default:
		panic(fmt.Sprintf("unexpected WaitPolicy: %s", wp))
	}

	// All non-locking, pessimistically evaluating read requests are allowed
	// to try and resolve their conflicts upfront; any other kind of request
	// in the batch disqualifies it.
	for i := range ba.Requests {
		switch ba.Requests[i].GetInner().(type) {
		case *kvpb.ExportRequest, *kvpb.GetRequest, *kvpb.ScanRequest, *kvpb.ReverseScanRequest:
			continue
		default:
			return false
		}
	}
	return true
}