diff --git a/pkg/sql/logictest/testdata/logic_test/upsert b/pkg/sql/logictest/testdata/logic_test/upsert index f6691774ae0a..dac5d5832e8a 100644 --- a/pkg/sql/logictest/testdata/logic_test/upsert +++ b/pkg/sql/logictest/testdata/logic_test/upsert @@ -40,23 +40,33 @@ INSERT INTO kv VALUES (4, 10) ON CONFLICT DO UPDATE SET v = kv.v + 20 statement error duplicate key value \(k\)=\(3\) violates unique constraint "primary" INSERT INTO kv VALUES (2, 10) ON CONFLICT (k) DO UPDATE SET k = 3, v = 10 -# Until #23660 is fixed. -statement error UPSERT/ON CONFLICT DO UPDATE command cannot affect row a second time -UPSERT INTO kv VALUES (10, 10), (10, 11) - statement ok INSERT INTO kv VALUES (9, 9) ON CONFLICT (k) DO UPDATE SET (k, v) = (excluded.k + 2, excluded.v + 3) +statement ok +UPSERT INTO kv VALUES (10, 10), (10, 11) + +query II +UPSERT INTO kv VALUES (11, 10), (11, 12) RETURNING k, v +---- +11 10 +11 12 + statement ok INSERT INTO kv VALUES (13, 13), (7, 8) ON CONFLICT (k) DO NOTHING statement error there is no unique or exclusion constraint matching the ON CONFLICT specification INSERT INTO kv VALUES (13, 13), (7, 8) ON CONFLICT DO NOTHING -# Until #23660 is fixed. -statement error UPSERT/ON CONFLICT DO UPDATE command cannot affect row a second time +statement ok INSERT INTO kv VALUES (14, 14), (14, 15) ON CONFLICT (k) DO UPDATE SET v = excluded.v + 1 +statement ok +INSERT INTO kv VALUES (15, 15), (15, 16) ON CONFLICT (k) DO UPDATE SET k = excluded.k * 10 + +statement ok +INSERT INTO kv VALUES (16, 16), (16, 17) ON CONFLICT (k) DO UPDATE SET k = excluded.k * 10, v = excluded.v + query II SELECT * FROM kv ORDER BY (k, v) ---- @@ -66,15 +76,35 @@ SELECT * FROM kv ORDER BY (k, v) 4 24 6 6 7 7 +10 11 11 12 13 13 +14 16 +150 15 +160 17 -query II +query II rowsort +UPSERT INTO kv(k) VALUES (6), (8) RETURNING k,v +---- +8 NULL + +query II rowsort INSERT INTO kv VALUES (10, 10), (11, 11) ON CONFLICT (k) DO UPDATE SET v = excluded.v RETURNING * ---- 10 10 11 11 +query II rowsort +INSERT INTO kv VALUES (10, 2), (10, 3) ON CONFLICT (k) DO UPDATE SET v = excluded.v + kv.v RETURNING * +---- +10 12 +10 15 + +query II rowsort +INSERT INTO kv VALUES (10, 14), (15, 15) ON CONFLICT (k) DO NOTHING RETURNING * +---- +15 15 + statement ok CREATE TABLE abc ( a INT, @@ -97,6 +127,11 @@ INSERT INTO abc VALUES (1, 2, 3) statement ok INSERT INTO abc VALUES (1, 2, 3) ON CONFLICT (c) DO UPDATE SET a = 4 +query III +SELECT * FROM abc +---- +4 2 3 + statement ok INSERT INTO abc VALUES (1, 2, 3) ON CONFLICT (c) DO UPDATE SET b = 5 @@ -179,7 +214,7 @@ statement ok INSERT INTO upsert_returning VALUES (1, 1, NULL) # Handle INSERT ... ON CONFLICT ... RETURNING -query IIII +query IIII rowsort INSERT INTO upsert_returning (a, c) VALUES (1, 1), (2, 2) ON CONFLICT (a) DO UPDATE SET c = excluded.c RETURNING * ---- 1 1 1 -1 @@ -192,7 +227,7 @@ INSERT INTO upsert_returning (a, c) VALUES (1, 1), (3, 3) ON CONFLICT (a) DO NOT 3 NULL 3 -1 # Handle UPSERT ... RETURNING -query IIII +query IIII rowsort UPSERT INTO upsert_returning (a, c) VALUES (1, 10), (3, 30) RETURNING * ---- 1 1 10 -1 @@ -210,7 +245,7 @@ INSERT INTO upsert_returning (a, b) VALUES (1, 1) ON CONFLICT (a) DO UPDATE SET 3 # Handle expressions within returning clause -query I +query I rowsort UPSERT INTO upsert_returning (a, b) VALUES (1, 2), (2, 3), (4, 3) RETURNING a+b+d ---- 2 @@ -218,7 +253,7 @@ UPSERT INTO upsert_returning (a, b) VALUES (1, 2), (2, 3), (4, 3) RETURNING a+b+ 6 # Handle upsert fast path with autocommit -query IIII +query IIII rowsort UPSERT INTO upsert_returning VALUES (1, 2, 3, 4), (5, 6, 7, 8) RETURNING * ---- 1 2 3 4 @@ -228,7 +263,7 @@ UPSERT INTO upsert_returning VALUES (1, 2, 3, 4), (5, 6, 7, 8) RETURNING * statement ok BEGIN -query IIII +query IIII rowsort upsert INTO upsert_returning VALUES (1, 5, 4, 3), (6, 5, 4, 3) RETURNING * ---- 1 5 4 3 diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 74d0393794bf..978953b68f85 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -291,6 +291,9 @@ type tableUpserter struct { rowsUpserted *sqlbase.RowContainer // rowTemplate is used to prepare rows to add to rowsUpserted. rowTemplate tree.Datums + // rowIdxToRetIdx maps the indices in the inserted rows + // back to indices in rowTemplate. + rowIdxToRetIdx []int // For allocation avoidance. indexKeyPrefix []byte @@ -328,6 +331,16 @@ func (tu *tableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) error tu.rowTemplate = make(tree.Datums, len(tableDesc.Columns)) } + colIDToRetIndex := map[sqlbase.ColumnID]int{} + for i, col := range tableDesc.Columns { + colIDToRetIndex[col.ID] = i + } + + tu.rowIdxToRetIdx = make([]int, len(tu.ri.InsertCols)) + for i, col := range tu.ri.InsertCols { + tu.rowIdxToRetIdx[i] = colIDToRetIndex[col.ID] + } + // TODO(dan): This could be made tighter, just the rows needed for the ON // CONFLICT and RETURNING exprs. requestedCols := tableDesc.Columns @@ -395,122 +408,134 @@ func (tu *tableUpserter) row( func (tu *tableUpserter) finalize( ctx context.Context, autoCommit autoCommitOpt, traceKV bool, ) (*sqlbase.RowContainer, error) { - tableDesc := tu.tableDesc() - existingRows, err := tu.fetchExisting(ctx, traceKV) + // Fetch the information about which rows in tu.insertRows currently + // conflict with rows in-db. + existingRows, pkToRowIdx, conflictingPKs, err := tu.fetchExisting(ctx, traceKV) if err != nil { return nil, err } - colIDToRetIndex := map[sqlbase.ColumnID]int{} - for i, col := range tableDesc.Columns { - colIDToRetIndex[col.ID] = i - } + // At this point existingRows contains data for the conflicting + // rows, and pkToRowIdx maps PKs in tu.insertRows that are known to + // have a conflict to an entry in existingRows. - rowIdxToRetIdx := make([]int, len(tu.ri.InsertCols)) - for i, col := range tu.ri.InsertCols { - rowIdxToRetIdx[i] = colIDToRetIndex[col.ID] - } + // During the upsert processing below, existingRows will contain + // initially the data from KV, but will be extended with each new + // inserted row that didn't exist in KV. Then each update will + // modify existingRows in-place, so that subsequent updates can + // discover the modified values. + tableDesc := tu.tableDesc() b := tu.txn.NewBatch() + for i := 0; i < tu.insertRows.Len(); i++ { insertRow := tu.insertRows.At(i) - existingRow := existingRows[i] - if existingRow == nil { - err := tu.ri.InsertRow(ctx, b, insertRow, false, sqlbase.CheckFKs, traceKV) - if err != nil { - return nil, err + // conflictingRowPK will be the key of the conflicting row. This may + // be different from insertRow's PK if the conflicting index is a + // secondary index. + conflictingRowPK, err := tu.getConflictingRowPK(insertRow, i, conflictingPKs, tableDesc) + if err != nil { + return nil, err + } + + // At this point, conflictingRowPK is either: + // - nil if a secondary index was used and it was determined there + // is no conflict already; + // - non-nil if a conflict may be present. In that case + // we must consult pkToRowIdx to determine whether we already + // have data (i.e. a conflict) in existingRows. + + // conflictingRowIdx will be set to a valid value if a conflict is + // detected. + conflictingRowIdx := -1 + if conflictingRowPK != nil { + if rowIdx, ok := pkToRowIdx[string(conflictingRowPK)]; ok { + // There was a conflict after all. + conflictingRowIdx = rowIdx } + } - if tu.collectRows { - // Pre-fill with NULLs. - for i := range tu.rowTemplate { - tu.rowTemplate[i] = tree.DNull - } - // Fill the other values from insertRow. - for i, val := range insertRow { - tu.rowTemplate[rowIdxToRetIdx[i]] = val - } + // We'll use resultRow to produce a RETURNING row below, if one is needed. + var resultRow tree.Datums - _, err = tu.rowsUpserted.AddRow(ctx, tu.rowTemplate) - if err != nil { - return nil, err - } + // Do we have a conflict? + if conflictingRowIdx == -1 { + // We don't have a conflict. This is a new row in KV. Create it. + resultRow, existingRows, err = tu.insertNonConflictingRow( + ctx, b, insertRow, conflictingRowPK, existingRows, pkToRowIdx, tableDesc, traceKV) + if err != nil { + return nil, err } } else { + // There was a row already. Do we need to update it? + if len(tu.updateCols) == 0 { - // If len(tu.updateCols) == 0, then we're in the DO NOTHING case. - // There is no update to be done. + // If len(tu.updateCols) == 0, then we're in the DO NOTHING + // case. There is no update to be done, also no result row to be collected. + // See the pg docs, e.g.: https://www.postgresql.org/docs/10/static/sql-insert.html + // + // The optional RETURNING clause causes INSERT to compute and + // return value(s) based on each row actually inserted (or + // updated, if an ON CONFLICT DO UPDATE clause was used). This + // is primarily useful for obtaining values that were supplied + // by defaults, such as a serial sequence number. However, any + // expression using the table's columns is allowed. The syntax + // of the RETURNING list is identical to that of the output list + // of SELECT. Only rows that were successfully inserted or + // updated will be returned. + // continue } + // This is the ON CONFLICT DO UPDATE ... clause. + // + // However we still don't know yet whether to do an update; + // we'll need to ask the WHERE clause first, if any. + + // existingRow carries the values previously seen in + // KV or newly inserted earlier in this batch. + existingRow := existingRows[conflictingRowIdx] + // Check the ON CONFLICT DO UPDATE WHERE ... clause. - existingValues := existingRow[:len(tu.ru.FetchCols)] - shouldUpdate, err := tu.evaler.shouldUpdate(insertRow, existingValues) + conflictingRowValues := existingRow[:len(tu.ru.FetchCols)] + shouldUpdate, err := tu.evaler.shouldUpdate(insertRow, conflictingRowValues) if err != nil { return nil, err } if !shouldUpdate { // WHERE tells us there is nothing to do. Stop here. + // There is also no RETURNING result. + // See https://www.postgresql.org/docs/10/static/sql-insert.html and the + // quoted excerpt above. continue } - // Process the UPDATE ON CONFLICT. - - // First compute all the updates via SET (or the pseudo-SET generated - // for UPSERT statements). - updateValues, err := tu.evaler.eval(insertRow, existingValues, tu.updateValues) + // We know there was a row already, and we know we need to update it. Do it. + resultRow, existingRows, err = tu.updateConflictingRow( + ctx, b, insertRow, + conflictingRowPK, conflictingRowIdx, conflictingRowValues, + existingRows, pkToRowIdx, + tableDesc, traceKV) if err != nil { return nil, err } + } - // Now (re-)compute computed columns. This appends the computed - // columns at the end of updateValues. - // - // TODO(justin): We're currently wasteful here: we construct the - // result row *twice* because we need it once to evaluate any computed - // columns and again to actually perform the update. we need to find a - // way to reuse it. I'm not sure right now how best to factor this - - // suggestions welcome. - if tu.anyComputed { - newValues := make([]tree.Datum, len(existingValues)) - copy(newValues, existingValues) - for i, updateValue := range updateValues { - newValues[tu.ru.FetchColIDtoRowIndex[tu.ru.UpdateCols[i].ID]] = updateValue - } - - // Now that we have a complete row except for its computed columns, - // since the computed columns are at the end of the update row, we - // must evaluate the computed columns and add the results to the end - // of updateValues. - updateValues, err = tu.evaler.evalComputedCols(newValues, updateValues) - if err != nil { - return nil, err - } - } - - // Queue the update in KV. This also returns an "update row" - // containing the updated values for every column in the - // table. This is useful for RETURNING, which we collect below. - updatedRow, err := tu.ru.UpdateRow( - ctx, b, existingValues, updateValues, sqlbase.CheckFKs, traceKV, - ) + // Do we need to remember a result for RETURNING? + if tu.collectRows { + // Yes, collect it. + _, err = tu.rowsUpserted.AddRow(ctx, resultRow) if err != nil { return nil, err } - - if tu.collectRows { - _, err = tu.rowsUpserted.AddRow(ctx, updatedRow) - if err != nil { - return nil, err - } - } - - // Keep the slice for reuse. - tu.updateValues = updateValues[:0] } } + // The upsert resolution is finished. + // This has prepared a KV batch in b. + // Now run/commit the KV batch. + if autoCommit == autoCommitEnabled { // An auto-txn can commit the transaction with the batch. This is an // optimization to avoid an extra round-trip to the transaction @@ -525,10 +550,267 @@ func (tu *tableUpserter) finalize( return tu.rowsUpserted, nil } -// upsertRowPKs returns the primary keys of any rows with potential upsert -// conflicts. -func (tu *tableUpserter) upsertRowPKs(ctx context.Context, traceKV bool) ([]roachpb.Key, error) { - upsertRowPKs := make([]roachpb.Key, tu.insertRows.Len()) +// updateConflictingRow updates the existing row +// in the table, when there was a conflict. +// Inputs: +// - b is the KV batch to use for the insert. +// - insertRow is the new row to upsert, containing the "excluded" values. +// - conflictingRowPK is the PK of the previously seen conflicting row. +// - conflictingRowIdx is the index of the values of the previously seen conflicting row in existingRows. +// - conflictingRowValues is the prefetched existingRows[conflictingRowIdx]. +// Outputs: +// - resultRow is the row that was updated, shaped in the order +// of the table descriptor. This may be different than the +// shape of insertRow if there are nullable columns. +// Input/Outputs: +// - existingRows contains the previously seen rows, and is modified +// or extended depending on how the PK columns are updated by the SET +// clauses. +// - pkToRowIdx is extended with the index of the new entry in existingRows. +func (tu *tableUpserter) updateConflictingRow( + ctx context.Context, + b *client.Batch, + insertRow tree.Datums, + conflictingRowPK roachpb.Key, + conflictingRowIdx int, + conflictingRowValues tree.Datums, + existingRows []tree.Datums, + pkToRowIdx map[string]int, + tableDesc *sqlbase.TableDescriptor, + traceKV bool, +) (resultRow tree.Datums, newExistingRows []tree.Datums, err error) { + // First compute all the updates via SET (or the pseudo-SET generated + // for UPSERT statements). + updateValues, err := tu.evaler.eval(insertRow, conflictingRowValues, tu.updateValues) + if err != nil { + return nil, nil, err + } + + // Do we need to (re-)compute computed columns? + if tu.anyComputed { + // Yes, do it. This appends the + // computed columns at the end of updateValues. + // + // TODO(justin): We're currently wasteful here: we construct the + // result row *twice* because we need it once to evaluate any computed + // columns and again to actually perform the update. we need to find a + // way to reuse it. I'm not sure right now how best to factor this - + // suggestions welcome. + // TODO(nathan/knz): Reuse a row buffer here. + newValues := make([]tree.Datum, len(conflictingRowValues)) + copy(newValues, conflictingRowValues) + for i, updateValue := range updateValues { + newValues[tu.ru.FetchColIDtoRowIndex[tu.ru.UpdateCols[i].ID]] = updateValue + } + + // Now that we have a complete row except for its computed columns, + // since the computed columns are at the end of the update row, we + // must evaluate the computed columns and add the results to the end + // of updateValues. + updateValues, err = tu.evaler.evalComputedCols(newValues, updateValues) + if err != nil { + return nil, nil, err + } + } + + // Queue the update in KV. This also returns an "update row" + // containing the updated values for every column in the + // table. This is useful for RETURNING, which we collect below. + updatedRow, err := tu.ru.UpdateRow( + ctx, b, conflictingRowValues, updateValues, sqlbase.CheckFKs, traceKV, + ) + if err != nil { + return nil, nil, err + } + + // Keep the slice for reuse. + tu.updateValues = updateValues[:0] + + // Maybe the PK was updated by SET. We need to recompute a fresh PK + // for the current row from updatedRow. We use + // tu.evaler.ccIvarContainer.mapping which contains the suitable + // mapping for the table columns already. + updatedConflictingRowPK, _, err := sqlbase.EncodeIndexKey( + tableDesc, &tableDesc.PrimaryIndex, tu.evaler.ccIvarContainer.mapping, updatedRow, tu.indexKeyPrefix) + if err != nil { + return nil, nil, err + } + + // It's possible that the PK for the updated values is different + // from the PK for the original conflicting row, if the SET has + // updated some of the PK columns. We need to detect that in + // pkChanged. + var pkChanged bool + + // Now update the known data in existingRows. + // Perhaps we just also inserted a new row. + if updatedConflictingRowIdx, ok := pkToRowIdx[string(updatedConflictingRowPK)]; ok { + // This case indicates that the (possibly new) PK of the updated + // row was already seen in a previous iteration (by a previous + // upsert resolution). + // + // We need to update that known copy, so that subsequent + // iterations can find it. + copy(existingRows[updatedConflictingRowIdx], updatedRow) + + // The following line is meant to read: + // + // pkChanged = !bytes.Equal(updatedConflictingRowPK, conflictingRowPK) + // + // However we already know that the row indices in existingRows are different + // if the PKs are different, so we can compare the row indices instead + // for efficiency. + pkChanged = updatedConflictingRowIdx != conflictingRowIdx + } else { + // We're sure to have a new PK. + pkChanged = true + + // Now add the new one. + existingRows = appendKnownConflictingRow(updatedRow, updatedConflictingRowPK, existingRows, pkToRowIdx) + } + + if pkChanged { + // The previous PK is guaranteed to not exist any more. Remove it. + delete(pkToRowIdx, string(conflictingRowPK)) + } + + // We're done! + return updatedRow, existingRows, nil +} + +// insertNonConflictingRow inserts the source row insertRow +// into the table, when there was no conflict. +// Inputs: +// - b is the KV batch to use for the insert. +// - insertRow is the new row to insert. +// - conflictingRowPK is the PK of that new row, if it is known already +// (e.g. by getConflictingRowPK from the primary index). +// Outputs: +// - resultRow is the row that was inserted, shaped in the order +// of the table descriptor. This may be different than the +// shape of insertRow if there are nullablec olumns. +// Input/Outputs: +// - existingRows is extended with resultRow to produce newExistingRows. +// - pkToRowIdx is extended with the index of the new entry in existingRows. +func (tu *tableUpserter) insertNonConflictingRow( + ctx context.Context, + b *client.Batch, + insertRow tree.Datums, + conflictingRowPK roachpb.Key, + existingRows []tree.Datums, + pkToRowIdx map[string]int, + tableDesc *sqlbase.TableDescriptor, + traceKV bool, +) (resultRow tree.Datums, newExistingRows []tree.Datums, err error) { + // Perform the insert proper. + if err := tu.ri.InsertRow( + ctx, b, insertRow, false /* ignoreConflicts */, sqlbase.CheckFKs, traceKV); err != nil { + return nil, nil, err + } + + // We may not know the conflictingRowPK yet for the new row, for + // example when the conflicting index was a secondary index. + // In that case, compute it now. + if conflictingRowPK == nil { + conflictingRowPK, _, err = sqlbase.EncodeIndexKey( + tableDesc, &tableDesc.PrimaryIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix) + if err != nil { + return nil, nil, err + } + } + + // We now need a row that has the shape of the result row. + resultRow = tu.makeResultFromInsertRow(insertRow, tableDesc.Columns) + // Then remember it for further upserts. + existingRows = appendKnownConflictingRow(resultRow, conflictingRowPK, existingRows, pkToRowIdx) + + return resultRow, existingRows, nil +} + +// appendKnownConflictingRow adds a new row to existingRows and +// remembers its position in pkToRowIdx. +func appendKnownConflictingRow( + newRow tree.Datums, newRowPK roachpb.Key, existingRows []tree.Datums, pkToRowIdx map[string]int, +) (newExistingRows []tree.Datums) { + pkToRowIdx[string(newRowPK)] = len(existingRows) + return append(existingRows, newRow) +} + +// getConflictingRowPK returns the primary key of the row that may +// conflict with insertRow, if any. It returns a nil PK if there was +// definitely no conflict. +// +// rowIdx is the index of insertRow in tu.insertRows, and can be used +// to index conflictingPKs. +// +// conflictingPKs is an array of pre-computed PKs for each row in +// tu.insertRows, in the case the conflict was resolved using a +// secondary index. +func (tu *tableUpserter) getConflictingRowPK( + insertRow tree.Datums, + rowIdx int, + conflictingPKs map[int]roachpb.Key, + tableDesc *sqlbase.TableDescriptor, +) (conflictingRowPK roachpb.Key, err error) { + if conflictingPKs != nil { + // If a secondary index helped us find the conflicting PK for this + // row, use that information. A nil here indicates the row + // definitely did not exist (because it was not present in the + // secondary index). + return conflictingPKs[rowIdx], nil + } + + // Otherwise, encode the values to determine the primary key. + insertRowPK, _, err := sqlbase.EncodeIndexKey( + tableDesc, &tableDesc.PrimaryIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix) + return insertRowPK, err +} + +// makeResultFromInsertRow reshapes a row that was inserted by the +// data source (in tu.insertRow) to a row suitable for storing for a +// later RETURNING clause, shaped by the target table's descriptor. +// For example, the inserted row may not contain values for nullable +// columns. +func (tu *tableUpserter) makeResultFromInsertRow( + insertRow tree.Datums, cols []sqlbase.ColumnDescriptor, +) tree.Datums { + resultRow := insertRow + if len(resultRow) < len(cols) { + resultRow = make(tree.Datums, len(cols)) + // Pre-fill with NULLs. + for i := range resultRow { + resultRow[i] = tree.DNull + } + // Fill the other values from insertRow. + for i, val := range insertRow { + resultRow[tu.rowIdxToRetIdx[i]] = val + } + } + return resultRow +} + +// upsertRowPKs returns the primary key of every row in tu.insertRows +// with potential upsert conflicts. +// +// - if the conflicting index is the PK, the primary key for every +// row in tu.insertRow is computed (with no KV access) and returned. +// fetchExisting() will later determine whether the +// row is already present in KV or not with a lookup. +// +// - if the conflicting index is secondary, that index is used to look +// up the primary key. If the row is absent, no key is generated. +// +// The keys returned are guaranteed to be unique. +// +// The second return value is returned non-nil when the conflicting +// index is a secondary index. It maps each row in insertRow to a PK with which +// it conflicts. Note that this may not be the PK of the row in insertRow +// itself -- merely that of _some_ row that's in KV already with the same PK. +func (tu *tableUpserter) upsertRowPKs( + ctx context.Context, traceKV bool, +) ([]roachpb.Key, map[int]roachpb.Key, error) { + upsertRowPKs := make([]roachpb.Key, 0, tu.insertRows.Len()) + uniquePKs := make(map[string]struct{}) tableDesc := tu.tableDesc() if tu.conflictIndex.ID == tableDesc.PrimaryIndex.ID { @@ -537,18 +819,29 @@ func (tu *tableUpserter) upsertRowPKs(ctx context.Context, traceKV bool) ([]roac // conflicts. for i := 0; i < tu.insertRows.Len(); i++ { insertRow := tu.insertRows.At(i) + + // Compute the PK for the current row. upsertRowPK, _, err := sqlbase.EncodeIndexKey( tableDesc, &tu.conflictIndex, tu.ri.InsertColIDtoRowIndex, insertRow, tu.indexKeyPrefix) if err != nil { - return nil, err + return nil, nil, err + } + + // If the row has been seen already, we already know there's a + // conflict. There's nothing to do in that case. Otherwise, we + // need to remember there's a conflict by storing that row in + // `upsertRowPKs`. + if _, ok := uniquePKs[string(upsertRowPK)]; !ok { + // Conflict was not previously known. Remember it. + upsertRowPKs = append(upsertRowPKs, upsertRowPK) + uniquePKs[string(upsertRowPK)] = struct{}{} } - upsertRowPKs[i] = upsertRowPK } - return upsertRowPKs, nil + return upsertRowPKs, nil, nil } // Otherwise, compute the keys for the conflict index and look them up. The - // primary keys can be constructed from the entries that come back. In this + // primary key can be constructed from the entries that come back. In this // case, some spots in the slice will be nil (indicating no conflict) and the // others will be conflicting rows. b := tu.txn.NewBatch() @@ -557,7 +850,7 @@ func (tu *tableUpserter) upsertRowPKs(ctx context.Context, traceKV bool) ([]roac entries, err := sqlbase.EncodeSecondaryIndex( tableDesc, &tu.conflictIndex, tu.ri.InsertColIDtoRowIndex, insertRow) if err != nil { - return nil, err + return nil, nil, err } for _, entry := range entries { @@ -569,20 +862,21 @@ func (tu *tableUpserter) upsertRowPKs(ctx context.Context, traceKV bool) ([]roac } if err := tu.txn.Run(ctx, b); err != nil { - return nil, err + return nil, nil, err } + conflictingPKs := make(map[int]roachpb.Key) for i, result := range b.Results { - // if len(result.Rows) == 0, then no conflict for this row, so leave - // upsertRowPKs[i] as nil. if len(result.Rows) == 1 { - if result.Rows[0].Value == nil { - upsertRowPKs[i] = nil - } else { + if result.Rows[0].Value != nil { upsertRowPK, err := sqlbase.ExtractIndexKey(tu.alloc, tableDesc, result.Rows[0]) if err != nil { - return nil, err + return nil, nil, err + } + conflictingPKs[i] = upsertRowPK + if _, ok := uniquePKs[string(upsertRowPK)]; !ok { + upsertRowPKs = append(upsertRowPKs, upsertRowPK) + uniquePKs[string(upsertRowPK)] = struct{}{} } - upsertRowPKs[i] = upsertRowPK } } else if len(result.Rows) > 1 { panic(fmt.Errorf( @@ -590,46 +884,60 @@ func (tu *tableUpserter) upsertRowPKs(ctx context.Context, traceKV bool) ([]roac } } - return upsertRowPKs, nil + return upsertRowPKs, conflictingPKs, nil } // fetchExisting returns any existing rows in the table that conflict with the -// ones in tu.insertRows. The returned slice is the same length as tu.insertRows -// and a nil entry indicates no conflict. -func (tu *tableUpserter) fetchExisting(ctx context.Context, traceKV bool) ([]tree.Datums, error) { +// ones in tu.insertRows. +// Outputs: +// - existingRows contains data for conflicting rows. +// - pkToRowIdx relates the primary key values in the +// data source to which entry in the returned slice contain data +// for that primary key. +// - conflictingPKs contains the PK for each row in tu.insertRow that +// has a known conflict. This is populated only if there were some +// conflicts found and the conflict index was a secondary index. +func (tu *tableUpserter) fetchExisting( + ctx context.Context, traceKV bool, +) ( + existingRows []tree.Datums, + pkToRowIdx map[string]int, + conflictingPKs map[int]roachpb.Key, + err error, +) { tableDesc := tu.tableDesc() - primaryKeys, err := tu.upsertRowPKs(ctx, traceKV) + // primaryKeys contains the PK values to check for conflicts. + primaryKeys, conflictingPKs, err := tu.upsertRowPKs(ctx, traceKV) if err != nil { - return nil, err + return nil, nil, nil, err } - pkSpans := make(roachpb.Spans, 0, len(primaryKeys)) - rowIdxForPrimaryKey := make(map[string]int, len(primaryKeys)) - for i, primaryKey := range primaryKeys { - if primaryKey != nil { - pkSpans = append(pkSpans, roachpb.Span{Key: primaryKey, EndKey: primaryKey.PrefixEnd()}) - if _, ok := rowIdxForPrimaryKey[string(primaryKey)]; ok { - return nil, fmt.Errorf("UPSERT/ON CONFLICT DO UPDATE command cannot affect row a second time") - } - rowIdxForPrimaryKey[string(primaryKey)] = i - } + // pkToRowIdx maps the PK values to positions in existingRows. + pkToRowIdx = make(map[string]int) + + if len(primaryKeys) == 0 { + // We know already there is no conflicting row, so there's nothing to fetch. + return existingRows, pkToRowIdx, conflictingPKs, nil } - if len(pkSpans) == 0 { - // Every key was empty, so there's nothing to fetch. - return make([]tree.Datums, len(primaryKeys)), nil + + // pkSpans will contain the spans for every entry in primaryKeys. + pkSpans := make(roachpb.Spans, 0, len(primaryKeys)) + for _, primaryKey := range primaryKeys { + pkSpans = append(pkSpans, roachpb.Span{Key: primaryKey, EndKey: primaryKey.PrefixEnd()}) } + // Start retrieving the PKs. // We don't limit batches here because the spans are unordered. if err := tu.fetcher.StartScan(ctx, tu.txn, pkSpans, false /* no batch limits */, 0, traceKV); err != nil { - return nil, err + return nil, nil, nil, err } - rows := make([]tree.Datums, len(primaryKeys)) + // Populate existingRows and pkToRowIdx. for { row, _, _, err := tu.fetcher.NextRowDecoded(ctx) if err != nil { - return nil, err + return nil, nil, nil, err } if row == nil { break // Done @@ -638,16 +946,20 @@ func (tu *tableUpserter) fetchExisting(ctx context.Context, traceKV bool) ([]tre rowPrimaryKey, _, err := sqlbase.EncodeIndexKey( tableDesc, &tableDesc.PrimaryIndex, tu.fetchColIDtoRowIndex, row, tu.indexKeyPrefix) if err != nil { - return nil, err + return nil, nil, nil, err } - // The rows returned by rowFetcher are invalidated after the call to // NextRow, so we have to copy them to save them. + // TODO(knz/nathan): try to reuse a large slice instead + // of making many small slices. rowCopy := make(tree.Datums, len(row)) copy(rowCopy, row) - rows[rowIdxForPrimaryKey[string(rowPrimaryKey)]] = rowCopy + + pkToRowIdx[string(rowPrimaryKey)] = len(existingRows) + existingRows = append(existingRows, rowCopy) } - return rows, nil + + return existingRows, pkToRowIdx, conflictingPKs, nil } func (tu *tableUpserter) tableDesc() *sqlbase.TableDescriptor { @@ -678,16 +990,6 @@ func (tu *tableUpserter) close(ctx context.Context) { type fastTableUpserter struct { ri sqlbase.RowInserter - // collectRows indicates whether the upserted rows should be - // collected in the row container. - collectRows bool - upsertedRows *sqlbase.RowContainer - - // fastPathKeys and indexKeyPrefix are used to detect that no - // duplicate row is being upserted. - fastPathKeys map[string]struct{} - indexKeyPrefix []byte - // Set by init. txn *client.Txn evalCtx *tree.EvalContext @@ -700,19 +1002,9 @@ func (tu *fastTableUpserter) walkExprs(_ func(_ string, _ int, _ tree.TypedExpr) // init is part of the tableWriter interface. func (tu *fastTableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) error { - tableDesc := tu.tableDesc() - tu.txn = txn tu.evalCtx = evalCtx - tu.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(tableDesc, tableDesc.PrimaryIndex.ID) - - if tu.collectRows { - tu.upsertedRows = sqlbase.NewRowContainer( - tu.evalCtx.Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColDescs(tu.ri.InsertCols), 0, - ) - } - tu.fastPathKeys = make(map[string]struct{}) tu.fastPathBatch = tu.txn.NewBatch() return nil } @@ -721,28 +1013,9 @@ func (tu *fastTableUpserter) init(txn *client.Txn, evalCtx *tree.EvalContext) er func (tu *fastTableUpserter) row( ctx context.Context, row tree.Datums, traceKV bool, ) (tree.Datums, error) { - tableDesc := tu.tableDesc() - - // Verify we are not upserting a duplicate. - primaryKey, _, err := sqlbase.EncodeIndexKey( - tableDesc, &tableDesc.PrimaryIndex, tu.ri.InsertColIDtoRowIndex, row, tu.indexKeyPrefix) - if err != nil { - return nil, err - } - if _, ok := tu.fastPathKeys[string(primaryKey)]; ok { - return nil, fmt.Errorf("UPSERT/ON CONFLICT DO UPDATE command cannot affect row a second time") - } - tu.fastPathKeys[string(primaryKey)] = struct{}{} - // Use the fast path, ignore conflicts. - if err := tu.ri.InsertRow( - ctx, tu.fastPathBatch, row, true /* ignoreConflicts */, sqlbase.CheckFKs, traceKV); err != nil { - return nil, err - } - - if tu.collectRows { - _, err = tu.upsertedRows.AddRow(ctx, row) - } + err := tu.ri.InsertRow( + ctx, tu.fastPathBatch, row, true /* ignoreConflicts */, sqlbase.CheckFKs, traceKV) return nil, err } @@ -763,7 +1036,7 @@ func (tu *fastTableUpserter) finalize( } } - return tu.upsertedRows, nil + return nil, nil } func (tu *fastTableUpserter) fkSpanCollector() sqlbase.FkSpanCollector { @@ -774,12 +1047,7 @@ func (tu *fastTableUpserter) tableDesc() *sqlbase.TableDescriptor { return tu.ri.Helper.TableDesc } -func (tu *fastTableUpserter) close(ctx context.Context) { - if tu.upsertedRows != nil { - tu.upsertedRows.Close(ctx) - tu.upsertedRows = nil - } -} +func (tu *fastTableUpserter) close(ctx context.Context) {} // tableDeleter handles writing kvs and forming table rows for deletes. type tableDeleter struct { diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 2b29f65aa282..322353f26f6d 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -174,14 +174,16 @@ func (p *planner) newUpsertNode( // path is disabled during all mutations. len(en.tableDesc.Mutations) == 0 && // For the fast path, all columns must be specified in the insert. - len(ri.InsertCols) == len(en.tableDesc.Columns) + len(ri.InsertCols) == len(en.tableDesc.Columns) && + // We cannot use the fast path if we also have a RETURNING clause, because + // RETURNING wants to see only the updated rows. + !needRows if enableFastPath { // We then use the super-simple, super-fast writer. There's not // much else to prepare. un.tw = &fastTableUpserter{ - ri: ri, - collectRows: needRows, + ri: ri, } } else { // General/slow path. @@ -624,13 +626,9 @@ func (p *planner) newUpsertHelper( // // This will use the layout from the table columns. The mapping from // column IDs to row datum positions is straightforward. - mapping := make(map[sqlbase.ColumnID]int) - for i, c := range tableDesc.Columns { - mapping[c.ID] = i - } helper.ccIvarContainer = rowIndexedVarContainer{ cols: tableDesc.Columns, - mapping: mapping, + mapping: sqlbase.ColIDtoRowIndexFromCols(tableDesc.Columns), } return helper, nil