Skip to content

Commit

Permalink
sql: fix large UPSERTs
Browse files Browse the repository at this point in the history
In cockroachdb#51944 I fixed a bug but introduced an even worse one. The original
bug was that if an UPSERT has RETURNING clause, we could have returned
incorrect values because internally `resultCount` and `rowsUpserted`
could be not synchronized. It was fixed by resetting `resultCount`,
however, the resetting was done unconditionally. This is incorrect
because `resultCount` variable is used by `upsertNode.BatchedNext` to
indicate whether there is more work to do (and if `resultCount==0`, then
we should finish). This bug would cause an UPSERT of more than 10k rows,
with or without a RETURNING clause, to actually process only the first
10k rows and exit early. This is now fixed.

Relatedly, an UPSERT with RETURNING clause would incorrectly return no
rows when it was processing more than 10k rows.

Additionally, cockroachdb#51626 fixed a bug with pagination of UPSERTs which
exposed another bug when RETURNING clause is present - we were clearing
`rowsUpserted` in `BatchedNext` (as part of `flushAndStartNewBatch`
call), but that clear happens too early - we are accessing it after
`BatchedNext` returns with `BatchedValues`. This would lead to an index
out of bounds crash. Before cockroachdb#51626 there was no pagination done, so we
always had a single batch and `flushAndStartNewBatch` was never called
(so the row container was never reset too early). This is also now fixed. Note
that this second bug was impossible to run into because of the first one
(we would never reach this code).

Release note (bug fix): CockroachDB in 20.1.4 and 20.1.5 releases could
finish an UPSERT operation too early - namely, it would correctly insert
only up to 10000 rows and ignore the rest. Furthermore, an UPSERT with
RETURNING clause in such scenario would return no rows (it would only
process 10k rows but return 0 rows).
  • Loading branch information
yuzefovich committed Sep 16, 2020
1 parent 9910e71 commit a1f667a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
15 changes: 15 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -1234,3 +1234,18 @@ statement ok
RESET CLUSTER SETTING kv.raft.command.max_size;
DROP TABLE src;
DROP TABLE dest

# Regression test for finishing UPSERT too early (#54456).
statement ok
CREATE TABLE t54456 (c INT PRIMARY KEY);
UPSERT INTO t54456 SELECT i FROM generate_series(1, 25000) AS i

query I
SELECT count(*) FROM t54456
----
25000

query I
WITH cte(c) AS (UPSERT INTO t54456 SELECT i FROM generate_series(25001, 40000) AS i RETURNING c) SELECT count(*) FROM cte
----
15000
23 changes: 4 additions & 19 deletions pkg/sql/tablewriter_upsert_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ type optTableUpserter struct {
// collectRows is true.
insertReorderingRequired bool

// resultCount is the number of upserts. Mirrors rowsUpserted.Len() if
// collectRows is set, counted separately otherwise.
resultCount int
// rowsInLastProcessedBatch tracks the number of upserts that were
// performed in the last processed batch. If collectRows is true, it will
// be equal to rowsUpserted.Len() after the batch has been created.
rowsInLastProcessedBatch int

// fetchCols indicate which columns need to be fetched from the target table,
// in order to detect whether a conflict has occurred, as well as to provide
Expand Down Expand Up @@ -148,24 +149,9 @@ func (tu *optTableUpserter) init(

// flushAndStartNewBatch is part of the tableWriter interface.
func (tu *optTableUpserter) flushAndStartNewBatch(ctx context.Context) error {
tu.resultCount = 0
if tu.collectRows {
tu.rowsUpserted.Clear(ctx)
}
return tu.tableWriterBase.flushAndStartNewBatch(ctx, tu.tableDesc())
}

// batchedCount is part of the batchedTableWriter interface.
func (tu *optTableUpserter) batchedCount() int { return tu.resultCount }

// batchedValues is part of the batchedTableWriter interface.
func (tu *optTableUpserter) batchedValues(rowIdx int) tree.Datums {
if !tu.collectRows {
panic("return row requested but collect rows was not set")
}
return tu.rowsUpserted.At(rowIdx)
}

// close is part of the tableWriter interface.
func (tu *optTableUpserter) close(ctx context.Context) {
if tu.rowsUpserted != nil {
Expand Down Expand Up @@ -210,7 +196,6 @@ func (*optTableUpserter) desc() string { return "opt upserter" }
// row is part of the tableWriter interface.
func (tu *optTableUpserter) row(ctx context.Context, row tree.Datums, traceKV bool) error {
tu.batchSize++
tu.resultCount++

// Consult the canary column to determine whether to insert or update. For
// more details on how canary columns work, see the block comment on
Expand Down
23 changes: 19 additions & 4 deletions pkg/sql/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
)

var upsertNodePool = sync.Pool{
Expand Down Expand Up @@ -83,6 +84,11 @@ func (n *upsertNode) BatchedNext(params runParams) (bool, error) {

tracing.AnnotateTrace()

// Advance one batch. First, clear the current batch.
if n.run.tw.collectRows {
n.run.tw.rowsUpserted.Clear(params.ctx)
}

// Now consume/accumulate the rows for this batch.
lastBatch := false
for {
Expand Down Expand Up @@ -136,13 +142,17 @@ func (n *upsertNode) BatchedNext(params runParams) (bool, error) {
n.run.done = true
}

// We've just finished processing this batch, and we need to remember how
// many rows were in it.
n.run.tw.rowsInLastProcessedBatch = batchSize

// Possibly initiate a run of CREATE STATISTICS.
params.ExecCfg().StatsRefresher.NotifyMutation(
n.run.tw.tableDesc().ID,
n.run.tw.batchedCount(),
n.run.tw.rowsInLastProcessedBatch,
)

return n.run.tw.batchedCount() > 0, nil
return n.run.tw.rowsInLastProcessedBatch > 0, nil
}

// processSourceRow processes one row from the source for upsertion.
Expand Down Expand Up @@ -172,10 +182,15 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err
}

// BatchedCount implements the batchedPlanNode interface.
func (n *upsertNode) BatchedCount() int { return n.run.tw.batchedCount() }
func (n *upsertNode) BatchedCount() int { return n.run.tw.rowsInLastProcessedBatch }

// BatchedValues implements the batchedPlanNode interface.
func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums { return n.run.tw.batchedValues(rowIdx) }
func (n *upsertNode) BatchedValues(rowIdx int) tree.Datums {
if !n.run.tw.collectRows {
panic(errors.AssertionFailedf("return row requested but collect rows was not set"))
}
return n.run.tw.rowsUpserted.At(rowIdx)
}

func (n *upsertNode) Close(ctx context.Context) {
n.source.Close(ctx)
Expand Down

0 comments on commit a1f667a

Please sign in to comment.