// Copyright 2016 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"bytes"
"context"
"fmt"

"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// expressionCarrier handles visiting sub-expressions.
type expressionCarrier interface {
// walkExprs explores all sub-expressions held by this object, if
// any.
walkExprs(func(desc string, index int, expr tree.TypedExpr))
}
// tableWriter handles writing kvs and forming table rows.
//
// Usage:
// err := tw.init(txn, evalCtx)
// // Handle err.
// for {
// values := ...
// row, err := tw.row(values)
// // Handle err.
// }
// err := tw.finalize()
// // Handle err.
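// tw.close(ctx)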
type tableWriter interface {
expressionCarrier
// init provides the tableWriter with a Txn and an EvalContext (used,
// among other things, for memory monitoring) and returns an error if
// it was misconfigured.
init(*client.Txn, *tree.EvalContext) error
// row performs a sql row modification (tableInserter performs an insert,
// etc). It batches up writes to the init'd txn and periodically sends them.
// The passed Datums is not used after `row` returns. The returned Datums is
// suitable for use with returningHelper.
// The traceKV parameter determines whether the individual K/V operations
// should be logged to the context. We use a separate argument here instead
// of a Value field on the context because Value access in context.Context
// is rather expensive and the tableWriter interface is used on the
// inner loop of table accesses.
row(context.Context, tree.Datums, bool /* traceKV */) (tree.Datums, error)
// finalize flushes out any remaining writes. It is called after all calls to
// row. It returns the rows not yet returned by calls to `row`, if any.
// The traceKV parameter determines whether the individual K/V operations
// should be logged to the context. See the comment above for why
// this is a separate parameter as opposed to a Value field on the context.
//
// autoCommit specifies whether the tableWriter is free to commit the txn in
// which it was operating once all writes are performed.
finalize(
ctx context.Context, autoCommit autoCommitOpt, traceKV bool,
) (*sqlbase.RowContainer, error)
// tableDesc returns the TableDescriptor for the table that the tableWriter
// will modify.
tableDesc() *sqlbase.TableDescriptor
// fkSpanCollector returns the FkSpanCollector for the tableWriter.
fkSpanCollector() sqlbase.FkSpanCollector
// close frees all resources held by the tableWriter.
close(context.Context)
}
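// autoCommitOpt determines whether a tableWriter is allowed to commit the
// txn together with its final batch of writes, saving a separate
// round-trip to the transaction coordinator (see the finalize
// implementations below).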
type autoCommitOpt int
const (
noAutoCommit autoCommitOpt = iota
autoCommitEnabled
)
var _ tableWriter = (*tableInserter)(nil)
var _ tableWriter = (*tableUpdater)(nil)
var _ tableWriter = (*tableUpserter)(nil)
var _ tableWriter = (*tableDeleter)(nil)
// tableInserter handles writing kvs and forming table rows for inserts.
type tableInserter struct {
ri sqlbase.RowInserter
// Set by init.
txn *client.Txn
b *client.Batch
}
func (ti *tableInserter) walkExprs(_ func(desc string, index int, expr tree.TypedExpr)) {}
func (ti *tableInserter) init(txn *client.Txn, _ *tree.EvalContext) error {
ti.txn = txn
ti.b = txn.NewBatch()
return nil
}
func (ti *tableInserter) row(
ctx context.Context, values tree.Datums, traceKV bool,
) (tree.Datums, error) {
return nil, ti.ri.InsertRow(ctx, ti.b, values, false /* ignoreConflicts */, sqlbase.CheckFKs, traceKV)
}
func (ti *tableInserter) finalize(
ctx context.Context, autoCommit autoCommitOpt, _ bool,
) (*sqlbase.RowContainer, error) {
var err error
if autoCommit == autoCommitEnabled {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
err = ti.txn.CommitInBatch(ctx, ti.b)
} else {
err = ti.txn.Run(ctx, ti.b)
}
if err != nil {
return nil, sqlbase.ConvertBatchError(ctx, ti.ri.Helper.TableDesc, ti.b)
}
return nil, nil
}
func (ti *tableInserter) tableDesc() *sqlbase.TableDescriptor {
return ti.ri.Helper.TableDesc
}
func (ti *tableInserter) fkSpanCollector() sqlbase.FkSpanCollector {
return ti.ri.Fks
}
func (ti *tableInserter) close(_ context.Context) {}
// tableUpdater handles writing kvs and forming table rows for updates.
type tableUpdater struct {
ru sqlbase.RowUpdater
// Set by init.
txn *client.Txn
evalCtx *tree.EvalContext
b *client.Batch
}
func (tu *tableUpdater) walkExprs(_ func(desc string, index int, expr tree.TypedExpr)) {}
func (tu *tableUpdater) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
tu.txn = txn
tu.evalCtx = evalCtx
tu.b = txn.NewBatch()
return nil
}
func (tu *tableUpdater) row(
ctx context.Context, values tree.Datums, traceKV bool,
) (tree.Datums, error) {
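// values contains the fetched (old) values for all fetch columns,
// followed by the new values for the columns being updated.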
oldValues := values[:len(tu.ru.FetchCols)]
updateValues := values[len(tu.ru.FetchCols):]
return tu.ru.UpdateRow(ctx, tu.b, oldValues, updateValues, sqlbase.CheckFKs, traceKV)
}
func (tu *tableUpdater) finalize(
ctx context.Context, autoCommit autoCommitOpt, _ bool,
) (*sqlbase.RowContainer, error) {
var err error
if autoCommit == autoCommitEnabled {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
err = tu.txn.CommitInBatch(ctx, tu.b)
} else {
err = tu.txn.Run(ctx, tu.b)
}
if err != nil {
return nil, sqlbase.ConvertBatchError(ctx, tu.ru.Helper.TableDesc, tu.b)
}
return nil, nil
}
func (tu *tableUpdater) tableDesc() *sqlbase.TableDescriptor {
return tu.ru.Helper.TableDesc
}
func (tu *tableUpdater) fkSpanCollector() sqlbase.FkSpanCollector {
return tu.ru.Fks
}
func (tu *tableUpdater) close(_ context.Context) {}
type tableUpsertEvaler interface {
expressionCarrier
// TODO(dan): The tableUpsertEvaler interface separation was an attempt to
// keep sql logic out of the mapping between table rows and kv operations.
// Unfortunately, it was a misguided effort. tableUpserter's responsibilities
// should really be defined as those needed in distributed sql leaf nodes,
// which will necessarily include expr evaluation.
// eval populates into resultRow the values for the update case of
// an upsert, given the row that would have been inserted and the
// existing (conflicting) values.
eval(insertRow, existingRow, resultRow tree.Datums) (tree.Datums, error)
// evalComputedCols evaluates the computed columns for this upsert using the
// values in updatedRow and appends the results, in order, to appendTo,
// returning the result.
evalComputedCols(updatedRow tree.Datums, appendTo tree.Datums) (tree.Datums, error)
// shouldUpdate returns the result of evaluating the WHERE clause of the
// ON CONFLICT ... DO UPDATE clause.
shouldUpdate(insertRow tree.Datums, existingRow tree.Datums) (bool, error)
}
// tableUpserter handles writing kvs and forming table rows for upserts.
//
// There are two distinct "modes" that tableUpserter can use, one of
// which is selected by newUpsertNode(). In the general mode, rows to
// be inserted are batched up from calls to `row` and then the upsert
// is processed using finalize(). The other mode is the fast
// path. See fastTableUpserter below.
//
// In the general mode, the insert rows are accumulated in insertRows.
// Then during finalize(), the final upsert processing occurs. This
// uses 1 or 2 `client.Batch`s from the init'd txn to fetch the
// existing (conflicting) values, followed by one more `client.Batch`
// with the appropriate inserts and updates. In this case, all
// necessary `client.Batch`s are created and run within the lifetime
// of `finalize`.
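//
// For example (on a hypothetical table t), a statement like
//   INSERT INTO t(a, b) VALUES (1, 2) ON CONFLICT (a) DO UPDATE SET b = t.b + 1
// cannot use the fast path, since its update expression is not of the
// form `b = excluded.b`; it is processed in this general mode.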
//
type tableUpserter struct {
ri sqlbase.RowInserter
// insertRows are the rows produced by the insertion data source.
// These are accumulated while iterating on the insertion data source.
insertRows sqlbase.RowContainer
// updateCols indicates which columns need an update during a
// conflict. There is one entry per column descriptor in the
// table. However, only the entries identified in the conflict clause
// of the original statement will be populated, to disambiguate
// columns that need an update from those that don't.
updateCols []sqlbase.ColumnDescriptor
conflictIndex sqlbase.IndexDescriptor
alloc *sqlbase.DatumAlloc
collectRows bool
anyComputed bool
// evaler is set for ON CONFLICT DO UPDATE, but not for DO NOTHING.
evaler *upsertHelper
// updateValues is the temporary buffer for rows computed by
// evaler.eval(). It is reused from one row to the next to limit
// allocations.
updateValues tree.Datums
// Set by init.
txn *client.Txn
evalCtx *tree.EvalContext
fkTables sqlbase.TableLookupsByID // for fk checks in update case
ru sqlbase.RowUpdater
updateColIDtoRowIndex map[sqlbase.ColumnID]int
fetchCols []sqlbase.ColumnDescriptor
fetchColIDtoRowIndex map[sqlbase.ColumnID]int
fetcher sqlbase.RowFetcher
// Rows returned if collectRows is true.
rowsUpserted *sqlbase.RowContainer
// rowTemplate is used to prepare rows to add to rowsUpserted.
rowTemplate tree.Datums
// rowIdxToRetIdx maps the indexes in the inserted rows
// back to indexes in rowTemplate.
rowIdxToRetIdx []int
// For allocation avoidance.
indexKeyPrefix []byte
}
func (tu *tableUpserter) walkExprs(walk func(desc string, index int, expr tree.TypedExpr)) {
if tu.evaler != nil {
tu.evaler.walkExprs(walk)
}
}
func (tu *tableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
tableDesc := tu.tableDesc()
tu.txn = txn
tu.evalCtx = evalCtx
tu.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(tableDesc, tableDesc.PrimaryIndex.ID)
tu.insertRows.Init(
tu.evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0,
)
if tu.collectRows {
tu.rowsUpserted = sqlbase.NewRowContainer(
tu.evalCtx.Mon.MakeBoundAccount(),
sqlbase.ColTypeInfoFromColDescs(tableDesc.Columns),
tu.insertRows.Len(),
)
// In some cases (e.g. `INSERT INTO t (a) ...`) rowVals does not contain
// all the table columns. We need to pass values for all table columns
// to rh, in the correct order; we will use rowTemplate for this. We
// also need a table that maps row indices to rowTemplate indices to
// fill in the row values; any absent values will be NULLs.
tu.rowTemplate = make(tree.Datums, len(tableDesc.Columns))
}
colIDToRetIndex := map[sqlbase.ColumnID]int{}
for i, col := range tableDesc.Columns {
colIDToRetIndex[col.ID] = i
}
tu.rowIdxToRetIdx = make([]int, len(tu.ri.InsertCols))
for i, col := range tu.ri.InsertCols {
tu.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID]
}
// TODO(dan): This could be made tighter, just the rows needed for the ON
// CONFLICT and RETURNING exprs.
requestedCols := tableDesc.Columns
if len(tu.updateCols) == 0 {
tu.fetchCols = requestedCols
tu.fetchColIDtoRowIndex = sqlbase.ColIDtoRowIndexFromCols(requestedCols)
} else {
var err error
tu.ru, err = sqlbase.MakeRowUpdater(
txn,
tableDesc,
tu.fkTables,
tu.updateCols,
requestedCols,
sqlbase.RowUpdaterDefault,
tu.evalCtx,
tu.alloc,
)
if err != nil {
return err
}
// tu.ru.FetchCols can also contain columns undergoing mutation.
tu.fetchCols = tu.ru.FetchCols
tu.fetchColIDtoRowIndex = tu.ru.FetchColIDtoRowIndex
tu.updateColIDtoRowIndex = make(map[sqlbase.ColumnID]int)
for i, updateCol := range tu.ru.UpdateCols {
tu.updateColIDtoRowIndex[updateCol.ID] = i
}
}
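// Mark the fetch columns for which the row fetcher must produce values.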
var valNeededForCol util.FastIntSet
for i, col := range tu.fetchCols {
if _, ok := tu.fetchColIDtoRowIndex[col.ID]; ok {
valNeededForCol.Add(i)
}
}
tableArgs := sqlbase.RowFetcherTableArgs{
Desc: tableDesc,
Index: &tableDesc.PrimaryIndex,
ColIdxMap: tu.fetchColIDtoRowIndex,
Cols: tu.fetchCols,
ValNeededForCol: valNeededForCol,
}
return tu.fetcher.Init(
false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, tu.alloc, tableArgs,
)
}
func (tu *tableUpserter) row(
ctx context.Context, row tree.Datums, traceKV bool,
) (tree.Datums, error) {
// TODO(dan): If len(tu.insertRows) > some threshold, call flush().
return tu.insertRows.AddRow(ctx, row)
}
// finalize writes to tu.txn any rows batched up in tu.insertRows.
func (tu *tableUpserter) finalize(
ctx context.Context, autoCommit autoCommitOpt, traceKV bool,
) (*sqlbase.RowContainer, error) {
existingRows, pkToRowIdx, conflictingPKs, err := tu.fetchExisting(ctx, traceKV)
if err != nil {
return nil, err
}
// At this point existingRows contains data for the conflicting
// rows, and pkToRowIdx maps each PK in tu.insertRows to an entry in
// existingRows.
// During the upsert processing below, existingRows will contain
// initially the data from KV, but will be extended with each new
// inserted row that didn't exist in KV. Then each update will
// modify existingRows in-place, so that subsequent updates can
// discover the modified values.
tableDesc := tu.tableDesc()
b := tu.txn.NewBatch()
for i := 0; i < tu.insertRows.Len(); i++ {
insertRow := tu.insertRows.At(i)
// upsertRowPK will be the key of the conflicting row. This may
// be different from insertRow's PK if the conflicting index is a
// secondary index.
var upsertRowPK roachpb.Key
if conflictingPKs != nil {
// If a secondary index helped us find the conflicting PK for this row,
// use that PK to identify the existing row.
upsertRowPK = conflictingPKs[i]
} else {
// Otherwise, encode the values to determine the primary key.
insertRowPK, _, err := sqlbase.EncodeIndexKey(
tableDesc, &tableDesc.PrimaryIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix)
if err != nil {
return nil, err
}
upsertRowPK = insertRowPK
}
// At this point, upsertRowPK is either:
// - nil if a secondary index was used and it determined there is no conflict;
// - non-nil if a conflict may be present. In that case
// we must consult pkToRowIdx to determine whether we already
// have data (i.e. a conflict) in existingRows.
//
// existingRowIdx will be set to a valid value if a conflict is
// detected.
existingRowIdx := -1
if upsertRowPK != nil {
rowIdx, ok := pkToRowIdx[string(upsertRowPK)]
if ok {
// There was a conflict after all.
existingRowIdx = rowIdx
}
}
// We'll use resultRow to produce a RETURNING row below, if one is needed.
var resultRow tree.Datums
// Do we have a conflict?
if existingRowIdx == -1 {
// We don't have a conflict. This is a new row in KV. Create it.
err := tu.ri.InsertRow(ctx, b, insertRow, false /* ignoreConflicts */, sqlbase.CheckFKs, traceKV)
if err != nil {
return nil, err
}
// We may not know the upsertRowPK yet for the new row, for
// example when the conflicting index was a secondary index.
// In that case, compute it now.
if upsertRowPK == nil {
upsertRowPK, _, err = sqlbase.EncodeIndexKey(
tableDesc, &tableDesc.PrimaryIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix)
if err != nil {
return nil, err
}
}
// We now need a row that has the shape of the result row.
resultRow = tu.makeResultFromInsertRow(insertRow, tableDesc.Columns)
// Then remember it for further upserts.
pkToRowIdx[string(upsertRowPK)] = len(existingRows)
existingRows = append(existingRows, resultRow)
} else if len(tu.updateCols) == 0 {
// If len(tu.updateCols) == 0, then we're in the DO NOTHING
// case. There is no update to be done, also no result row to be collected.
// See the pg docs, e.g.: https://www.postgresql.org/docs/10/static/sql-insert.html
// """
// The optional RETURNING clause causes INSERT to compute and
// return value(s) based on each row actually inserted (or
// updated, if an ON CONFLICT DO UPDATE clause was used). This
// is primarily useful for obtaining values that were supplied
// by defaults, such as a serial sequence number. However, any
// expression using the table's columns is allowed. The syntax
// of the RETURNING list is identical to that of the output list
// of SELECT. Only rows that were successfully inserted or
// updated will be returned.
// ""
continue
} else {
// existingRow carries the values previously seen in KV
// or newly inserted earlier in this batch.
existingRow := existingRows[existingRowIdx]
// Check the ON CONFLICT DO UPDATE WHERE ... clause.
existingValues := existingRow[:len(tu.ru.FetchCols)]
shouldUpdate, err := tu.evaler.shouldUpdate(insertRow, existingValues)
if err != nil {
return nil, err
}
if !shouldUpdate {
// WHERE tells us there is nothing to do. Stop here.
// There is also no RETURNING result.
// See https://www.postgresql.org/docs/10/static/sql-insert.html and the
// quoted excerpt above.
continue
}
// Process the UPDATE ON CONFLICT.
// First compute all the updates via SET (or the pseudo-SET generated
// for UPSERT statements).
updateValues, err := tu.evaler.eval(insertRow, existingValues, tu.updateValues)
if err != nil {
return nil, err
}
// Now (re-)compute computed columns. This appends the computed
// columns at the end of updateValues.
//
// TODO(justin): We're currently wasteful here: we construct the
// result row *twice* because we need it once to evaluate any computed
// columns and again to actually perform the update. We need to find a
// way to reuse it. I'm not sure right now how best to factor this -
// suggestions welcome.
if tu.anyComputed {
newValues := make([]tree.Datum, len(existingValues))
copy(newValues, existingValues)
for i, updateValue := range updateValues {
newValues[tu.ru.FetchColIDtoRowIndex[tu.ru.UpdateCols[i].ID]] = updateValue
}
// Now that we have a complete row except for its computed columns,
// evaluate the computed columns and add the results to the end of
// updateValues (the computed columns come at the end of the update
// row).
updateValues, err = tu.evaler.evalComputedCols(newValues, updateValues)
if err != nil {
return nil, err
}
}
// Queue the update in KV. This also returns an "update row"
// containing the updated values for every column in the
// table. This is useful for RETURNING, which we collect below.
updatedRow, err := tu.ru.UpdateRow(
ctx, b, existingValues, updateValues, sqlbase.CheckFKs, traceKV,
)
if err != nil {
return nil, err
}
resultRow = updatedRow
// Keep the slice for reuse.
tu.updateValues = updateValues[:0]
// Maybe the PK was updated by SET. We need to recompute a fresh
// PK for the current row from updatedRow.
// We use tu.evaler.ccIvarContainer.mapping which contains
// the suitable mapping for the table columns.
newUpsertRowPK, _, err := sqlbase.EncodeIndexKey(
tableDesc, &tableDesc.PrimaryIndex, tu.evaler.ccIvarContainer.mapping, updatedRow, tu.indexKeyPrefix)
if err != nil {
return nil, err
}
// Now update the known data in existingRows. The new PK may be one we
// have already seen, perhaps from a row inserted earlier in this batch.
if newExistingRowIdx, ok := pkToRowIdx[string(newUpsertRowPK)]; !ok {
// We're sure to have a new PK. This means the previous one
// is guaranteed to not exist any more. Remove it.
delete(pkToRowIdx, string(upsertRowPK))
// Now add the new one.
newExistingRowIdx = len(existingRows)
pkToRowIdx[string(newUpsertRowPK)] = newExistingRowIdx
existingRows = append(existingRows, updatedRow)
} else {
// If the PK has changed, we'll have a new rowIdx. In that case,
// remove the previous mapping.
if newExistingRowIdx != existingRowIdx {
delete(pkToRowIdx, string(upsertRowPK))
}
copy(existingRows[newExistingRowIdx], updatedRow)
}
}
if tu.collectRows {
// Remember the result row for the RETURNING clause.
_, err = tu.rowsUpserted.AddRow(ctx, resultRow)
if err != nil {
return nil, err
}
}
}
if autoCommit == autoCommitEnabled {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
err = tu.txn.CommitInBatch(ctx, b)
} else {
err = tu.txn.Run(ctx, b)
}
if err != nil {
return nil, sqlbase.ConvertBatchError(ctx, tableDesc, b)
}
return tu.rowsUpserted, nil
}
func (tu *tableUpserter) makeResultFromInsertRow(
insertRow tree.Datums, cols []sqlbase.ColumnDescriptor,
) tree.Datums {
resultRow := insertRow
if len(resultRow) < len(cols) {
// The row that was just inserted doesn't have this shape yet;
// it is missing values for the columns not targeted by the insert
// (those values will be NULL).
// Build it using rowIdxToRetIdx.
// TODO(nathan/knz): maybe reuse a buffer here to avoid a
// (re-)allocation.
resultRow = make(tree.Datums, len(cols))
// Pre-fill with NULLs.
for i := range resultRow {
resultRow[i] = tree.DNull
}
// Fill the other values from insertRow.
for i, val := range insertRow {
resultRow[tu.rowIdxToRetIdx[i]] = val
}
}
return resultRow
}
// upsertRowPKs returns the primary keys of every row in tu.insertRows
// for which we might get some value from KV.
//
// - if the conflicting index is the PK, the primary key for every
// row in tu.insertRows is computed (with no KV access) and returned.
// fetchExisting() will later determine whether the
// row is already present in KV or not with a lookup.
//
// - if the conflicting index is secondary, that index is used to look
// up the primary key. If the row is absent, no key is generated.
//
// The keys returned are guaranteed to be unique.
//
// The second return value is non-nil when the conflicting index is a
// secondary index. It maps each row in tu.insertRows to a PK with which
// it conflicts. Note that this may not be the PK of that row itself --
// merely that of _some_ row already in KV that conflicts with it on the
// secondary index.
func (tu *tableUpserter) upsertRowPKs(
ctx context.Context, traceKV bool,
) ([]roachpb.Key, map[int]roachpb.Key, error) {
upsertRowPKs := make([]roachpb.Key, 0, tu.insertRows.Len())
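// uniquePKs ensures each key is appended to upsertRowPKs at most once.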
uniquePKs := make(map[string]struct{})
tableDesc := tu.tableDesc()
if tu.conflictIndex.ID == tableDesc.PrimaryIndex.ID {
// If the conflict index is the primary index, we can compute them directly.
// In this case, the slice will be filled, but not all rows will have
// conflicts.
for i := 0; i < tu.insertRows.Len(); i++ {
insertRow := tu.insertRows.At(i)
upsertRowPK, _, err := sqlbase.EncodeIndexKey(
tableDesc, &tu.conflictIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix)
if err != nil {
return nil, nil, err
}
if _, ok := uniquePKs[string(upsertRowPK)]; !ok {
upsertRowPKs = append(upsertRowPKs, upsertRowPK)
uniquePKs[string(upsertRowPK)] = struct{}{}
}
}
return upsertRowPKs, nil, nil
}
// Otherwise, compute the keys for the conflict index and look them up. The
// primary keys can be constructed from the entries that come back. In this
// case, only the rows that actually conflict contribute a key, so the
// returned slice may have fewer entries than tu.insertRows.
b := tu.txn.NewBatch()
for i := 0; i < tu.insertRows.Len(); i++ {
insertRow := tu.insertRows.At(i)
entries, err := sqlbase.EncodeSecondaryIndex(
tableDesc, &tu.conflictIndex, tu.ri.InsertColIDtoRowIndex, insertRow)
if err != nil {
return nil, nil, err
}
for _, entry := range entries {
if traceKV {
log.VEventf(ctx, 2, "Get %s", entry.Key)
}
b.Get(entry.Key)
}
}
if err := tu.txn.Run(ctx, b); err != nil {
return nil, nil, err
}
conflictingPKs := make(map[int]roachpb.Key)
for i, result := range b.Results {
if len(result.Rows) == 1 {
if result.Rows[0].Value != nil {
upsertRowPK, err := sqlbase.ExtractIndexKey(tu.alloc, tableDesc, result.Rows[0])
if err != nil {
return nil, nil, err
}
conflictingPKs[i] = upsertRowPK
if _, ok := uniquePKs[string(upsertRowPK)]; !ok {
upsertRowPKs = append(upsertRowPKs, upsertRowPK)
uniquePKs[string(upsertRowPK)] = struct{}{}
}
}
} else if len(result.Rows) > 1 {
panic(fmt.Errorf(
"Expected <= 1 but got %d conflicts for row %s", len(result.Rows), tu.insertRows.At(i)))
}
}
return upsertRowPKs, conflictingPKs, nil
}
// fetchExisting returns any existing rows in the table that conflict with the
// ones in tu.insertRows.
// The 1st return value contains data for conflicting rows.
// The 2nd return value is a map relating each primary key value in the
// data source to the entry in the returned slice that contains the data
// for that primary key.
// The 3rd return value contains the PKs for each row in tu.insertRows that
// has a known conflict. This is populated only if there were some conflicts
// found and the conflict index was a secondary index.
func (tu *tableUpserter) fetchExisting(
ctx context.Context, traceKV bool,
) ([]tree.Datums, map[string]int, map[int]roachpb.Key, error) {
tableDesc := tu.tableDesc()
// primaryKeys contains the PK values to check for conflicts.
primaryKeys, conflictingPKs, err := tu.upsertRowPKs(ctx, traceKV)
if err != nil {
return nil, nil, nil, err
}
// pkToRowIdx maps the PK values to positions in existingRows.
var existingRows []tree.Datums
pkToRowIdx := make(map[string]int)
if len(primaryKeys) == 0 {
// We know already there is no conflicting row, so there's nothing to fetch.
return existingRows, pkToRowIdx, conflictingPKs, nil
}
// pkSpans will contain the spans for every entry in primaryKeys.
pkSpans := make(roachpb.Spans, 0, len(primaryKeys))
for _, primaryKey := range primaryKeys {
pkSpans = append(pkSpans, roachpb.Span{Key: primaryKey, EndKey: primaryKey.PrefixEnd()})
}
// Start retrieving the PKs.
// We don't limit batches here because the spans are unordered.
if err := tu.fetcher.StartScan(ctx, tu.txn, pkSpans, false /* no batch limits */, 0, traceKV); err != nil {
return nil, nil, nil, err
}
// Populate existingRows and pkToRowIdx.
for {
row, _, _, err := tu.fetcher.NextRowDecoded(ctx)
if err != nil {
return nil, nil, nil, err
}
if row == nil {
break // Done
}
rowPrimaryKey, _, err := sqlbase.EncodeIndexKey(
tableDesc, &tableDesc.PrimaryIndex, tu.fetchColIDtoRowIndex, row, tu.indexKeyPrefix)
if err != nil {
return nil, nil, nil, err
}
// The rows returned by rowFetcher are invalidated after the call to
// NextRow, so we have to copy them to save them.
// TODO(knz/nathan): try to reuse a large slice instead
// of making many small slices.
rowCopy := make(tree.Datums, len(row))
copy(rowCopy, row)
pkToRowIdx[string(rowPrimaryKey)] = len(existingRows)
existingRows = append(existingRows, rowCopy)
}
return existingRows, pkToRowIdx, conflictingPKs, nil
}
func (tu *tableUpserter) tableDesc() *sqlbase.TableDescriptor {
return tu.ri.Helper.TableDesc
}
func (tu *tableUpserter) fkSpanCollector() sqlbase.FkSpanCollector {
return tu.ri.Fks
}
func (tu *tableUpserter) close(ctx context.Context) {
tu.insertRows.Close(ctx)
if tu.rowsUpserted != nil {
tu.rowsUpserted.Close(ctx)
}
}
// fastTableUpserter implements the fast path for an upsert. See
// tableUpserter above for the general case.
//
// If certain conditions are met (no secondary indexes, all table
// values being inserted, update expressions of the form `SET a =
// excluded.a`) then the upsert can be done in one `client.Batch` and
// using only `Put`s. In this case, the single batch is created during
// `init`, operated on during `row`, and run during `finalize`. This
// is the same model as the other `tableFoo`s, which are simpler than
// upsert.
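//
// For example (on a hypothetical two-column table t with no secondary
// indexes), a statement like
//   UPSERT INTO t(a, b) VALUES (1, 2)
// supplies values for every column and can use this fast path.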
type fastTableUpserter struct {
ri sqlbase.RowInserter
// Set by init.
txn *client.Txn
evalCtx *tree.EvalContext
// Used for the fast path.
fastPathBatch *client.Batch
}
func (tu *fastTableUpserter) walkExprs(_ func(_ string, _ int, _ tree.TypedExpr)) {}
// init is part of the tableWriter interface.
func (tu *fastTableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
tu.txn = txn
tu.evalCtx = evalCtx
tu.fastPathBatch = tu.txn.NewBatch()
return nil
}
// row is part of the tableWriter interface.
func (tu *fastTableUpserter) row(
ctx context.Context, row tree.Datums, traceKV bool,
) (tree.Datums, error) {
// Use the fast path, ignore conflicts.
err := tu.ri.InsertRow(
ctx, tu.fastPathBatch, row, true /* ignoreConflicts */, sqlbase.CheckFKs, traceKV)
return nil, err
}
// finalize is part of the tableWriter interface.
func (tu *fastTableUpserter) finalize(
ctx context.Context, autoCommit autoCommitOpt, traceKV bool,
) (*sqlbase.RowContainer, error) {
if autoCommit == autoCommitEnabled {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
if err := tu.txn.CommitInBatch(ctx, tu.fastPathBatch); err != nil {
return nil, err
}
} else {
if err := tu.txn.Run(ctx, tu.fastPathBatch); err != nil {
return nil, err
}
}
return nil, nil
}
func (tu *fastTableUpserter) fkSpanCollector() sqlbase.FkSpanCollector {
return tu.ri.Fks
}
func (tu *fastTableUpserter) tableDesc() *sqlbase.TableDescriptor {
return tu.ri.Helper.TableDesc
}
func (tu *fastTableUpserter) close(_ context.Context) {}
// tableDeleter handles writing kvs and forming table rows for deletes.
type tableDeleter struct {
rd sqlbase.RowDeleter
alloc *sqlbase.DatumAlloc
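// batchSize tracks how many rows have been queued in the current
// batch; row() flushes the batch once this reaches a threshold.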
batchSize int
// Set by init.
txn *client.Txn
evalCtx *tree.EvalContext
b *client.Batch
}
func (td *tableDeleter) walkExprs(_ func(desc string, index int, expr tree.TypedExpr)) {}
func (td *tableDeleter) init(txn *client.Txn, evalCtx *tree.EvalContext) error {
td.txn = txn
td.evalCtx = evalCtx
td.b = txn.NewBatch()
return nil
}
func (td *tableDeleter) row(
ctx context.Context, values tree.Datums, traceKV bool,
) (tree.Datums, error) {
// Rudimentarily chunk the deletions to avoid memory blowup in queries such
// as `DELETE FROM mytable`.
const maxBatchSize = 10000
if td.batchSize >= maxBatchSize {
if err := td.txn.Run(ctx, td.b); err != nil {
return nil, err
}
td.b = td.txn.NewBatch()
td.batchSize = 0
}
td.batchSize++
return nil, td.rd.DeleteRow(ctx, td.b, values, sqlbase.CheckFKs, traceKV)
}
// finalize is part of the tableWriter interface.
func (td *tableDeleter) finalize(
ctx context.Context, autoCommit autoCommitOpt, _ bool,
) (*sqlbase.RowContainer, error) {
if autoCommit == autoCommitEnabled {
// An auto-txn can commit the transaction with the batch. This is an
// optimization to avoid an extra round-trip to the transaction
// coordinator.
return nil, td.txn.CommitInBatch(ctx, td.b)
}
return nil, td.txn.Run(ctx, td.b)
}
// fastPathAvailable returns true if the fastDelete optimization can be used.
func (td *tableDeleter) fastPathAvailable(ctx context.Context) bool {
if len(td.rd.Helper.Indexes) != 0 {
if log.V(2) {
log.Infof(ctx, "delete forced to scan: values required to update %d secondary indexes", len(td.rd.Helper.Indexes))
}
return false
}
if td.rd.Helper.TableDesc.IsInterleaved() {
if log.V(2) {
log.Info(ctx, "delete forced to scan: table is interleaved")
}
return false
}
if len(td.rd.Helper.TableDesc.PrimaryIndex.ReferencedBy) > 0 {
if log.V(2) {
log.Info(ctx, "delete forced to scan: table is referenced by foreign keys")
}
return false
}
return true
}
// fastDelete adds to the batch the kv operations necessary to delete sql rows
// without knowing the values that are currently present. fastDelete calls
// finalize, so finalize should not be called again afterwards.
func (td *tableDeleter) fastDelete(
ctx context.Context, scan *scanNode, autoCommit autoCommitOpt, traceKV bool,
) (rowCount int, err error) {
for _, span := range scan.spans {
log.VEvent(ctx, 2, "fast delete: skipping scan")
if traceKV {
log.VEventf(ctx, 2, "DelRange %s - %s", span.Key, span.EndKey)
}
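// returnKeys asks KV to return the deleted keys, from which the number
// of deleted rows can be derived.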
td.b.DelRange(span.Key, span.EndKey, true /* returnKeys */)
}
_, err = td.finalize(ctx, autoCommit, traceKV)
if err != nil {
return 0, err
}