-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathreplica_read.go
134 lines (120 loc) · 4.75 KB
/
replica_read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"context"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// executeReadOnlyBatch evaluates a read-only batch against this replica.
//
// Before evaluating, it verifies that the replica may serve the read (by
// holding the range lease or, failing that, by being allowed to serve it as
// a follower read), acquires latches so that overlapping commands cannot
// execute until this one completes, and takes readOnlyCmdMu in read mode.
// When the function returns, the latches are released through a deferred
// call which is handed the final response and error.
//
// It returns the response produced by evaluateBatch together with any
// evaluation error, or a nil response and an error if one of the checks
// preceding evaluation fails.
func (r *Replica) executeReadOnlyBatch(
	ctx context.Context, ba roachpb.BatchRequest,
) (br *roachpb.BatchResponse, pErr *roachpb.Error) {
	var (
		leaseStatus      storagepb.LeaseStatus
		servedAsFollower bool
	)

	// Reads that are not inconsistent need either the range lease or
	// permission to be served as a follower read.
	if ba.ReadConsistency.RequiresReadLease() {
		leaseStatus, pErr = r.redirectOnOrAcquireLease(ctx)
		if pErr != nil {
			// The lease check failed; the request can still go ahead if it
			// qualifies as a follower read. Otherwise return the error that
			// canServeFollowerRead hands back.
			if followerErr := r.canServeFollowerRead(ctx, ba, pErr); followerErr != nil {
				return nil, followerErr
			}
			servedAsFollower = true
		}
	}
	r.limitTxnMaxTimestamp(ctx, &ba, leaseStatus)

	spanSet, err := r.collectSpans(&ba)
	if err != nil {
		return nil, roachpb.NewError(err)
	}

	// Latches keep overlapping commands from executing until this command
	// has completed.
	log.Event(ctx, "acquire latches")
	ec, err := r.beginCmds(ctx, &ba, spanSet)
	if err != nil {
		return nil, roachpb.NewError(err)
	}

	log.Event(ctx, "waiting for read lock")
	r.readOnlyCmdMu.RLock()
	defer r.readOnlyCmdMu.RUnlock()

	// Make sure the latches acquired above are always released. Registering
	// this defer after the RUnlock defer means it runs first, i.e. while
	// readOnlyCmdMu is still held, which keeps the timestamp cache update
	// synchronized. The closure is required so that br and pErr are read at
	// return time rather than when the defer statement is executed.
	defer func() {
		ec.done(br, pErr, proposalNoReevaluation)
	}()

	// TODO(nvanbenschoten): Can this be moved into Replica.requestCanProceed?
	if _, destroyedErr := r.IsDestroyed(); destroyedErr != nil {
		return nil, roachpb.NewError(destroyedErr)
	}

	rangeSpan, err := keys.Range(ba)
	if err != nil {
		return nil, roachpb.NewError(err)
	}
	if proceedErr := r.requestCanProceed(rangeSpan, ba.Timestamp); proceedErr != nil {
		return nil, roachpb.NewError(proceedErr)
	}

	// Evaluate the read-only batch. Evaluation checks for a matching key
	// range; readOnlyCmdMu must stay held throughout so that reads from the
	// "wrong" key range are not served after the range has been split.
	var evalResult result.Result
	evalCtx := NewReplicaEvalContext(r, spanSet)
	reader := r.store.Engine().NewReadOnly()
	if util.RaceEnabled {
		// Race builds wrap the engine with the collected spans, presumably
		// so that accesses outside the declared spans are caught.
		reader = spanset.NewReadWriter(reader, spanSet)
	}
	defer reader.Close()
	br, evalResult, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), reader, evalCtx, nil, ba, true /* readOnly */)

	// A merge is (likely) about to be carried out, and this replica must
	// block all traffic until that merge commits or aborts. See
	// docs/tech-notes/range-merges.md.
	if evalResult.Local.DetachMaybeWatchForMerge() {
		if mergeErr := r.maybeWatchForMerge(ctx); mergeErr != nil {
			return nil, roachpb.NewError(mergeErr)
		}
	}

	intents := evalResult.Local.DetachIntents()
	if len(intents) > 0 {
		log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents))
		// Synchronous intent resolution is only permitted for consistent
		// requests; for inconsistent requests it is async/best-effort.
		//
		// RangeLookup requests are an important case that depends on this.
		// Synchronous resolution can deadlock for them when the request
		// originated from the local node: the local range descriptor cache
		// then has an in-flight RangeLookup request, which prohibits any
		// concurrent requests for the same range. See #17760.
		allowSync := ba.ReadConsistency == roachpb.CONSISTENT
		if cleanupErr := r.store.intentResolver.CleanupIntentsAsync(ctx, intents, allowSync); cleanupErr != nil {
			// Cleanup failures are only logged; they do not fail the read.
			log.Warning(ctx, cleanupErr)
		}
	}

	if pErr != nil {
		log.VErrEvent(ctx, 3, pErr.String())
		return br, pErr
	}

	log.Event(ctx, "read completed")
	if servedAsFollower {
		r.store.metrics.FollowerReadsCount.Inc(1)
	}
	return br, pErr
}