Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

* : Multiple rows insert in a statement should have consecutive autoID if needed. (#11876) #12619

Merged
merged 4 commits into from
Oct 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 194 additions & 7 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type InsertValues struct {
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType

// Fill the autoID lazily to datum. This is used for being compatible with JDBC using getGeneratedKeys().
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
}

type defaultVal struct {
Expand Down Expand Up @@ -182,7 +188,7 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
if err = e.processSetList(); err != nil {
return errors.Trace(err)
}

e.lazyFillAutoID = true
sessVars := e.ctx.GetSessionVars()
batchInsert := (sessVars.BatchInsert && !sessVars.InTxn()) || config.GetGlobalConfig().EnableBatchDML
rows := make([][]types.Datum, 0, len(e.Lists))
Expand All @@ -194,6 +200,11 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
}
rows = append(rows, row)
if batchInsert && int(e.rowCount)%sessVars.DMLBatchSize == 0 {
// Before batch insert, fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(rows)
if err != nil {
return err
}
if err := exec(rows); err != nil {
return errors.Trace(err)
}
Expand All @@ -203,6 +214,11 @@ func (e *InsertValues) insertRows(exec func(rows [][]types.Datum) error) (err er
rows = rows[:0]
}
}
// Fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(rows)
if err != nil {
return err
}
return errors.Trace(exec(rows))
}

Expand Down Expand Up @@ -260,7 +276,7 @@ func (e *InsertValues) evalRow(list []expression.Expression, rowIdx int) ([]type
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

// Row may lack of generated column, autoIncrement column, empty column here.
return e.fillRow(row, hasValue)
}

Expand Down Expand Up @@ -404,6 +420,14 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D
func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum,
error) {
if mysql.HasAutoIncrementFlag(column.Flag) {
if e.lazyFillAutoID {
// Handle hasValue info in autoIncrement column previously for lazy handle.
if !hasValue {
datum.SetNull()
}
// Store the plain datum of autoIncrement column directly for lazy handle.
return datum, nil
}
d, err := e.adjustAutoIncrementDatum(datum, hasValue, column)
if err != nil {
return types.Datum{}, errors.Trace(err)
Expand All @@ -422,6 +446,10 @@ func (e *InsertValues) fillColValue(datum types.Datum, idx int, column *table.Co

// fillRow fills generated columns, auto_increment column and empty column.
// For NOT NULL column, it will return error or use zero value based on sql_mode.
// When lazyFillAutoID is true, fill row will lazily handle auto increment datum for lazy batch allocation.
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datum, error) {
gIdx := 0
for i, c := range e.Table.Cols() {
Expand All @@ -445,15 +473,174 @@ func (e *InsertValues) fillRow(row []types.Datum, hasValue []bool) ([]types.Datu
return nil, errors.Trace(err)
}
}

// Handle the bad null error.
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, errors.Trace(err)
// Handle the bad null error. Cause generated column with `not null` flag will get default value datum in fillColValue
// which should be override by generated expr first, then handle the bad null logic here.
if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.Flag)) {
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
}
}
return row, nil
}

// isAutoNull can help judge whether a datum is AutoIncrement Null quickly.
// This used to help lazyFillAutoIncrement to find consecutive N datum backwards for batch autoID alloc.
func (e *InsertValues) isAutoNull(d types.Datum, col *table.Column) bool {
var err error
var recordID int64
if !d.IsNull() {
recordID, err = getAutoRecordID(d, &col.FieldType, true)
if err != nil {
return false
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
return false
}
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
return true
}
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
}
}
return colIdx, colIdx != -1
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(rows [][]types.Datum) ([][]types.Datum, error) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

var err error
var recordID int64
if !autoDatum.IsNull() {
recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true)
if err != nil {
return nil, err
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.RebaseAutoID(e.ctx, recordID, true)
if err != nil {
return nil, err
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Find consecutive num.
start := i
cnt := 1
for i+1 < length && e.isAutoNull(rows[i+1][colIdx], col) {
i++
cnt++
}
// Alloc batch N consecutive (min, max] autoIDs.
// max value can be derived from adding one for cnt times.
min, _, err := e.Table.AllocAutoIncrementValue(e.ctx, cnt)
if e.filterErr(err) != nil {
return nil, err
}
// It's compatible with mysql setting the first allocated autoID to lastInsertID.
// Cause autoID may be specified by user, judge only the first row is not suitable.
if e.lastInsertID == 0 {
e.lastInsertID = uint64(min) + 1
}
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j) + 1)
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo())
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo())
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
Expand Down Expand Up @@ -490,7 +677,7 @@ func (e *InsertValues) adjustAutoIncrementDatum(d types.Datum, hasValue bool, c
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
recordID, err = e.Table.AllocAutoIncrementValue(e.ctx)
_, recordID, err = e.Table.AllocAutoIncrementValue(e.ctx, 1)
if e.filterErr(err) != nil {
return types.Datum{}, errors.Trace(err)
}
Expand Down
Loading