Merge pull request #51626 from yuzefovich/backport20.1-51608
release-20.1: sql: fix pagination in UPSERT
yuzefovich authored Jul 21, 2020
2 parents 1554f94 + b59a768 commit e69bc7b
Showing 4 changed files with 29 additions and 47 deletions.
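What the diff below suggests: optTableUpserter still carried an insertRows row container that was initialized, cleared, and closed but apparently never populated, and its curBatchSize override reported insertRows.Len(). The pagination check flushes the current KV batch once curBatchSize crosses a threshold, so a size that is always 0 means the batch is never flushed mid-statement and a large UPSERT goes out as a single oversized raft command. The sketch below is a minimal illustration of that failure mode, not CockroachDB's actual code; writer, maxBatchRows, and the field names are invented for the example.

    package main

    import "fmt"

    // writer models a table writer that stages rows in a KV batch and is
    // expected to flush ("paginate") every maxBatchRows rows.
    type writer struct {
        staged      int   // rows actually staged in the current KV batch
        deadBuffer  []int // stand-in for the leftover insertRows container: never appended to
        flushCount  int
        useDeadSize bool // simulate the bug: report the dead buffer's length
    }

    const maxBatchRows = 3

    func (w *writer) curBatchSize() int {
        if w.useDeadSize {
            return len(w.deadBuffer) // always 0: nothing ever adds to deadBuffer
        }
        return w.staged
    }

    func (w *writer) addRow() {
        w.staged++
        if w.curBatchSize() >= maxBatchRows {
            w.flush()
        }
    }

    func (w *writer) flush() {
        w.flushCount++
        w.staged = 0
    }

    func main() {
        buggy := writer{useDeadSize: true}
        fixed := writer{}
        for i := 0; i < 10; i++ {
            buggy.addRow()
            fixed.addRow()
        }
        fmt.Println("buggy flushes:", buggy.flushCount) // 0: never paginates
        fmt.Println("fixed flushes:", fixed.flushCount) // 3: flushes every 3 rows
    }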
21 changes: 21 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
@@ -1213,3 +1213,24 @@ DROP TABLE source
 
 statement ok
 DROP TABLE target
+
+# Regression test for UPSERT batching logic (#51391).
+statement ok
+SET CLUSTER SETTING kv.raft.command.max_size='4MiB';
+CREATE TABLE src (s STRING);
+CREATE TABLE dest (s STRING);
+INSERT INTO src
+SELECT
+    'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'
+FROM
+    generate_series(1, 50000)
+
+# This statement produces a raft command of about 6.6 MiB in size, so if the
+# batching logic is incorrect, we'll encounter "command is too large" error.
+statement ok
+UPSERT INTO dest (s) (SELECT s FROM src)
+
+statement ok
+RESET CLUSTER SETTING kv.raft.command.max_size;
+DROP TABLE src;
+DROP TABLE dest
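Back-of-envelope numbers behind the test, assuming the repeated string is roughly 75 bytes (its length in the diff above); the gap between raw payload and raft command size is taken from the test comment, not computed here:

    package main

    import "fmt"

    func main() {
        const (
            rows        = 50000
            payloadSize = 75 // approximate length of the repeated 'aaa...' string
        )
        raw := rows * payloadSize
        fmt.Printf("raw payload: %.1f MiB\n", float64(raw)/(1<<20)) // ~3.6 MiB
        // The test comment reports a ~6.6 MiB raft command, so keys, column
        // families, timestamps, and the command envelope roughly double the
        // raw payload -- comfortably above the 4 MiB kv.raft.command.max_size
        // set in the test.
    }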
1 change: 0 additions & 1 deletion pkg/sql/opt_exec_factory.go
@@ -1514,7 +1514,6 @@ func (ef *execFactory) ConstructUpsert(
         insertCols: ri.InsertCols,
         tw: optTableUpserter{
             ri:            ri,
-            alloc:         &ef.planner.alloc,
             canaryOrdinal: int(canaryCol),
             fkTables:      fkTables,
             fetchCols:     fetchColDescs,
52 changes: 7 additions & 45 deletions pkg/sql/tablewriter_upsert_opt.go
@@ -45,8 +45,7 @@ import (
 type optTableUpserter struct {
     tableWriterBase
 
-    ri    row.Inserter
-    alloc *sqlbase.DatumAlloc
+    ri row.Inserter
 
     // Should we collect the rows for a RETURNING clause?
     collectRows bool
@@ -67,16 +66,6 @@ type optTableUpserter struct {
     // collectRows is set, counted separately otherwise.
     resultCount int
 
-    // Contains all the rows to be inserted.
-    insertRows rowcontainer.RowContainer
-
-    // existingRows is used to store rows in a batch when checking for conflicts
-    // with rows earlier in the batch. Is is reused per batch.
-    existingRows *rowcontainer.RowContainer
-
-    // For allocation avoidance.
-    indexKeyPrefix []byte
-
     // fetchCols indicate which columns need to be fetched from the target table,
     // in order to detect whether a conflict has occurred, as well as to provide
     // existing values for updates.
@@ -119,25 +108,23 @@ func (tu *optTableUpserter) init(
     ctx context.Context, txn *kv.Txn, evalCtx *tree.EvalContext,
 ) error {
     tu.tableWriterBase.init(txn)
-    tableDesc := tu.tableDesc()
 
-    tu.insertRows.Init(
-        evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0,
-    )
-
-    // collectRows, set upon initialization, indicates whether or not we want rows returned from the operation.
+    // collectRows, set upon initialization, indicates whether or not we want
+    // rows returned from the operation.
     if tu.collectRows {
         tu.resultRow = make(tree.Datums, len(tu.returnCols))
         tu.rowsUpserted = rowcontainer.NewRowContainer(
             evalCtx.Mon.MakeBoundAccount(),
-            sqlbase.ColTypeInfoFromColDescs(tableDesc.Columns),
-            tu.insertRows.Len(),
+            sqlbase.ColTypeInfoFromColDescs(tu.returnCols),
+            0, /* rowCapacity */
         )
 
         // Create the map from colIds to the expected columns.
         // Note that this map will *not* contain any mutation columns - that's
         // because even though we might insert values into mutation columns, we
         // never return them back to the user.
         tu.colIDToReturnIndex = map[sqlbase.ColumnID]int{}
+        tableDesc := tu.tableDesc()
         for i := range tableDesc.Columns {
             id := tableDesc.Columns[i].ID
             tu.colIDToReturnIndex[id] = i
@@ -156,33 +143,14 @@
         }
     }
 
-    tu.insertRows.Init(
-        evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0,
-    )
-
-    tu.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(tableDesc.TableDesc(), tableDesc.PrimaryIndex.ID)
-
-    if tu.collectRows {
-        tu.resultRow = make(tree.Datums, len(tu.returnCols))
-        tu.rowsUpserted = rowcontainer.NewRowContainer(
-            evalCtx.Mon.MakeBoundAccount(),
-            sqlbase.ColTypeInfoFromColDescs(tu.returnCols),
-            tu.insertRows.Len(),
-        )
-    }
-
     return nil
 }
 
 // flushAndStartNewBatch is part of the tableWriter interface.
 func (tu *optTableUpserter) flushAndStartNewBatch(ctx context.Context) error {
-    tu.insertRows.Clear(ctx)
     if tu.collectRows {
         tu.rowsUpserted.Clear(ctx)
     }
-    if tu.existingRows != nil {
-        tu.existingRows.Clear(ctx)
-    }
     return tu.tableWriterBase.flushAndStartNewBatch(ctx, tu.tableDesc())
 }
 
@@ -197,14 +165,8 @@ func (tu *optTableUpserter) batchedValues(rowIdx int) tree.Datums {
     return tu.rowsUpserted.At(rowIdx)
 }
 
-func (tu *optTableUpserter) curBatchSize() int { return tu.insertRows.Len() }
-
 // close is part of the tableWriter interface.
 func (tu *optTableUpserter) close(ctx context.Context) {
-    tu.insertRows.Close(ctx)
-    if tu.existingRows != nil {
-        tu.existingRows.Close(ctx)
-    }
     if tu.rowsUpserted != nil {
         tu.rowsUpserted.Close(ctx)
     }
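The substance of the fix is in this file: the dead insertRows and existingRows containers, the indexKeyPrefix scratch buffer, and the alloc field all go away, along with the curBatchSize override that returned insertRows.Len(). Because optTableUpserter embeds tableWriterBase, deleting the override means calls to curBatchSize now resolve to the embedded type's method, which can report what is actually staged in the current batch. A minimal sketch of the method-promotion mechanics, with invented types standing in for tableWriterBase and optTableUpserter:

    package main

    import "fmt"

    // base plays the role of the embedded tableWriterBase: it tracks how many
    // rows were staged into the current KV batch. Illustrative only.
    type base struct{ rowsInBatch int }

    func (b *base) curBatchSize() int { return b.rowsInBatch }

    // upserterBuggy overrides curBatchSize with a value derived from a dead
    // container, shadowing the embedded method -- the shape of the bug.
    type upserterBuggy struct {
        base
        deadRows []string // never populated
    }

    func (u *upserterBuggy) curBatchSize() int { return len(u.deadRows) }

    // upserterFixed has no override, so base.curBatchSize is promoted.
    type upserterFixed struct{ base }

    func main() {
        b := upserterBuggy{base: base{rowsInBatch: 42}}
        f := upserterFixed{base: base{rowsInBatch: 42}}
        fmt.Println(b.curBatchSize()) // 0  -- the pagination check never fires
        fmt.Println(f.curBatchSize()) // 42 -- reflects the real batch
    }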
2 changes: 1 addition & 1 deletion pkg/sql/upsert.go
@@ -174,7 +174,7 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err
 // BatchedCount implements the batchedPlanNode interface.
 func (n *upsertNode) BatchedCount() int { return n.run.tw.batchedCount() }
 
-// BatchedCount implements the batchedPlanNode interface.
+// BatchedValues implements the batchedPlanNode interface.
 func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums { return n.run.tw.batchedValues(rowIdx) }
 
 func (n *upsertNode) Close(ctx context.Context) {
