diff --git a/executor/insert_common.go b/executor/insert_common.go index 4ae5efc726179..f8e160b648459 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -58,6 +58,12 @@ type InsertValues struct { colDefaultVals []defaultVal evalBuffer chunk.MutRow evalBufferTypes []*types.FieldType + + // Fill the autoID lazily to datum. This is used for being compatible with JDBC using getGeneratedKeys(). + // `insert|replace values` can guarantee consecutive autoID in a batch. + // Other statements like `insert select from` don't guarantee consecutive autoID. + // https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html + lazyFillAutoID bool } type defaultVal struct { @@ -184,6 +190,8 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con batchInsert := sessVars.BatchInsert && !sessVars.InTxn() batchSize := sessVars.DMLBatchSize + e.lazyFillAutoID = true + rows := make([][]types.Datum, 0, len(e.Lists)) for i, list := range e.Lists { e.rowCount++ @@ -193,6 +201,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con } rows = append(rows, row) if batchInsert && e.rowCount%uint64(batchSize) == 0 { + // Before batch insert, fill the batch allocated autoIDs. + rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows) + if err != nil { + return err + } if err = exec(ctx, rows); err != nil { return err } @@ -202,6 +215,11 @@ func (e *InsertValues) insertRows(ctx context.Context, exec func(ctx context.Con } } } + // Fill the batch allocated autoIDs. + rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows) + if err != nil { + return err + } return exec(ctx, rows) } @@ -259,7 +277,7 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression row[offset], hasValue[offset] = *val1.Copy(), true e.evalBuffer.SetDatum(offset, val1) } - + // Row may lack of generated column, autoIncrement column, empty column here. return e.fillRow(ctx, row, hasValue) } @@ -413,6 +431,14 @@ func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (d types.D func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx int, column *table.Column, hasValue bool) (types.Datum, error) { if mysql.HasAutoIncrementFlag(column.Flag) { + if e.lazyFillAutoID { + // Handle hasValue info in autoIncrement column previously for lazy handle. + if !hasValue { + datum.SetNull() + } + // Store the plain datum of autoIncrement column directly for lazy handle. + return datum, nil + } d, err := e.adjustAutoIncrementDatum(ctx, datum, hasValue, column) if err != nil { return types.Datum{}, err @@ -431,6 +457,10 @@ func (e *InsertValues) fillColValue(ctx context.Context, datum types.Datum, idx // fillRow fills generated columns, auto_increment column and empty column. // For NOT NULL column, it will return error or use zero value based on sql_mode. +// When lazyFillAutoID is true, fill row will lazily handle auto increment datum for lazy batch allocation. +// `insert|replace values` can guarantee consecutive autoID in a batch. +// Other statements like `insert select from` don't guarantee consecutive autoID. +// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool) ([]types.Datum, error) { gIdx := 0 for i, c := range e.Table.Cols() { @@ -440,6 +470,11 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue if err != nil { return nil, err } + if !e.lazyFillAutoID || (e.lazyFillAutoID && !mysql.HasAutoIncrementFlag(c.Flag)) { + if row[i], err = c.HandleBadNull(row[i], e.ctx.GetSessionVars().StmtCtx); err != nil { + return nil, err + } + } // Evaluate the generated columns. if c.IsGenerated() { @@ -463,6 +498,163 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue return row, nil } +// isAutoNull can help judge whether a datum is AutoIncrement Null quickly. +// This used to help lazyFillAutoIncrement to find consecutive N datum backwards for batch autoID alloc. +func (e *InsertValues) isAutoNull(ctx context.Context, d types.Datum, col *table.Column) bool { + var err error + var recordID int64 + if !d.IsNull() { + recordID, err = getAutoRecordID(d, &col.FieldType, true) + if err != nil { + return false + } + } + // Use the value if it's not null and not 0. + if recordID != 0 { + return false + } + // Change NULL to auto id. + // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. + if d.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { + return true + } + return false +} + +func (e *InsertValues) hasAutoIncrementColumn() (int, bool) { + colIdx := -1 + for i, c := range e.Table.Cols() { + if mysql.HasAutoIncrementFlag(c.Flag) { + colIdx = i + break + } + } + return colIdx, colIdx != -1 +} + +func (e *InsertValues) lazyAdjustAutoIncrementDatumInRetry(ctx context.Context, rows [][]types.Datum, colIdx int) ([][]types.Datum, error) { + // Get the autoIncrement column. + col := e.Table.Cols()[colIdx] + // Consider the colIdx of autoIncrement in row are the same. + length := len(rows) + for i := 0; i < length; i++ { + autoDatum := rows[i][colIdx] + + // autoID can be found in RetryInfo. + retryInfo := e.ctx.GetSessionVars().RetryInfo + if retryInfo.Retrying { + id, err := retryInfo.GetCurrAutoIncrementID() + if err != nil { + return nil, err + } + autoDatum.SetAutoID(id, col.Flag) + + if autoDatum, err = col.HandleBadNull(autoDatum, e.ctx.GetSessionVars().StmtCtx); err != nil { + return nil, err + } + rows[i][colIdx] = autoDatum + } + } + return rows, nil +} + +// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum +// except it will cache auto increment datum previously for lazy batch allocation of autoID. +func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) { + // Not in lazyFillAutoID mode means no need to fill. + if !e.lazyFillAutoID { + return rows, nil + } + // No autoIncrement column means no need to fill. + colIdx, ok := e.hasAutoIncrementColumn() + if !ok { + return rows, nil + } + // autoID can be found in RetryInfo. + retryInfo := e.ctx.GetSessionVars().RetryInfo + if retryInfo.Retrying { + return e.lazyAdjustAutoIncrementDatumInRetry(ctx, rows, colIdx) + } + // Get the autoIncrement column. + col := e.Table.Cols()[colIdx] + // Consider the colIdx of autoIncrement in row are the same. + length := len(rows) + for i := 0; i < length; i++ { + autoDatum := rows[i][colIdx] + + var err error + var recordID int64 + if !autoDatum.IsNull() { + recordID, err = getAutoRecordID(autoDatum, &col.FieldType, true) + if err != nil { + return nil, err + } + } + // Use the value if it's not null and not 0. + if recordID != 0 { + err = e.Table.RebaseAutoID(e.ctx, recordID, true) + if err != nil { + return nil, err + } + e.ctx.GetSessionVars().StmtCtx.InsertID = uint64(recordID) + retryInfo.AddAutoIncrementID(recordID) + rows[i][colIdx] = autoDatum + continue + } + + // Change NULL to auto id. + // Change value 0 to auto id, if NoAutoValueOnZero SQL mode is not set. + if autoDatum.IsNull() || e.ctx.GetSessionVars().SQLMode&mysql.ModeNoAutoValueOnZero == 0 { + // Find consecutive num. + start := i + cnt := 1 + for i+1 < length && e.isAutoNull(ctx, rows[i+1][colIdx], col) { + i++ + cnt++ + } + // Alloc batch N consecutive (min, max] autoIDs. + // max value can be derived from adding one for cnt times. + min, _, err := table.AllocBatchAutoIncrementValue(ctx, e.Table, e.ctx, cnt) + if e.filterErr(err) != nil { + return nil, err + } + // It's compatible with mysql setting the first allocated autoID to lastInsertID. + // Cause autoID may be specified by user, judge only the first row is not suitable. + if e.lastInsertID == 0 { + e.lastInsertID = uint64(min) + 1 + } + // Assign autoIDs to rows. + for j := 0; j < cnt; j++ { + offset := j + start + d := rows[offset][colIdx] + + id := int64(uint64(min) + uint64(j) + 1) + d.SetAutoID(id, col.Flag) + retryInfo.AddAutoIncrementID(id) + + // The value of d is adjusted by auto ID, so we need to cast it again. + d, err := table.CastValue(e.ctx, d, col.ToInfo()) + if err != nil { + return nil, err + } + rows[offset][colIdx] = d + } + continue + } + + autoDatum.SetAutoID(recordID, col.Flag) + retryInfo.AddAutoIncrementID(recordID) + + // the value of d is adjusted by auto ID, so we need to cast it again. + autoDatum, err = table.CastValue(e.ctx, autoDatum, col.ToInfo()) + if err != nil { + return nil, err + } + rows[i][colIdx] = autoDatum + } + return rows, nil +} + func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { diff --git a/executor/insert_test.go b/executor/insert_test.go index 224b071163049..34c73a3d22fa2 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -15,9 +15,11 @@ package executor_test import ( "fmt" + "strings" . "github.com/pingcap/check" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/testkit" ) @@ -311,6 +313,8 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) { tk.MustExec(`create table t5(id int primary key, n float unsigned auto_increment, key I_n(n));`) tk.MustExec(`create table t6(id int primary key, n double auto_increment, key I_n(n));`) tk.MustExec(`create table t7(id int primary key, n double unsigned auto_increment, key I_n(n));`) + // test for inserting multiple values + tk.MustExec(`create table t8(id int primary key auto_increment, n int);`) tests := []struct { insert string @@ -553,11 +557,177 @@ func (s *testSuite3) TestInsertWithAutoidSchema(c *C) { `select * from t7 where id = 3`, testkit.Rows(`3 3`), }, + + // the following is test for insert multiple values. + { + `insert into t8(n) values(1),(2)`, + `select * from t8 where id = 1`, + testkit.Rows(`1 1`), + }, + { + `;`, + `select * from t8 where id = 2`, + testkit.Rows(`2 2`), + }, + { + `;`, + `select last_insert_id();`, + testkit.Rows(`1`), + }, + // test user rebase and auto alloc mixture. + { + `insert into t8 values(null, 3),(-1, -1),(null,4),(null, 5)`, + `select * from t8 where id = 3`, + testkit.Rows(`3 3`), + }, + // -1 won't rebase allocator here cause -1 < base. + { + `;`, + `select * from t8 where id = -1`, + testkit.Rows(`-1 -1`), + }, + { + `;`, + `select * from t8 where id = 4`, + testkit.Rows(`4 4`), + }, + { + `;`, + `select * from t8 where id = 5`, + testkit.Rows(`5 5`), + }, + { + `;`, + `select last_insert_id();`, + testkit.Rows(`3`), + }, + { + `insert into t8 values(null, 6),(10, 7),(null, 8)`, + `select * from t8 where id = 6`, + testkit.Rows(`6 6`), + }, + // 10 will rebase allocator here. + { + `;`, + `select * from t8 where id = 10`, + testkit.Rows(`10 7`), + }, + { + `;`, + `select * from t8 where id = 11`, + testkit.Rows(`11 8`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`6`), + }, + // fix bug for last_insert_id should be first allocated id in insert rows (skip the rebase id). + { + `insert into t8 values(100, 9),(null,10),(null,11)`, + `select * from t8 where id = 100`, + testkit.Rows(`100 9`), + }, + { + `;`, + `select * from t8 where id = 101`, + testkit.Rows(`101 10`), + }, + { + `;`, + `select * from t8 where id = 102`, + testkit.Rows(`102 11`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`101`), + }, + // test with sql_mode: NO_AUTO_VALUE_ON_ZERO. + { + `;`, + `select @@sql_mode`, + testkit.Rows(`ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`), + }, + { + `;`, + "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`", + nil, + }, + { + `insert into t8 values (0, 12), (null, 13)`, + `select * from t8 where id = 0`, + testkit.Rows(`0 12`), + }, + { + `;`, + `select * from t8 where id = 103`, + testkit.Rows(`103 13`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`103`), + }, + // test without sql_mode: NO_AUTO_VALUE_ON_ZERO. + { + `;`, + "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`", + nil, + }, + // value 0 will be substitute by autoid. + { + `insert into t8 values (0, 14), (null, 15)`, + `select * from t8 where id = 104`, + testkit.Rows(`104 14`), + }, + { + `;`, + `select * from t8 where id = 105`, + testkit.Rows(`105 15`), + }, + { + `;`, + `select last_insert_id()`, + testkit.Rows(`104`), + }, + // last test : auto increment allocation can find in retryInfo. + { + `retry : insert into t8 values (null, 16), (null, 17)`, + `select * from t8 where id = 1000`, + testkit.Rows(`1000 16`), + }, + { + `;`, + `select * from t8 where id = 1001`, + testkit.Rows(`1001 17`), + }, + { + `;`, + `select last_insert_id()`, + // this insert doesn't has the last_insert_id, should be same as the last insert case. + testkit.Rows(`104`), + }, } for _, tt := range tests { - tk.MustExec(tt.insert) - tk.MustQuery(tt.query).Check(tt.result) + if strings.HasPrefix(tt.insert, "retry : ") { + // it's the last retry insert case, change the sessionVars. + retryInfo := &variable.RetryInfo{Retrying: true} + retryInfo.AddAutoIncrementID(1000) + retryInfo.AddAutoIncrementID(1001) + tk.Se.GetSessionVars().RetryInfo = retryInfo + tk.MustExec(tt.insert[8:]) + tk.Se.GetSessionVars().RetryInfo = &variable.RetryInfo{} + } else { + tk.MustExec(tt.insert) + } + if tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION,NO_AUTO_VALUE_ON_ZERO`" || + tt.query == "set session sql_mode = `ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION`" { + tk.MustExec(tt.query) + } else { + tk.MustQuery(tt.query).Check(tt.result) + } } } diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go old mode 100644 new mode 100755 index 15329e8d7dc77..abd52dde335d9 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -45,9 +45,10 @@ var errInvalidTableID = terror.ClassAutoid.New(codeInvalidTableID, "invalid Tabl // Allocator is an auto increment id generator. // Just keep id unique actually. type Allocator interface { - // Alloc allocs the next autoID for table with tableID. + // Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch. // It gets a batch of autoIDs at a time. So it does not need to access storage for each call. - Alloc(tableID int64) (int64, error) + // The consecutive feature is used to insert multiple rows in a statement. + Alloc(tableID int64, n uint64) (int64, int64, error) // Rebase rebases the autoID base for table with tableID and the new base value. // If allocIDs is true, it will allocate some IDs and save to the cache. // If allocIDs is false, it will not allocate IDs. @@ -220,12 +221,80 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error return alloc.rebase4Signed(tableID, requiredBase, allocIDs) } -func (alloc *allocator) alloc4Unsigned(tableID int64) (int64, error) { - if alloc.base == alloc.end { // step +// NextStep return new auto id step according to previous step and consuming time. +func NextStep(curStep int64, consumeDur time.Duration) int64 { + failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(step) + } + }) + + consumeRate := defaultConsumeTime.Seconds() / consumeDur.Seconds() + res := int64(float64(curStep) * consumeRate) + if res < minStep { + return minStep + } else if res > maxStep { + return maxStep + } + return res +} + +// NewAllocator returns a new auto increment id generator on the store. +func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool) Allocator { + return &allocator{ + store: store, + dbID: dbID, + isUnsigned: isUnsigned, + step: step, + lastAllocTime: time.Now(), + } +} + +//codeInvalidTableID is the code of autoid error. +const codeInvalidTableID terror.ErrCode = 1 + +var localSchemaID = int64(math.MaxInt64) + +// GenLocalSchemaID generates a local schema ID. +func GenLocalSchemaID() int64 { + return atomic.AddInt64(&localSchemaID, -1) +} + +// Alloc implements autoid.Allocator Alloc interface. +func (alloc *allocator) Alloc(tableID int64, n uint64) (int64, int64, error) { + if tableID == 0 { + return 0, 0, errInvalidTableID.GenWithStackByArgs("Invalid tableID") + } + if n == 0 { + return 0, 0, nil + } + alloc.mu.Lock() + defer alloc.mu.Unlock() + if alloc.isUnsigned { + return alloc.alloc4Unsigned(tableID, n) + } + return alloc.alloc4Signed(tableID, n) +} + +func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, error) { + n1 := int64(n) + // Condition alloc.base+N1 > alloc.end will overflow when alloc.base + N1 > MaxInt64. So need this. + if math.MaxInt64-alloc.base <= n1 { + return 0, 0, ErrAutoincReadFailed + } + // The local rest is not enough for allocN, skip it. + if alloc.base+n1 > alloc.end { var newBase, newEnd int64 startTime := time.Now() + // Although it may skip a segment here, we still think it is consumed. consumeDur := startTime.Sub(alloc.lastAllocTime) - alloc.step = NextStep(alloc.step, consumeDur) + nextStep := NextStep(alloc.step, consumeDur) + // Make sure nextStep is big enough. + if nextStep <= n1 { + alloc.step = mathutil.MinInt64(n1*2, maxStep) + } else { + alloc.step = nextStep + } err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { m := meta.NewMeta(txn) var err1 error @@ -233,38 +302,53 @@ func (alloc *allocator) alloc4Unsigned(tableID int64) (int64, error) { if err1 != nil { return err1 } - tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(alloc.step))) + tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, alloc.step) + // The global rest is not enough for alloc. + if tmpStep < n1 { + return ErrAutoincReadFailed + } newEnd, err1 = m.GenAutoTableID(alloc.dbID, tableID, tmpStep) return err1 }) metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) if err != nil { - return 0, err + return 0, 0, err } alloc.lastAllocTime = time.Now() - if uint64(newBase) == math.MaxUint64 { - return 0, ErrAutoincReadFailed + if newBase == math.MaxInt64 { + return 0, 0, ErrAutoincReadFailed } alloc.base, alloc.end = newBase, newEnd } - - if uint64(alloc.base)+uint64(1) == math.MaxUint64 { - return 0, ErrAutoincReadFailed - } - alloc.base = int64(uint64(alloc.base) + 1) - logutil.Logger(context.Background()).Debug("alloc unsigned ID", - zap.Uint64("ID", uint64(alloc.base)), + logutil.Logger(context.TODO()).Debug("alloc N signed ID", + zap.Uint64("from ID", uint64(alloc.base)), + zap.Uint64("to ID", uint64(alloc.base+n1)), zap.Int64("table ID", tableID), zap.Int64("database ID", alloc.dbID)) - return alloc.base, nil + min := alloc.base + alloc.base += n1 + return min, alloc.base, nil } -func (alloc *allocator) alloc4Signed(tableID int64) (int64, error) { - if alloc.base == alloc.end { // step +func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, error) { + n1 := int64(n) + // Condition alloc.base+n1 > alloc.end will overflow when alloc.base + n1 > MaxInt64. So need this. + if math.MaxUint64-uint64(alloc.base) <= n { + return 0, 0, ErrAutoincReadFailed + } + // The local rest is not enough for alloc, skip it. + if uint64(alloc.base)+n > uint64(alloc.end) { var newBase, newEnd int64 startTime := time.Now() + // Although it may skip a segment here, we still treat it as consumed. consumeDur := startTime.Sub(alloc.lastAllocTime) - alloc.step = NextStep(alloc.step, consumeDur) + nextStep := NextStep(alloc.step, consumeDur) + // Make sure nextStep is big enough. + if nextStep <= n1 { + alloc.step = mathutil.MinInt64(n1*2, maxStep) + } else { + alloc.step = nextStep + } err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { m := meta.NewMeta(txn) var err1 error @@ -272,80 +356,31 @@ func (alloc *allocator) alloc4Signed(tableID int64) (int64, error) { if err1 != nil { return err1 } - tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, alloc.step) + tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(alloc.step))) + // The global rest is not enough for alloc. + if tmpStep < n1 { + return ErrAutoincReadFailed + } newEnd, err1 = m.GenAutoTableID(alloc.dbID, tableID, tmpStep) return err1 }) metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) if err != nil { - return 0, err + return 0, 0, err } alloc.lastAllocTime = time.Now() - if newBase == math.MaxInt64 { - return 0, ErrAutoincReadFailed + if uint64(newBase) == math.MaxUint64 { + return 0, 0, ErrAutoincReadFailed } alloc.base, alloc.end = newBase, newEnd } - - if alloc.base+1 == math.MaxInt64 { - return 0, ErrAutoincReadFailed - } - alloc.base++ - logutil.Logger(context.Background()).Debug("alloc signed ID", - zap.Uint64("ID", uint64(alloc.base)), + logutil.Logger(context.TODO()).Debug("alloc unsigned ID", + zap.Uint64(" from ID", uint64(alloc.base)), + zap.Uint64("to ID", uint64(alloc.base+n1)), zap.Int64("table ID", tableID), zap.Int64("database ID", alloc.dbID)) - return alloc.base, nil -} - -// Alloc implements autoid.Allocator Alloc interface. -func (alloc *allocator) Alloc(tableID int64) (int64, error) { - if tableID == 0 { - return 0, errInvalidTableID.GenWithStack("Invalid tableID") - } - alloc.mu.Lock() - defer alloc.mu.Unlock() - if alloc.isUnsigned { - return alloc.alloc4Unsigned(tableID) - } - return alloc.alloc4Signed(tableID) -} - -// NextStep return new auto id step according to previous step and consuming time. -func NextStep(curStep int64, consumeDur time.Duration) int64 { - failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(step) - } - }) - - consumeRate := defaultConsumeTime.Seconds() / consumeDur.Seconds() - res := int64(float64(curStep) * consumeRate) - if res < minStep { - return minStep - } else if res > maxStep { - return maxStep - } - return res -} - -// NewAllocator returns a new auto increment id generator on the store. -func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool) Allocator { - return &allocator{ - store: store, - dbID: dbID, - isUnsigned: isUnsigned, - step: step, - lastAllocTime: time.Now(), - } -} - -//autoid error codes. -const codeInvalidTableID terror.ErrCode = 1 - -var localSchemaID = int64(math.MaxInt64) - -// GenLocalSchemaID generates a local schema ID. -func GenLocalSchemaID() int64 { - return atomic.AddInt64(&localSchemaID, -1) + min := alloc.base + // Use uint64 n directly. + alloc.base = int64(uint64(alloc.base) + n) + return min, alloc.base, nil } diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index 23961eac150e7..9adacbc82d21b 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -16,6 +16,7 @@ package autoid_test import ( "fmt" "math" + "math/rand" "sync" "testing" "time" @@ -60,6 +61,8 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) err = m.CreateTableOrView(1, &model.TableInfo{ID: 3, Name: model.NewCIStr("t1")}) c.Assert(err, IsNil) + err = m.CreateTableOrView(1, &model.TableInfo{ID: 4, Name: model.NewCIStr("t2")}) + c.Assert(err, IsNil) return nil }) c.Assert(err, IsNil) @@ -70,13 +73,13 @@ func (*testSuite) TestT(c *C) { globalAutoID, err := alloc.NextGlobalAutoID(1) c.Assert(err, IsNil) c.Assert(globalAutoID, Equals, int64(1)) - id, err := alloc.Alloc(1) + _, id, err := alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(1)) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(2)) - _, err = alloc.Alloc(0) + _, _, err = alloc.Alloc(0, 1) c.Assert(err, NotNil) globalAutoID, err = alloc.NextGlobalAutoID(1) c.Assert(err, IsNil) @@ -85,28 +88,28 @@ func (*testSuite) TestT(c *C) { // rebase err = alloc.Rebase(1, int64(1), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3)) err = alloc.Rebase(1, int64(3), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(4)) err = alloc.Rebase(1, int64(10), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(11)) err = alloc.Rebase(1, int64(3010), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3011)) alloc = autoid.NewAllocator(store, 1, false) c.Assert(alloc, NotNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(autoid.GetStep()+1)) @@ -114,7 +117,7 @@ func (*testSuite) TestT(c *C) { c.Assert(alloc, NotNil) err = alloc.Rebase(2, int64(1), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(2) + _, id, err = alloc.Alloc(2, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(2)) @@ -126,22 +129,65 @@ func (*testSuite) TestT(c *C) { c.Assert(alloc, NotNil) err = alloc.Rebase(3, int64(3000), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(3) + _, id, err = alloc.Alloc(3, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3211)) err = alloc.Rebase(3, int64(6543), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(3) + _, id, err = alloc.Alloc(3, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(6544)) // Test the MaxInt64 is the upper bound of `alloc` function but not `rebase`. err = alloc.Rebase(3, int64(math.MaxInt64-1), true) c.Assert(err, IsNil) - _, err = alloc.Alloc(3) + _, _, err = alloc.Alloc(3, 1) c.Assert(alloc, NotNil) err = alloc.Rebase(3, int64(math.MaxInt64), true) c.Assert(err, IsNil) + + // alloc N for signed + alloc = autoid.NewAllocator(store, 1, false) + c.Assert(alloc, NotNil) + globalAutoID, err = alloc.NextGlobalAutoID(4) + c.Assert(err, IsNil) + c.Assert(globalAutoID, Equals, int64(1)) + min, max, err := alloc.Alloc(4, 1) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(1)) + c.Assert(min+1, Equals, int64(1)) + + min, max, err = alloc.Alloc(4, 2) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(2)) + c.Assert(min+1, Equals, int64(2)) + c.Assert(max, Equals, int64(3)) + + min, max, err = alloc.Alloc(4, 100) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(100)) + expected := int64(4) + for i := min + 1; i <= max; i++ { + c.Assert(i, Equals, expected) + expected++ + } + + err = alloc.Rebase(4, int64(1000), false) + c.Assert(err, IsNil) + min, max, err = alloc.Alloc(4, 3) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(3)) + c.Assert(min+1, Equals, int64(1001)) + c.Assert(min+2, Equals, int64(1002)) + c.Assert(max, Equals, int64(1003)) + + lastRemainOne := alloc.End() + err = alloc.Rebase(4, alloc.End()-2, false) + c.Assert(err, IsNil) + min, max, err = alloc.Alloc(4, 5) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(5)) + c.Assert(min+1, Greater, lastRemainOne) } func (*testSuite) TestUnsignedAutoid(c *C) { @@ -164,6 +210,8 @@ func (*testSuite) TestUnsignedAutoid(c *C) { c.Assert(err, IsNil) err = m.CreateTableOrView(1, &model.TableInfo{ID: 3, Name: model.NewCIStr("t1")}) c.Assert(err, IsNil) + err = m.CreateTableOrView(1, &model.TableInfo{ID: 4, Name: model.NewCIStr("t2")}) + c.Assert(err, IsNil) return nil }) c.Assert(err, IsNil) @@ -174,13 +222,13 @@ func (*testSuite) TestUnsignedAutoid(c *C) { globalAutoID, err := alloc.NextGlobalAutoID(1) c.Assert(err, IsNil) c.Assert(globalAutoID, Equals, int64(1)) - id, err := alloc.Alloc(1) + _, id, err := alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(1)) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(2)) - _, err = alloc.Alloc(0) + _, _, err = alloc.Alloc(0, 1) c.Assert(err, NotNil) globalAutoID, err = alloc.NextGlobalAutoID(1) c.Assert(err, IsNil) @@ -189,28 +237,28 @@ func (*testSuite) TestUnsignedAutoid(c *C) { // rebase err = alloc.Rebase(1, int64(1), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3)) err = alloc.Rebase(1, int64(3), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(4)) err = alloc.Rebase(1, int64(10), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(11)) err = alloc.Rebase(1, int64(3010), true) c.Assert(err, IsNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3011)) alloc = autoid.NewAllocator(store, 1, true) c.Assert(alloc, NotNil) - id, err = alloc.Alloc(1) + _, id, err = alloc.Alloc(1, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(autoid.GetStep()+1)) @@ -218,7 +266,7 @@ func (*testSuite) TestUnsignedAutoid(c *C) { c.Assert(alloc, NotNil) err = alloc.Rebase(2, int64(1), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(2) + _, id, err = alloc.Alloc(2, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(2)) @@ -230,12 +278,12 @@ func (*testSuite) TestUnsignedAutoid(c *C) { c.Assert(alloc, NotNil) err = alloc.Rebase(3, int64(3000), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(3) + _, id, err = alloc.Alloc(3, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(3211)) err = alloc.Rebase(3, int64(6543), false) c.Assert(err, IsNil) - id, err = alloc.Alloc(3) + _, id, err = alloc.Alloc(3, 1) c.Assert(err, IsNil) c.Assert(id, Equals, int64(6544)) @@ -244,11 +292,40 @@ func (*testSuite) TestUnsignedAutoid(c *C) { un := int64(n) err = alloc.Rebase(3, un, true) c.Assert(err, IsNil) - _, err = alloc.Alloc(3) + _, _, err = alloc.Alloc(3, 1) c.Assert(err, NotNil) un = int64(n + 1) err = alloc.Rebase(3, un, true) c.Assert(err, IsNil) + + // alloc N for unsigned + alloc = autoid.NewAllocator(store, 1, true) + c.Assert(alloc, NotNil) + globalAutoID, err = alloc.NextGlobalAutoID(4) + c.Assert(err, IsNil) + c.Assert(globalAutoID, Equals, int64(1)) + + min, max, err := alloc.Alloc(4, 2) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(2)) + c.Assert(min+1, Equals, int64(1)) + c.Assert(max, Equals, int64(2)) + + err = alloc.Rebase(4, int64(500), true) + c.Assert(err, IsNil) + min, max, err = alloc.Alloc(4, 2) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(2)) + c.Assert(min+1, Equals, int64(501)) + c.Assert(max, Equals, int64(502)) + + lastRemainOne := alloc.End() + err = alloc.Rebase(4, alloc.End()-2, false) + c.Assert(err, IsNil) + min, max, err = alloc.Alloc(4, 5) + c.Assert(err, IsNil) + c.Assert(max-min, Equals, int64(5)) + c.Assert(min+1, Greater, lastRemainOne) } // TestConcurrentAlloc is used for the test that @@ -283,7 +360,7 @@ func (*testSuite) TestConcurrentAlloc(c *C) { allocIDs := func() { alloc := autoid.NewAllocator(store, dbID, false) for j := 0; j < int(autoid.GetStep())+5; j++ { - id, err1 := alloc.Alloc(tblID) + _, id, err1 := alloc.Alloc(tblID, 1) if err1 != nil { errCh <- err1 break @@ -297,6 +374,30 @@ func (*testSuite) TestConcurrentAlloc(c *C) { } m[id] = struct{}{} mu.Unlock() + + //test Alloc N + N := rand.Uint64() % 100 + min, max, err1 := alloc.Alloc(tblID, N) + if err1 != nil { + errCh <- err1 + break + } + + errFlag := false + mu.Lock() + for i := min + 1; i <= max; i++ { + if _, ok := m[i]; ok { + errCh <- fmt.Errorf("duplicate id:%v", i) + errFlag = true + mu.Unlock() + break + } + m[i] = struct{}{} + } + if errFlag { + break + } + mu.Unlock() } } for i := 0; i < count; i++ { @@ -336,7 +437,7 @@ func (*testSuite) TestRollbackAlloc(c *C) { injectConf.SetCommitError(errors.New("injected")) injectedStore := kv.NewInjectedStore(store, injectConf) alloc := autoid.NewAllocator(injectedStore, 1, false) - _, err = alloc.Alloc(2) + _, _, err = alloc.Alloc(2, 1) c.Assert(err, NotNil) c.Assert(alloc.Base(), Equals, int64(0)) c.Assert(alloc.End(), Equals, int64(0)) @@ -356,3 +457,34 @@ func (*testSuite) TestNextStep(c *C) { nextStep = autoid.NextStep(50000, 10*time.Minute) c.Assert(nextStep, Equals, int64(1000)) } + +func BenchmarkAllocator_Alloc(b *testing.B) { + b.StopTimer() + store, err := mockstore.NewMockTikvStore() + if err != nil { + return + } + defer store.Close() + dbID := int64(1) + tblID := int64(2) + err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + m := meta.NewMeta(txn) + err = m.CreateDatabase(&model.DBInfo{ID: dbID, Name: model.NewCIStr("a")}) + if err != nil { + return err + } + err = m.CreateTableOrView(dbID, &model.TableInfo{ID: tblID, Name: model.NewCIStr("t")}) + if err != nil { + return err + } + return nil + }) + if err != nil { + return + } + alloc := autoid.NewAllocator(store, 1, false) + b.StartTimer() + for i := 0; i < b.N; i++ { + alloc.Alloc(2, 1) + } +} diff --git a/table/table.go b/table/table.go index 1a3aad7a81d7e..c06fa60186333 100644 --- a/table/table.go +++ b/table/table.go @@ -173,7 +173,20 @@ func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Conte span1 := span.Tracer().StartSpan("table.AllocAutoIncrementValue", opentracing.ChildOf(span.Context())) defer span1.Finish() } - return t.Allocator(sctx).Alloc(t.Meta().ID) + _, max, err := t.Allocator(sctx).Alloc(t.Meta().ID, uint64(1)) + if err != nil { + return 0, err + } + return max, err +} + +// AllocBatchAutoIncrementValue allocates batch auto_increment value (min and max] for rows. +func AllocBatchAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Context, N int) (int64, int64, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("table.AllocBatchAutoIncrementValue", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + return t.Allocator(sctx).Alloc(t.Meta().ID, uint64(N)) } // PhysicalTable is an abstraction for two kinds of table representation: partition or non-partitioned table. diff --git a/table/tables/tables.go b/table/tables/tables.go index e56d77677df71..7ca78677ab508 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -916,7 +916,7 @@ func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals [ // AllocHandle implements table.Table AllocHandle interface. func (t *tableCommon) AllocHandle(ctx sessionctx.Context) (int64, error) { - rowID, err := t.Allocator(ctx).Alloc(t.tableID) + _, rowID, err := t.Allocator(ctx).Alloc(t.tableID, 1) if err != nil { return 0, err }