Skip to content

Commit

Permalink
sql: fix panic in insert on conflict do update
Browse files Browse the repository at this point in the history
There was an assumption that the row inserter and row updater had the same
columns in the same order and if that wasn't the case row inserter's columns
would be larger.

This assumption is incorrect, so to fix it, when storing the existing rows
(which are used to see if they conflicts with other rows further on in the
statement) instead of storing the row used for inputs, convert the rows
directly to those required by the row updater.

Fixes #32834.

Release note (bug fix): Fixed an issue in which if a nullable column wasn't
supplied in an INSERT ON CONFLICT DO UPDATE statement, it may cause a panic.
  • Loading branch information
BramGruneir committed Jan 2, 2019
1 parent 28340a5 commit 16071de
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
48 changes: 48 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/upsert
Original file line number Diff line number Diff line change
Expand Up @@ -727,3 +727,51 @@ x

statement ok
COMMIT

subtest regression_32834

statement ok
CREATE TABLE test_table (
id BIGINT PRIMARY KEY,
a BIGINT,
b BIGINT
);

statement ok
INSERT INTO test_table (id, a) VALUES
(123, 456), (123, 456)
ON CONFLICT (id) DO UPDATE SET a = EXCLUDED.a;

query II colnames
SELECT id, a
FROM test_table
----
id a
123 456

statement ok
INSERT INTO test_table (id, a) VALUES
(123, 1), (123, 2), (123, 1)
ON CONFLICT (id) DO UPDATE SET a = EXCLUDED.a;

query II colnames
SELECT id, a
FROM test_table
----
id a
123 1

statement ok
INSERT INTO test_table (id, a) VALUES
(123, 1), (123, 2), (123, 3)
ON CONFLICT (id) DO UPDATE SET a = EXCLUDED.a;

query II colnames
SELECT id, a
FROM test_table
----
id a
123 3

statement ok
DROP TABLE test_table;
24 changes: 17 additions & 7 deletions pkg/sql/tablewriter_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,7 @@ func (tu *tableUpserter) atBatchEnd(ctx context.Context, traceKV bool) error {
existingRow := existingRows[conflictingRowIdx]

// Check the ON CONFLICT DO UPDATE WHERE ... clause.
conflictingRowValues := existingRow[:len(tu.ru.FetchCols)]
shouldUpdate, err := tu.evaler.shouldUpdate(insertRow, conflictingRowValues)
shouldUpdate, err := tu.evaler.shouldUpdate(insertRow, existingRow)
if err != nil {
return err
}
Expand All @@ -430,7 +429,7 @@ func (tu *tableUpserter) atBatchEnd(ctx context.Context, traceKV bool) error {
// We know there was a row already, and we know we need to update it. Do it.
resultRow, existingRows, err = tu.updateConflictingRow(
ctx, tu.b, insertRow,
conflictingRowPK, conflictingRowIdx, conflictingRowValues,
conflictingRowPK, conflictingRowIdx, existingRow,
existingRows, pkToRowIdx,
tableDesc, traceKV)
if err != nil {
Expand Down Expand Up @@ -609,7 +608,7 @@ func (tu *tableUpserter) updateConflictingRow(
pkChanged = true

// Now add the new one.
existingRows = appendKnownConflictingRow(updatedRow, updatedConflictingRowPK, existingRows, pkToRowIdx)
existingRows = tu.appendKnownConflictingRow(updatedRow, updatedConflictingRowPK, existingRows, pkToRowIdx)
}

if pkChanged {
Expand Down Expand Up @@ -668,7 +667,7 @@ func (tu *tableUpserter) insertNonConflictingRow(
}

// Then remember it for further upserts.
existingRows = appendKnownConflictingRow(insertRow, conflictingRowPK, existingRows, pkToRowIdx)
existingRows = tu.appendKnownConflictingRow(insertRow, conflictingRowPK, existingRows, pkToRowIdx)

if !tu.collectRows {
return nil, existingRows, nil
Expand All @@ -683,11 +682,22 @@ func (tu *tableUpserter) insertNonConflictingRow(

// appendKnownConflictingRow adds a new row to existingRows and
// remembers its position in pkToRowIdx.
func appendKnownConflictingRow(
func (tu *tableUpserter) appendKnownConflictingRow(
newRow tree.Datums, newRowPK roachpb.Key, existingRows []tree.Datums, pkToRowIdx map[string]int,
) (newExistingRows []tree.Datums) {
pkToRowIdx[string(newRowPK)] = len(existingRows)
return append(existingRows, newRow)
// We need to convert the new row to match the fetch columns required for
// checking if there is a conflict.
cleanedRow := make(tree.Datums, len(tu.fetchColIDtoRowIndex))
for fetchColID, fetchRowIndex := range tu.fetchColIDtoRowIndex {
insertRowIndex, ok := tu.ri.InsertColIDtoRowIndex[fetchColID]
if ok {
cleanedRow[fetchRowIndex] = newRow[insertRowIndex]
} else {
cleanedRow[fetchRowIndex] = tree.DNull
}
}
return append(existingRows, cleanedRow)
}

// getConflictingRowPK returns the primary key of the row that may
Expand Down

0 comments on commit 16071de

Please sign in to comment.