Skip to content

Commit

Permalink
dml : Multiple rows insert in a statement should have consecutive aut…
Browse files Browse the repository at this point in the history
…oID if needed. (pingcap#11876)
  • Loading branch information
AilinKid committed Oct 10, 2019
1 parent b103dff commit 6a01b63
Show file tree
Hide file tree
Showing 6 changed files with 657 additions and 115 deletions.
194 changes: 193 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ type InsertValues struct {
colDefaultVals []defaultVal
evalBuffer chunk.MutRow
evalBufferTypes []*types.FieldType

// Fill the autoID lazily to datum. This is used for being compatible with JDBC using getGeneratedKeys().
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
}

type defaultVal struct {
Expand Down Expand Up @@ -184,6 +190,8 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
batchInsert := sessVars.BatchInsert && !sessVars.InTxn()
batchSize := sessVars.DMLBatchSize

e.lazyFillAutoID = true

rows := make([][]types.Datum, 0, len(e.Lists))
for i, list := range e.Lists {
e.rowCount++
Expand All @@ -193,6 +201,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
}
rows = append(rows, row)
if batchInsert && e.rowCount%uint64(batchSize) == 0 {
// Before batch insert, fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
return err
}
if err = exec(ctx, rows); err != nil {
return err
}
Expand All @@ -202,6 +215,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con
}
}
}
// Fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
return err
}
return exec(ctx, rows)
}

Expand Down Expand Up @@ -259,7 +277,7 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression
row[offset], hasValue[offset] = *val1.Copy(), true
e.evalBuffer.SetDatum(offset, val1)
}

// Row may lack of generated column, autoIncrement column, empty column here.
return e.fillRow(ctx, row, hasValue)
}

Expand Down Expand Up @@ -413,6 +431,14 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D
func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum,
error) {
if mysql.HasAutoIncrementFlag(column.Flag) {
if e.lazyFillAutoID {
// Handle hasValue info in autoIncrement column previously for lazy handle.
if !hasValue {
datum.SetNull()
}
// Store the plain datum of autoIncrement column directly for lazy handle.
return datum, nil
}
d, err := e.adjustAutoIncrementDatum(ctx, datum, hasValue, column)
if err != nil {
return types.Datum{}, err
Expand All @@ -431,6 +457,10 @@ func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx

// fillRow fills generated columns, auto_increment column and empty column.
// For NOT NULL column, it will return error or use zero value based on sql_mode.
// When lazyFillAutoID is true, fill row will lazily handle auto increment datum for lazy batch allocation.
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool) ([]types.Datum, error) {
gIdx := 0
for i, c := range e.Table.Cols() {
Expand All @@ -440,6 +470,11 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
if err != nil {
return nil, err
}
if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.Flag)) {
if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
}

// Evaluate the generated columns.
if c.IsGenerated() {
Expand All @@ -463,6 +498,163 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
return row, nil
}

// isAutoNull can help judge whether a datum is AutoIncrement Null quickly.
// This used to help lazyFillAutoIncrement to find consecutive N datum backwards for batch autoID alloc.
func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table.Column) bool {
var err error
var recordID int64
if !d.IsNull() {
recordID, err = getAutoRecordID(d, &col.FieldType, true)
if err != nil {
return false
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
return false
}
// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
return true
}
return false
}

func (e *InsertValues) hasAutoIncrementColumn() (int, bool) {
colIdx := -1
for i, c := range e.Table.Cols() {
if mysql.HasAutoIncrementFlag(c.Flag) {
colIdx = i
break
}
}
return colIdx, colIdx != -1
}

func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) {
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, err := retryInfo.GetCurrAutoIncrementID()
if err != nil {
return nil, err
}
autoDatum.SetAutoID(id, col.Flag)

if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
}
return rows, nil
}

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
}
// No autoIncrement column means no need to fill.
colIdx, ok := e.hasAutoIncrementColumn()
if !ok {
return rows, nil
}
// autoID can be found in RetryInfo.
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx)
}
// Get the autoIncrement column.
col := e.Table.Cols()[colIdx]
// Consider the colIdx of autoIncrement in row are the same.
length := len(rows)
for i := 0; i < length; i++ {
autoDatum := rows[i][colIdx]

var err error
var recordID int64
if !autoDatum.IsNull() {
recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true)
if err != nil {
return nil, err
}
}
// Use the value if it's not null and not 0.
if recordID != 0 {
err = e.Table.RebaseAutoID(e.ctx, recordID, true)
if err != nil {
return nil, err
}
e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID)
retryInfo.AddAutoIncrementID(recordID)
rows[i][colIdx] = autoDatum
continue
}

// Change NULL to auto id.
// Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set.
if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 {
// Find consecutive num.
start := i
cnt := 1
for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) {
i++
cnt++
}
// Alloc batch N consecutive (min, max] autoIDs.
// max value can be derived from adding one for cnt times.
min, _, err := table.AllocBatchAutoIncrementValue(ctx, e.Table, e.ctx, cnt)
if e.filterErr(err) != nil {
return nil, err
}
// It's compatible with mysql setting the first allocated autoID to lastInsertID.
// Cause autoID may be specified by user, judge only the first row is not suitable.
if e.lastInsertID == 0 {
e.lastInsertID = uint64(min) + 1
}
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j) + 1)
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

// The value of d is adjusted by auto ID, so we need to cast it again.
d, err := table.CastValue(e.ctx, d, col.ToInfo())
if err != nil {
return nil, err
}
rows[offset][colIdx] = d
}
continue
}

autoDatum.SetAutoID(recordID, col.Flag)
retryInfo.AddAutoIncrementID(recordID)

// the value of d is adjusted by auto ID, so we need to cast it again.
autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo())
if err != nil {
return nil, err
}
rows[i][colIdx] = autoDatum
}
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
Expand Down
Loading

0 comments on commit 6a01b63

Please sign in to comment.