-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
txn_interceptor_seq_num_allocator.go
165 lines (147 loc) · 6.48 KB
/
txn_interceptor_seq_num_allocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kv
import (
"context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/errors"
)
// txnSeqNumAllocator is a txnInterceptor in charge of allocating sequence
// numbers to all the individual requests in batches.
//
// Sequence numbers serve a few roles in the transaction model:
//
// 1. they are used to enforce an ordering between read and write operations in a
// single transaction that go to the same key. Each read request that travels
// through the interceptor is assigned the sequence number of the most recent
// write. Each write request that travels through the interceptor is assigned
// a sequence number larger than any previously allocated.
//
// This is true even for leaf transaction coordinators. In their case, they are
// provided the sequence number of the most recent write during construction.
// Because they only perform read operations and never issue writes, they assign
// each read this sequence number without ever incrementing their own counter.
// In this way, sequence numbers are maintained correctly across a distributed
// tree of transaction coordinators.
//
// 2. they are used to uniquely identify write operations. Because every write
// request is given a new sequence number, the tuple (txn_id, txn_epoch, seq)
// uniquely identifies a write operation across an entire cluster. This property
// is exploited when determining the status of an individual write by looking
// for its intent. We perform such an operation using the QueryIntent request
// type when pipelining transactional writes. We will do something similar
// during the recovery stage of implicitly committed transactions.
//
// 3. they are used to determine whether a batch contains the entire write set
// for a transaction. See BatchRequest.IsCompleteTransaction.
//
// 4. they are used to provide idempotency for replays and re-issues. The MVCC
// layer is sequence number-aware and ensures that reads at a given sequence
// number ignore writes in the same transaction at larger sequence numbers.
// Likewise, writes at a sequence number become no-ops if an intent with the
// same sequence is already present. If an intent with the same sequence is not
// already present but an intent with a larger sequence number is, an error is
// returned. Similarly, if an intent with the same sequence is present but its
// value differs from what we recompute, an error is returned.
//
type txnSeqNumAllocator struct {
	// wrapped is the sender that SendLocked forwards each batch to once
	// sequence numbers have been assigned. It is installed via setWrapped.
	wrapped lockedSender
	// writeSeq is the current write seqnum, i.e. the value last assigned
	// to a write operation in a batch. It remains at 0 until the first
	// write operation is encountered.
	//
	// SendLocked increments it once for every transactional write and
	// every EndTransaction request; epochBumpedLocked resets it to 0.
	writeSeq enginepb.TxnSeq
	// readSeqPlusOne is either:
	// - 0 to indicate that read operations should read at the latest
	//   write seqnum (read-own-write behavior).
	// - >0 to indicate that read operations should read at a
	//   fixed readSeq = (readSeqPlusOne - 1), also called
	//   "step-wise" behavior.
	// We use a +1 offset so that the default value 0 can be
	// used as sentinel to disable step-wise behavior.
	//
	// stepLocked sets it to writeSeq + 1 and disableSteppingLocked
	// resets it to 0.
	readSeqPlusOne enginepb.TxnSeq
	// commandCount indicates how many requests have been sent through
	// this transaction. Reset on retryable txn errors.
	//
	// SendLocked adds the number of requests in each batch to it and
	// epochBumpedLocked resets it to 0.
	// TODO(andrei): let's get rid of this. It should be maintained
	// in the SQL level.
	commandCount int32
}
// SendLocked implements the txnInterceptor interface. It stamps every request
// in the batch with a sequence number and then hands the batch to the wrapped
// sender, returning whatever that sender returns.
//
// Transactional writes and EndTransaction requests first advance the write
// seqnum, so each of them carries a seqnum larger than any allocated before.
// All requests are then stamped with the current write seqnum, with one
// exception: while step-wise execution is enabled (readSeqPlusOne > 0),
// read-only requests are stamped with the fixed read seqnum instead.
func (s *txnSeqNumAllocator) SendLocked(
	ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
	for i := range ba.Requests {
		inner := ba.Requests[i].GetInner()

		// The write seqnum only moves for requests that leave intents or
		// that commit the transaction; ba.IsCompleteTransaction relies on
		// this to work properly.
		advances := roachpb.IsTransactionWrite(inner) || inner.Method() == roachpb.EndTransaction
		if advances {
			s.writeSeq++
		}

		// By default a request operates at the current write seqnum. In
		// step-wise mode, read-only requests read at the pinned read seqnum
		// rather than at the latest write.
		seq := s.writeSeq
		if stepping := s.readSeqPlusOne > 0; stepping && roachpb.IsReadOnly(inner) {
			seq = s.readSeqPlusOne - 1
		}

		hdr := inner.Header()
		hdr.Sequence = seq
		inner.SetHeader(hdr)
	}

	s.commandCount += int32(len(ba.Requests))
	return s.wrapped.SendLocked(ctx, ba)
}
// setWrapped implements the txnInterceptor interface. It records the sender
// that SendLocked forwards batches to.
func (s *txnSeqNumAllocator) setWrapped(wrapped lockedSender) {
	s.wrapped = wrapped
}
// populateMetaLocked implements the txnInterceptor interface. It copies the
// allocator's state into meta: the write seqnum goes into the transaction's
// Sequence field, alongside the read seqnum offset and the command count.
func (s *txnSeqNumAllocator) populateMetaLocked(meta *roachpb.TxnCoordMeta) {
	meta.Txn.Sequence = s.writeSeq
	meta.ReadSeqNumPlusOne = s.readSeqPlusOne
	meta.CommandCount = s.commandCount
}
// augmentMetaLocked implements the txnInterceptor interface. It folds the
// state carried by meta into the allocator: the command counts are summed,
// while the write seqnum and the read seqnum offset are each raised to the
// incoming value when that value is larger. Neither seqnum ever moves
// backwards as a result of this call.
func (s *txnSeqNumAllocator) augmentMetaLocked(meta roachpb.TxnCoordMeta) {
	s.commandCount += meta.CommandCount

	if incoming := meta.Txn.Sequence; s.writeSeq < incoming {
		s.writeSeq = incoming
	}
	if incoming := meta.ReadSeqNumPlusOne; s.readSeqPlusOne < incoming {
		s.readSeqPlusOne = incoming
	}
}
// stepLocked moves the read seqnum up to the current write seqnum, which
// also enables step-wise reads if they were not enabled already.
// Used by the TxnCoordSender's Step() method.
//
// If the read seqnum is already ahead of the write seqnum, the state is
// left untouched and an assertion failure is returned.
func (s *txnSeqNumAllocator) stepLocked() error {
	if curReadSeq := s.readSeqPlusOne - 1; curReadSeq > s.writeSeq {
		return errors.AssertionFailedf(
			"cannot step() after mistaken initialization (%d,%d)",
			s.writeSeq, s.readSeqPlusOne,
		)
	}
	s.readSeqPlusOne = s.writeSeq + 1
	return nil
}
// disableSteppingLocked turns off step-wise reads so that read operations
// go back to reading at the latest write seqnum. It always returns nil.
// Used by the TxnCoordSender's DisableStepping() method.
func (s *txnSeqNumAllocator) disableSteppingLocked() error {
	// A zero readSeqPlusOne is the sentinel for "stepping disabled".
	const steppingDisabled = 0
	s.readSeqPlusOne = steppingDisabled
	return nil
}
// epochBumpedLocked implements the txnInterceptor interface. It zeroes the
// write seqnum and the command count. Whether step-wise reads are enabled
// is preserved: if they were, the read seqnum is rewound to 0
// (readSeqPlusOne = 1); if they were not, readSeqPlusOne ends up at 0.
func (s *txnSeqNumAllocator) epochBumpedLocked() {
	// Capture the stepping mode before any state is reset.
	wasStepping := s.readSeqPlusOne > 0

	s.writeSeq = 0
	s.commandCount = 0
	s.readSeqPlusOne = 0
	if wasStepping {
		s.readSeqPlusOne = 1
	}
}
// closeLocked implements the txnInterceptor interface. It is a no-op for
// this interceptor.
func (*txnSeqNumAllocator) closeLocked() {
	// Nothing to do.
}