Skip to content

Commit

Permalink
sql: fix UPSERT with columns after partial index PUT and DEL columns
Browse files Browse the repository at this point in the history
This commit fixes a bug in UPSERT execution that was caused by the
incorrect assumption that there are never columns following the partial
index PUT and DEL columns in the mutation's input.

I was unable to reproduce this bug for INSERTs or DELETEs. However, I
updated the execution code for both out of caution. The logic for both
no longer makes the same assumption that partial index PUT and DEL
columns will be the last columns in the input.

Fixes cockroachdb#61414

Release justification: This is a low-risk fix for a bug causing UPSERT
statements to err when executed on tables with partial indexes.

Release note (bug fix): A bug which caused UPSERT and INSERT..ON
CONFLICT..DO UPDATE statements to fail on tables with both partial
indexes and foreign key references has been fixed. This bug has been
present since version 20.2.0.
  • Loading branch information
mgartner committed Mar 4, 2021
1 parent 91d3444 commit 81abb62
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 20 deletions.
6 changes: 3 additions & 3 deletions pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (d *deleteNode) processSourceRow(params runParams, sourceVals tree.Datums)
// satisfy the predicate and therefore do not exist in the partial index.
// This set is passed as a argument to tableDeleter.row below.
var pm row.PartialIndexUpdateHelper
partialIndexOrds := d.run.td.tableDesc().PartialIndexOrds()
if !partialIndexOrds.Empty() {
partialIndexDelVals := sourceVals[d.run.partialIndexDelValsOffset:]
if n := d.run.td.tableDesc().PartialIndexOrds().Len(); n > 0 {
offset := d.run.partialIndexDelValsOffset
partialIndexDelVals := sourceVals[offset : offset+n]

err := pm.Init(tree.Datums{}, partialIndexDelVals, d.run.td.tableDesc())
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (r *insertRun) processSourceRow(params runParams, rowVals tree.Datums) erro
// written to when they are partial indexes and the row does not satisfy the
// predicate. This set is passed as a parameter to tableInserter.row below.
var pm row.PartialIndexUpdateHelper
partialIndexOrds := r.ti.tableDesc().PartialIndexOrds()
if !partialIndexOrds.Empty() {
partialIndexPutVals := rowVals[len(r.insertCols)+r.checkOrds.Len():]
if n := r.ti.tableDesc().PartialIndexOrds().Len(); n > 0 {
offset := len(r.insertCols) + r.checkOrds.Len()
partialIndexPutVals := rowVals[offset : offset+n]

err := pm.Init(partialIndexPutVals, tree.Datums{}, r.ti.tableDesc())
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/partial_index
Original file line number Diff line number Diff line change
Expand Up @@ -1428,3 +1428,31 @@ CREATE TABLE t58390 (

statement ok
ALTER TABLE t58390 ALTER PRIMARY KEY USING COLUMNS (b, a)

# Regression tests for #61414. Upsert execution should not error if partial
# index PUT and DEL columns are not the last columns in the input of the
# mutation.

statement ok
create table t61414_a (
k INT PRIMARY KEY
)

statement ok
create table t61414_b (
k INT PRIMARY KEY,
a STRING,
b INT REFERENCES t61414_a(k),
UNIQUE INDEX (a, b),
INDEX (a) WHERE a = 'x'
)

statement ok
INSERT INTO t61414_a VALUES (2)

statement ok
INSERT INTO t61414_b (k, a, b)
VALUES (1, 'a', 2)
ON CONFLICT (a, b) DO UPDATE SET a = excluded.a
WHERE t61414_b.a = 'x'
RETURNING k
11 changes: 5 additions & 6 deletions pkg/sql/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,11 @@ func (u *updateNode) processSourceRow(params runParams, sourceVals tree.Datums)
// Create a set of partial index IDs to not add entries or remove entries
// from.
var pm row.PartialIndexUpdateHelper
partialIndexOrds := u.run.tu.tableDesc().PartialIndexOrds()
if !partialIndexOrds.Empty() {
partialIndexValOffset := len(u.run.tu.ru.FetchCols) + len(u.run.tu.ru.UpdateCols) + u.run.checkOrds.Len() + u.run.numPassthrough
partialIndexVals := sourceVals[partialIndexValOffset:]
partialIndexPutVals := partialIndexVals[:partialIndexOrds.Len()]
partialIndexDelVals := partialIndexVals[partialIndexOrds.Len() : partialIndexOrds.Len()*2]
if n := u.run.tu.tableDesc().PartialIndexOrds().Len(); n > 0 {
offset := len(u.run.tu.ru.FetchCols) + len(u.run.tu.ru.UpdateCols) + u.run.checkOrds.Len() + u.run.numPassthrough
partialIndexVals := sourceVals[offset:]
partialIndexPutVals := partialIndexVals[:n]
partialIndexDelVals := partialIndexVals[n : n*2]

err := pm.Init(partialIndexPutVals, partialIndexDelVals, u.run.tu.tableDesc())
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions pkg/sql/upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,14 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err

// Create a set of partial index IDs to not add or remove entries from.
var pm row.PartialIndexUpdateHelper
partialIndexOrds := n.run.tw.tableDesc().PartialIndexOrds()
if !partialIndexOrds.Empty() {
partialIndexValOffset := len(n.run.insertCols) + len(n.run.tw.fetchCols) + len(n.run.tw.updateCols) + n.run.checkOrds.Len()
if numPartialIndexes := n.run.tw.tableDesc().PartialIndexOrds().Len(); numPartialIndexes > 0 {
offset := len(n.run.insertCols) + len(n.run.tw.fetchCols) + len(n.run.tw.updateCols) + n.run.checkOrds.Len()
if n.run.tw.canaryOrdinal != -1 {
partialIndexValOffset++
offset++
}
partialIndexVals := rowVals[partialIndexValOffset:]
partialIndexPutVals := partialIndexVals[:len(partialIndexVals)/2]
partialIndexDelVals := partialIndexVals[len(partialIndexVals)/2:]
partialIndexVals := rowVals[offset:]
partialIndexPutVals := partialIndexVals[:numPartialIndexes]
partialIndexDelVals := partialIndexVals[numPartialIndexes : numPartialIndexes*2]

err := pm.Init(partialIndexPutVals, partialIndexDelVals, n.run.tw.tableDesc())
if err != nil {
Expand All @@ -163,7 +162,7 @@ func (n *upsertNode) processSourceRow(params runParams, rowVals tree.Datums) err

// Truncate rowVals so that it no longer includes partial index predicate
// values.
rowVals = rowVals[:partialIndexValOffset]
rowVals = rowVals[:offset]
}

// Verify the CHECK constraints by inspecting boolean columns from the input that
Expand Down

0 comments on commit 81abb62

Please sign in to comment.