Skip to content

Commit

Permalink
sql: fix large UPSERTs with RETURNING
Browse files Browse the repository at this point in the history
In cockroachdb#51608 we fixed a bug with pagination of UPSERTs (now it is possible
to have multiple batches when performing an UPSERT of over 10k rows),
and it exposed another bug in how we're handling an UPSERT with a
RETURNING clause - we were clearing the row container too early, which
would result in an index out of bounds crash. This is now fixed.

Release note (bug fix): Starting from v20.2.0-alpha.3, CockroachDB would
crash when performing an UPSERT of more than 10k rows with a RETURNING
clause. This is now fixed.
  • Loading branch information
yuzefovich committed Sep 16, 2020
1 parent 1bffabd commit 01e6a9a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 42 deletions.
16 changes: 16 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -1231,3 +1231,19 @@ statement ok
RESET CLUSTER SETTING kv.raft.command.max_size;
DROP TABLE src;
DROP TABLE dest

# Regression test for finishing UPSERT too early (#54456).
statement ok
CREATE TABLE t54456 (c INT PRIMARY KEY);
UPSERT INTO t54456 SELECT i FROM generate_series(1, 25000) AS i

query I
SELECT count(*) FROM t54456
----
25000

# Regression test for clearing up upserted rows too early (#54465).
query I
WITH cte(c) AS (UPSERT INTO t54456 SELECT i FROM generate_series(25001, 40000) AS i RETURNING c) SELECT count(*) FROM cte
----
15000
2 changes: 1 addition & 1 deletion pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func (ef *execFactory) ConstructUpsert(
// in the table.
ups.run.tw.tabColIdxToRetIdx = row.ColMapping(tabDesc.Columns, returnColDescs)
ups.run.tw.returnCols = returnColDescs
ups.run.tw.collectRows = true
ups.run.tw.rowsNeeded = true
}

if autoCommit {
Expand Down
53 changes: 13 additions & 40 deletions pkg/sql/tablewriter_upsert_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,15 @@ type optTableUpserter struct {
ri row.Inserter

// Should we collect the rows for a RETURNING clause?
collectRows bool

// Rows returned if collectRows is true.
rowsUpserted *rowcontainer.RowContainer
rowsNeeded bool

// A mapping of column IDs to the return index used to shape the resulting
// rows to those required by the returning clause. Only required if
// collectRows is true.
// rowsNeeded is true.
colIDToReturnIndex map[descpb.ColumnID]int

// Do the result rows have a different order than insert rows. Only set if
// collectRows is true.
// rowsNeeded is true.
insertReorderingRequired bool

// fetchCols indicate which columns need to be fetched from the target table,
Expand Down Expand Up @@ -104,11 +101,11 @@ func (tu *optTableUpserter) init(
) error {
tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc)

// collectRows, set upon initialization, indicates whether or not we want
// rowsNeeded, set upon initialization, indicates whether or not we want
// rows returned from the operation.
if tu.collectRows {
if tu.rowsNeeded {
tu.resultRow = make(tree.Datums, len(tu.returnCols))
tu.rowsUpserted = rowcontainer.NewRowContainer(
tu.rows = rowcontainer.NewRowContainer(
evalCtx.Mon.MakeBoundAccount(),
colinfo.ColTypeInfoFromColDescs(tu.returnCols),
)
Expand All @@ -135,30 +132,6 @@ func (tu *optTableUpserter) init(
return nil
}

// flushAndStartNewBatch is part of the tableWriter interface.
func (tu *optTableUpserter) flushAndStartNewBatch(ctx context.Context) error {
if tu.collectRows {
tu.rowsUpserted.Clear(ctx)
}
return tu.tableWriterBase.flushAndStartNewBatch(ctx)
}

// batchedValues is a helper in implementing batchedPlanNode interface.
func (tu *optTableUpserter) batchedValues(rowIdx int) tree.Datums {
if !tu.collectRows {
panic("return row requested but collect rows was not set")
}
return tu.rowsUpserted.At(rowIdx)
}

// close is part of the tableWriter interface.
func (tu *optTableUpserter) close(ctx context.Context) {
tu.tableWriterBase.close(ctx)
if tu.rowsUpserted != nil {
tu.rowsUpserted.Close(ctx)
}
}

// makeResultFromRow reshapes a row that was inserted or updated to a row
// suitable for storing for a RETURNING clause, shaped by the target table's
// descriptor.
Expand Down Expand Up @@ -209,10 +182,10 @@ func (tu *optTableUpserter) row(
// If no columns need to be updated, then possibly collect the unchanged row.
fetchEnd := insertEnd + len(tu.fetchCols)
if len(tu.updateCols) == 0 {
if !tu.collectRows {
if !tu.rowsNeeded {
return nil
}
_, err := tu.rowsUpserted.AddRow(ctx, row[insertEnd:fetchEnd])
_, err := tu.rows.AddRow(ctx, row[insertEnd:fetchEnd])
return err
}

Expand Down Expand Up @@ -243,7 +216,7 @@ func (tu *optTableUpserter) insertNonConflictingRow(
return err
}

if !tu.collectRows {
if !tu.rowsNeeded {
return nil
}

Expand All @@ -259,7 +232,7 @@ func (tu *optTableUpserter) insertNonConflictingRow(
tu.resultRow[retIdx] = tableRow[tabIdx]
}
}
_, err := tu.rowsUpserted.AddRow(ctx, tu.resultRow)
_, err := tu.rows.AddRow(ctx, tu.resultRow)
return err
}

Expand All @@ -269,7 +242,7 @@ func (tu *optTableUpserter) insertNonConflictingRow(
tu.resultRow[retIdx] = insertRow[tabIdx]
}
}
_, err := tu.rowsUpserted.AddRow(ctx, tu.resultRow)
_, err := tu.rows.AddRow(ctx, tu.resultRow)
return err
}

Expand Down Expand Up @@ -307,7 +280,7 @@ func (tu *optTableUpserter) updateConflictingRow(
}

// We only need a result row if we're collecting rows.
if !tu.collectRows {
if !tu.rowsNeeded {
return nil
}

Expand Down Expand Up @@ -335,7 +308,7 @@ func (tu *optTableUpserter) updateConflictingRow(

// The resulting row may have nil values for columns that aren't
// being upserted, updated or fetched.
_, err = tu.rowsUpserted.AddRow(ctx, tu.resultRow)
_, err = tu.rows.AddRow(ctx, tu.resultRow)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err
func (n *upsertNode) BatchedCount() int { return n.run.tw.lastBatchSize }

// BatchedValues implements the batchedPlanNode interface.
func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums { return n.run.tw.batchedValues(rowIdx) }
func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums { return n.run.tw.rows.At(rowIdx) }

func (n *upsertNode) Close(ctx context.Context) {
n.source.Close(ctx)
Expand Down

0 comments on commit 01e6a9a

Please sign in to comment.