Skip to content

Commit

Permalink
* : Multiple rows insert in a statement should have consecutive autoI…
Browse files Browse the repository at this point in the history
…D if needed. (#11876) (#12619)
  • Loading branch information
AilinKid authored and ngaut committed Oct 14, 2019
1 parent f043ecc commit df41474
Show file tree
Hide file tree
Showing 10 changed files with 621 additions and 96 deletions.
201 changes: 194 additions & 7 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type InsertValues struct {
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType

// Fill the autoID lazily to datum. This is used for being compatible with JDBC using getGeneratedKeys().
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
}

type defaultVal struct {
Expand Down Expand Up @@ -182,7 +188,7 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
if err = e.processSetList(); err != nil {
return errors.Trace(err)
}

e.lazyFillAutoID = true
sessVars := e.ctx.GetSessionVars()
batchInsert := (sessVars.BatchInsert && !sessVars.InTxn()) || config.GetGlobalConfig().EnableBatchDML
rows := make([][]types.Datum, 0, len(e.Lists))
Expand All @@ -194,6 +200,11 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
}
rows = append(rows, row)
if batchInsert && int(e.rowCount)%sessVars.DMLBatchSize == 0 {
// Before batch insert, fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(rows)
if err != nil {
return err
}
if err := exec(rows); err != nil {
return errors.Trace(err)
}
Expand All @@ -203,6 +214,11 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
rows = rows[:0]
}
}
// Fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(rows)
if err != nil {
return err
}
return errors.Trace(exec(rows))
}

Expand Down Expand Up @@ -260,7 +276,7 @@ func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]type
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

// Row may lack of generated column, autoIncrement column, empty column here.
return e.fillRow(row, hasValue)
}

Expand Down Expand Up @@ -404,6 +420,14 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D
func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum,
error) {
if mysql.HasAutoIncrementFlag(column.Flag) {
if e.lazyFillAutoID {
// Handle hasValue info in autoIncrement column previously for lazy handle.
if !hasValue {
datum.SetNull()
}
// Store the plain datum of autoIncrement column directly for lazy handle.
return datum, nil
}
d, err := e.adjustAutoIncrementDatum(datum, hasValue, column)
if err != nil {
return types.Datum{}, errors.Trace(err)
Expand All @@ -422,6 +446,10 @@ func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Co

// fillRow fills generated columns, auto_increment column and empty column.
// For NOT NULL column, it will return error or use zero value based on sql_mode.
// When lazyFillAutoID is true, fill row will lazily handle auto increment datum for lazy batch allocation.
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datum, error) {
gIdx := 0
for i, c := range e.Table.Cols() {
Expand All @@ -445,15 +473,174 @@ func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datu
return nil, errors.Trace(err)
}
}

// Handle the bad null error.
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, errors.Trace(err)
// Handle the bad null error. Cause generated column with `not null` flag will get default value datum in fillColValue
// which should be override by generated expr first, then handle the bad null logic here.
if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.Flag)) {
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
}
}
return row, nil
}

// isAutoNull can help judge whether a datum is AutoIncrement Null quickly.
// This used to help lazyFillAutoIncrement to find consecutive N datum backwards for batch autoID alloc.
func (e *InsertValues) isAutoNull(d types.Datum, col *table.Column) bool {
var err error
var recordID int64
if !d.IsNull() {
recordID, err = getAutoRecordID(d, &col.FieldType, true)
if err != nil {
return false
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
return false
}
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
return true
}
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
}
}
return colIdx, colIdx != -1
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(rows [][]types.Datum) ([][]types.Datum, error) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

var err error
var recordID int64
if !autoDatum.IsNull() {
recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true)
if err != nil {
return nil, err
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.RebaseAutoID(e.ctx, recordID, true)
if err != nil {
return nil, err
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Find consecutive num.
start := i
cnt := 1
for i+1 < length && e.isAutoNull(rows[i+1][colIdx], col) {
i++
cnt++
}
// Alloc batch N consecutive (min, max] autoIDs.
// max value can be derived from adding one for cnt times.
min, _, err := e.Table.AllocAutoIncrementValue(e.ctx, cnt)
if e.filterErr(err) != nil {
return nil, err
}
// It's compatible with mysql setting the first allocated autoID to lastInsertID.
// Cause autoID may be specified by user, judge only the first row is not suitable.
if e.lastInsertID == 0 {
e.lastInsertID = uint64(min) + 1
}
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j) + 1)
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo())
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo())
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
Expand Down Expand Up @@ -490,7 +677,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
recordID, err = e.Table.AllocAutoIncrementValue(e.ctx)
_, recordID, err = e.Table.AllocAutoIncrementValue(e.ctx, 1)
if e.filterErr(err) != nil {
return types.Datum{}, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit df41474

Please sign in to comment.