-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathtablewriter_upsert_opt.go
231 lines (199 loc) · 7.36 KB
/
tablewriter_upsert_opt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package sql
import (
"context"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
)
// tableUpserterOpt implements the upsert operation when it is planned by the
// cost-based optimizer (CBO). The CBO can use a much simpler upserter because
// it incorporates conflict detection, update and computed column evaluation,
// and other upsert operations into the input query, rather than requiring the
// upserter to do it. For example:
//
//	CREATE TABLE abc (a INT PRIMARY KEY, b INT, c INT)
//	INSERT INTO abc VALUES (1, 2) ON CONFLICT (a) DO UPDATE SET b=10
//
// The CBO will generate an input expression similar to this:
//
//	SELECT ins_a, ins_b, ins_c, fetch_a, fetch_b, fetch_c, 10 AS upd_b
//	FROM (VALUES (1, 2, NULL)) AS ins(ins_a, ins_b, ins_c)
//	LEFT OUTER JOIN abc AS fetch(fetch_a, fetch_b, fetch_c)
//	ON ins_a = fetch_a
//
// Each input row handed to the row method is therefore laid out as the insert
// columns, followed by the fetch columns, followed by the update values, and
// the row method slices it along those boundaries. In the example the canary
// column would presumably be fetch_a, which the LEFT OUTER JOIN leaves NULL
// only when no existing row matched.
//
// The other non-CBO upserters perform custom left lookup joins. However, that
// doesn't allow sharing of optimization rules and doesn't work with correlated
// SET expressions.
type tableUpserterOpt struct {
	tableUpserterBase

	// evalCtx is the evaluation context supplied to init. It is used when
	// evaluating CHECK constraints for rows that are updated on conflict.
	evalCtx *tree.EvalContext

	// fetchCols indicate which columns need to be fetched from the target table,
	// in order to detect whether a conflict has occurred, as well as to provide
	// existing values for updates.
	fetchCols []sqlbase.ColumnDescriptor

	// updateCols indicate which columns need an update during a conflict. When
	// it is empty, a conflicting row is left unchanged (and only reported if
	// rows are being collected).
	updateCols []sqlbase.ColumnDescriptor

	// canaryOrdinal is the ordinal position of the column within the input row
	// that is used to decide whether to execute an insert or update operation.
	// If the canary column is null, then an insert will be performed; otherwise,
	// an update is performed. This column will always be one of the fetchCols.
	canaryOrdinal int

	// resultRow is a reusable slice of Datums used to store result rows. init
	// only allocates it when rows are being collected (one slot per entry in
	// colIDToReturnIndex), and it is overwritten for every updated row.
	resultRow tree.Datums

	// fkTables is used for foreign key checks in the update case. It is handed
	// to row.MakeUpdater in init and also supplies the CHECK constraint helper
	// for the target table.
	fkTables row.TableLookupsByID

	// ru is used when updating rows. init builds it from updateCols and
	// fetchCols.
	ru row.Updater
}
// init is part of the tableWriter interface.
//
// It initializes the embedded tableUpserterBase, keeps evalCtx for CHECK
// evaluation, allocates the reusable RETURNING buffer when rows are being
// collected, and constructs the row.Updater used on the conflict path. The
// updater is stored on tu even when its construction fails; the error is
// returned to the caller.
func (tu *tableUpserterOpt) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
	if baseErr := tu.tableUpserterBase.init(txn, evalCtx); baseErr != nil {
		return baseErr
	}
	tu.evalCtx = evalCtx

	if tu.collectRows {
		// One slot per RETURNING column, refilled for every updated row.
		tu.resultRow = make(tree.Datums, len(tu.colIDToReturnIndex))
	}

	updater, updaterErr := row.MakeUpdater(
		txn,
		tu.tableDesc(),
		tu.fkTables,
		tu.updateCols,
		tu.fetchCols,
		row.UpdaterDefault,
		evalCtx,
		tu.alloc,
	)
	tu.ru = updater
	return updaterErr
}
// row is part of the tableWriter interface.
//
// The incoming row holds the insert columns, then the fetched (existing)
// columns, then the update values. A NULL canary value means no existing row
// conflicts, so the insert columns are inserted. Otherwise the existing row is
// updated; if there is nothing to update, the fetched row is only reported
// when rows are being collected.
func (tu *tableUpserterOpt) row(ctx context.Context, row tree.Datums, traceKV bool) error {
	tu.batchSize++
	tu.resultCount++

	// Boundaries of the insert and fetch sections within the input row.
	insLimit := len(tu.ri.InsertCols)
	fetchLimit := insLimit + len(tu.fetchCols)

	switch {
	case row[tu.canaryOrdinal] == tree.DNull:
		// No conflicting row exists: plain insert.
		return tu.insertNonConflictingRow(ctx, tu.b, row[:insLimit], traceKV)

	case len(tu.updateCols) > 0:
		// Conflict with columns to change: update the existing row.
		return tu.updateConflictingRow(
			ctx, tu.b, row[insLimit:fetchLimit], row[fetchLimit:], tu.tableDesc(), traceKV)

	case tu.collectRows:
		// Conflict but nothing to change; RETURNING still needs the existing row.
		_, addErr := tu.rowsUpserted.AddRow(ctx, row[insLimit:fetchLimit])
		return addErr

	default:
		return nil
	}
}
// atBatchEnd is part of the extendedTableWriter interface.
//
// The optimizer-planned upserter has no deferred per-batch work: row already
// performs the insert or update for each input row as it arrives, so this is
// a no-op that always succeeds.
func (tu *tableUpserterOpt) atBatchEnd(ctx context.Context, traceKV bool) error {
	return nil
}
// insertNonConflictingRow queues insertRow as a new row (without overwrite,
// with FK checks) into batch b. It is used when no existing row conflicts.
//
// When rows are being collected for RETURNING, the inserted values are also
// appended to rowsUpserted. If insertReorderingRequired is set, they are first
// reshaped with makeResultFromRow using the inserter's column-ID-to-index map.
func (tu *tableUpserterOpt) insertNonConflictingRow(
	ctx context.Context, b *client.Batch, insertRow tree.Datums, traceKV bool,
) error {
	err := tu.ri.InsertRow(ctx, b, insertRow, false /* overwrite */, row.CheckFKs, traceKV)
	if err != nil || !tu.collectRows {
		// Either the insert failed, or there is nothing to report.
		return err
	}

	returned := insertRow
	if tu.insertReorderingRequired {
		returned = tu.makeResultFromRow(insertRow, tu.ri.InsertColIDtoRowIndex)
	}
	_, err = tu.rowsUpserted.AddRow(ctx, returned)
	return err
}
// updateConflictingRow overwrites an existing row that conflicted with the
// row being upserted.
//
// fetchRow holds the existing values (indexed per tu.ru.FetchColIDtoRowIndex)
// and updateValues holds the new values (indexed per
// tu.ru.UpdateColIDtoRowIndex). tu.ru must already have been built for these
// fetch and update columns. The steps are:
//  1. If the table has CHECK expressions, evaluate them against the fetched
//     row overlaid with the update values.
//  2. Queue the update into batch b.
//  3. When rows are being collected, build the RETURNING row and append it to
//     rowsUpserted.
//
// It panics if a returned column has neither an update value nor a fetched
// value.
func (tu *tableUpserterOpt) updateConflictingRow(
	ctx context.Context,
	b *client.Batch,
	fetchRow tree.Datums,
	updateValues tree.Datums,
	tableDesc *sqlbase.ImmutableTableDescriptor,
	traceKV bool,
) error {
	if helper := tu.fkTables[tableDesc.ID].CheckHelper; len(helper.Exprs) > 0 {
		// Load the existing values first, then the update values on top of
		// them (the trailing flag presumably selects merge mode — confirm
		// against CheckHelper.LoadRow).
		if loadErr := helper.LoadRow(tu.ru.FetchColIDtoRowIndex, fetchRow, false); loadErr != nil {
			return loadErr
		}
		if loadErr := helper.LoadRow(tu.ru.UpdateColIDtoRowIndex, updateValues, true); loadErr != nil {
			return loadErr
		}
		if checkErr := helper.Check(tu.evalCtx); checkErr != nil {
			return checkErr
		}
	}

	// Queue the update in KV. The row returned by UpdateRow is discarded; the
	// RETURNING row is assembled below from fetchRow and updateValues instead.
	if _, updErr := tu.ru.UpdateRow(ctx, b, fetchRow, updateValues, row.CheckFKs, traceKV); updErr != nil {
		return updErr
	}

	if !tu.collectRows {
		return nil
	}

	// Shape the RETURNING row: prefer a column's new value, otherwise fall back
	// to the value fetched from the existing row.
	for colID, outIdx := range tu.colIDToReturnIndex {
		if srcIdx, updated := tu.ru.UpdateColIDtoRowIndex[colID]; updated {
			tu.resultRow[outIdx] = updateValues[srcIdx]
			continue
		}
		srcIdx, fetched := tu.ru.FetchColIDtoRowIndex[colID]
		if !fetched {
			panic("no existing value is available for column")
		}
		tu.resultRow[outIdx] = fetchRow[srcIdx]
	}

	_, addErr := tu.rowsUpserted.AddRow(ctx, tu.resultRow)
	return addErr
}
// tableDesc is part of the tableWriter interface.
//
// It returns the descriptor of the target table, as held by the row
// inserter's helper (tu.ri.Helper).
func (tu *tableUpserterOpt) tableDesc() *sqlbase.ImmutableTableDescriptor {
	desc := tu.ri.Helper.TableDesc
	return desc
}
// walkExprs is part of the tableWriter interface.
//
// This upserter evaluates no expressions of its own. Conflict detection,
// update values and computed columns all arrive precomputed in the input rows
// produced by the optimizer. There is nothing to visit, so walk is never
// called.
func (tu *tableUpserterOpt) walkExprs(walk func(desc string, index int, expr tree.TypedExpr)) {
	// Intentionally empty: no expressions are owned by this writer.
}