diff --git a/pkg/sql/logictest/testdata/logic_test/upsert b/pkg/sql/logictest/testdata/logic_test/upsert
index ed779a55183a..ba0084b11550 100644
--- a/pkg/sql/logictest/testdata/logic_test/upsert
+++ b/pkg/sql/logictest/testdata/logic_test/upsert
@@ -1213,3 +1213,24 @@ DROP TABLE source
 
 statement ok
 DROP TABLE target
+
+# Regression test for UPSERT batching logic (#51391).
+statement ok
+SET CLUSTER SETTING kv.raft.command.max_size='4MiB';
+CREATE TABLE src (s STRING);
+CREATE TABLE dest (s STRING);
+INSERT INTO src
+SELECT
+  'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+FROM
+  generate_series(1, 50000)
+
+# This statement produces a raft command of about 6.6 MiB in size, so if the
+# batching logic is incorrect, we'll encounter "command is too large" error.
+statement ok
+UPSERT INTO dest (s) (SELECT s FROM src)
+
+statement ok
+RESET CLUSTER SETTING kv.raft.command.max_size;
+DROP TABLE src;
+DROP TABLE dest
diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go
index 5da25178bf0b..cf6b82acbb23 100644
--- a/pkg/sql/opt_exec_factory.go
+++ b/pkg/sql/opt_exec_factory.go
@@ -1514,7 +1514,6 @@ func (ef *execFactory) ConstructUpsert(
 			insertCols: ri.InsertCols,
 			tw: optTableUpserter{
 				ri:            ri,
-				alloc:         &ef.planner.alloc,
 				canaryOrdinal: int(canaryCol),
 				fkTables:      fkTables,
 				fetchCols:     fetchColDescs,
diff --git a/pkg/sql/tablewriter_upsert_opt.go b/pkg/sql/tablewriter_upsert_opt.go
index a7ce5f786bc3..9b399ea250e9 100644
--- a/pkg/sql/tablewriter_upsert_opt.go
+++ b/pkg/sql/tablewriter_upsert_opt.go
@@ -45,8 +45,7 @@ import (
 type optTableUpserter struct {
 	tableWriterBase
 
-	ri    row.Inserter
-	alloc *sqlbase.DatumAlloc
+	ri row.Inserter
 
 	// Should we collect the rows for a RETURNING clause?
 	collectRows bool
@@ -67,16 +66,6 @@ type optTableUpserter struct {
 	// collectRows is set, counted separately otherwise.
 	resultCount int
 
-	// Contains all the rows to be inserted.
-	insertRows rowcontainer.RowContainer
-
-	// existingRows is used to store rows in a batch when checking for conflicts
-	// with rows earlier in the batch. Is is reused per batch.
-	existingRows *rowcontainer.RowContainer
-
-	// For allocation avoidance.
-	indexKeyPrefix []byte
-
 	// fetchCols indicate which columns need to be fetched from the target table,
 	// in order to detect whether a conflict has occurred, as well as to provide
 	// existing values for updates.
@@ -119,18 +108,15 @@ func (tu *optTableUpserter) init(
 	ctx context.Context, txn *kv.Txn, evalCtx *tree.EvalContext,
 ) error {
 	tu.tableWriterBase.init(txn)
-	tableDesc := tu.tableDesc()
-	tu.insertRows.Init(
-		evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0,
-	)
-
-	// collectRows, set upon initialization, indicates whether or not we want rows returned from the operation.
+	// collectRows, set upon initialization, indicates whether or not we want
+	// rows returned from the operation.
 	if tu.collectRows {
+		tu.resultRow = make(tree.Datums, len(tu.returnCols))
 		tu.rowsUpserted = rowcontainer.NewRowContainer(
 			evalCtx.Mon.MakeBoundAccount(),
-			sqlbase.ColTypeInfoFromColDescs(tableDesc.Columns),
-			tu.insertRows.Len(),
+			sqlbase.ColTypeInfoFromColDescs(tu.returnCols),
+			0, /* rowCapacity */
 		)
 
 		// Create the map from colIds to the expected columns.
 		// Note that this map will *not* contain any mutation columns - that's
 		// because even though we might insert values into mutation columns, we
 		// never return them back to the user.
 		tu.colIDToReturnIndex = map[sqlbase.ColumnID]int{}
+		tableDesc := tu.tableDesc()
 		for i := range tableDesc.Columns {
 			id := tableDesc.Columns[i].ID
 			tu.colIDToReturnIndex[id] = i
 		}
@@ -156,33 +143,14 @@
 		}
 	}
 
-	tu.insertRows.Init(
-		evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0,
-	)
-
-	tu.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(tableDesc.TableDesc(), tableDesc.PrimaryIndex.ID)
-
-	if tu.collectRows {
-		tu.resultRow = make(tree.Datums, len(tu.returnCols))
-		tu.rowsUpserted = rowcontainer.NewRowContainer(
-			evalCtx.Mon.MakeBoundAccount(),
-			sqlbase.ColTypeInfoFromColDescs(tu.returnCols),
-			tu.insertRows.Len(),
-		)
-	}
-
 	return nil
 }
 
 // flushAndStartNewBatch is part of the tableWriter interface.
 func (tu *optTableUpserter) flushAndStartNewBatch(ctx context.Context) error {
-	tu.insertRows.Clear(ctx)
 	if tu.collectRows {
 		tu.rowsUpserted.Clear(ctx)
 	}
-	if tu.existingRows != nil {
-		tu.existingRows.Clear(ctx)
-	}
 	return tu.tableWriterBase.flushAndStartNewBatch(ctx, tu.tableDesc())
 }
 
@@ -197,14 +165,8 @@ func (tu *optTableUpserter) batchedValues(rowIdx int) tree.Datums {
 	return tu.rowsUpserted.At(rowIdx)
 }
 
-func (tu *optTableUpserter) curBatchSize() int { return tu.insertRows.Len() }
-
 // close is part of the tableWriter interface.
 func (tu *optTableUpserter) close(ctx context.Context) {
-	tu.insertRows.Close(ctx)
-	if tu.existingRows != nil {
-		tu.existingRows.Close(ctx)
-	}
 	if tu.rowsUpserted != nil {
 		tu.rowsUpserted.Close(ctx)
 	}
diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go
index 5deebf632102..09a11ea6d8cd 100644
--- a/pkg/sql/upsert.go
+++ b/pkg/sql/upsert.go
@@ -174,7 +174,7 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err
 // BatchedCount implements the batchedPlanNode interface.
 func (n *upsertNode) BatchedCount() int { return n.run.tw.batchedCount() }
 
-// BatchedCount implements the batchedPlanNode interface.
+// BatchedValues implements the batchedPlanNode interface.
 func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums { return n.run.tw.batchedValues(rowIdx) }
 
 func (n *upsertNode) Close(ctx context.Context) {
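
A note on the mechanics, for readers outside the CockroachDB tree: the deleted
insertRows/existingRows containers buffered rows on the side ("Contains all the
rows to be inserted"), and curBatchSize() reported that container's length; with
them gone, batching decisions fall through to tableWriterBase (see
flushAndStartNewBatch above), which appears to be what the fix relies on to keep
any single raft command under the limit. The regression test's arithmetic is
consistent: 50,000 rows of a 75-character string is ~3.6 MiB of raw data, and
with key/value encoding overhead the test comment pegs the resulting command at
~6.6 MiB, well past the 4 MiB limit it configures. The Go program below is a
minimal sketch of size-capped batch flushing in general, not CockroachDB code:
maxBatchBytes, batch, upsertAll, flush, and the 140-byte per-row estimate are
all assumptions invented for the example.

package main

import "fmt"

// maxBatchBytes stands in for the kv.raft.command.max_size budget from the
// test above; the name and the fixed threshold are assumptions for this
// sketch, not CockroachDB identifiers.
const maxBatchBytes = 4 << 20 // 4 MiB

// batch accumulates encoded key/value bytes for pending writes.
type batch struct {
	buf  []byte
	rows int
}

func (b *batch) put(kv []byte) {
	b.buf = append(b.buf, kv...)
	b.rows++
}

// upsertAll streams rows into size-capped batches: it flushes whenever the
// next row would push the pending batch past maxBatchBytes, rather than
// buffering the whole input and writing it as one giant command.
func upsertAll(rows [][]byte, flush func(*batch) error) error {
	b := &batch{}
	for _, r := range rows {
		if b.rows > 0 && len(b.buf)+len(r) > maxBatchBytes {
			if err := flush(b); err != nil {
				return err
			}
			b = &batch{}
		}
		b.put(r)
	}
	if b.rows > 0 {
		return flush(b)
	}
	return nil
}

func main() {
	// Mirror the regression test's shape: 50,000 rows, with 140 bytes as an
	// assumed per-row cost once the 75-character string is key/value encoded.
	rows := make([][]byte, 50000)
	for i := range rows {
		rows[i] = make([]byte, 140)
	}
	n := 0
	_ = upsertAll(rows, func(b *batch) error {
		n++
		fmt.Printf("flush %d: %d rows, %d bytes\n", n, b.rows, len(b.buf))
		return nil
	})
}

Run as-is, this prints two flushes (~4.0 MiB, then ~2.7 MiB) instead of a single
~6.7 MiB write, which is the behavior the new logic test locks in.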