Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl, meta: support altering auto_increment ID to a smaller value #25868

Merged
merged 10 commits into from
Jul 13, 2021
64 changes: 64 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl_test
import (
"context"
"fmt"
"math"
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -2621,6 +2622,69 @@ func (s *testSerialDBSuite1) TestAutoIncrementTableOption(c *C) {
tk.MustQuery("select * from t;").Check(testkit.Rows("12345678901234567890"))
}

func (s *testIntegrationSuite3) TestAutoIncrementForce(c *C) {
bb7133 marked this conversation as resolved.
Show resolved Hide resolved
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("drop database if exists auto_inc_force;")
tk.MustExec("create database auto_inc_force;")
tk.MustExec("use auto_inc_force;")
getNextGlobalID := func() uint64 {
gidStr := tk.MustQuery("show table t next_row_id").Rows()[0][3]
gid, err := strconv.ParseUint(gidStr.(string), 10, 64)
c.Assert(err, IsNil)
return gid
}
// rebase _tidb_row_id.
tk.MustExec("create table t (a int);")
tk.MustExec("insert into t values (1),(2);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("1 1", "2 2"))
tk.MustExec("alter table t force auto_increment = 1;")
c.Assert(getNextGlobalID(), Equals, uint64(1))
// inserting new rows can overwrite the existing data.
tk.MustExec("insert into t values (3);")
tk.MustExec("insert into t values (3);")
tk.MustQuery("select a, _tidb_rowid from t;").Check(testkit.Rows("3 1", "3 2"))
Comment on lines +2647 to +2649
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we better report duplicate errors instead of direct overwrite existing data?

for auto pk handle, it seem will report dup err:

mysql> create table t (a bigint primary key auto_increment);
Query OK, 0 rows affected (0.01 sec)

mysql> alter table t FORCE auto_increment = 20;
Query OK, 0 rows affected (0.00 sec)

mysql> insert into t values();
Query OK, 1 row affected (0.00 sec)

mysql> alter table t FORCE auto_increment = 20;
Query OK, 0 rows affected (0.01 sec)

mysql> insert into t values();
ERROR 1062 (23000): Duplicate entry '20' for key 'PRIMARY'


// rebase auto_increment.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int primary key auto_increment, b int);")
tk.MustExec("insert into t values (1, 1);")
tk.MustExec("insert into t values (100000000, 1);")
tk.MustExec("delete from t where a = 100000000;")
c.Assert(getNextGlobalID(), Greater, uint64(100000000))
tk.MustExec("alter table t /*T![force_inc] force */ auto_increment = 2;")
c.Assert(getNextGlobalID(), Equals, uint64(2))
tk.MustExec("insert into t(b) values (2);")
tk.MustQuery("select a, b from t;").Check(testkit.Rows("1 1", "2 2"))

// rebase auto_random.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_random(5));")
tk.MustExec("insert into t values ();")
tk.MustExec("set @@allow_auto_random_explicit_insert = true")
tk.MustExec("insert into t values (100000000);")
tk.MustExec("delete from t where a = 100000000;")
c.Assert(getNextGlobalID(), Greater, uint64(100000000))
tk.MustExec("alter table t force auto_random_base = 2;")
c.Assert(getNextGlobalID(), Equals, uint64(2))
tk.MustExec("insert into t values ();")
tk.MustQuery("select (a & 3) from t order by 1;").Check(testkit.Rows("1", "2"))

// change next_global_id.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a bigint primary key auto_increment);")
tk.MustExec("insert into t values (1);")
bases := []uint64{1, 65535, 10, math.MaxUint64, math.MaxInt64 + 1, 1, math.MaxUint64, math.MaxInt64, 2}
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
for _, b := range bases {
tk.MustExec(fmt.Sprintf("alter table t force auto_increment = %d;", b))
c.Assert(getNextGlobalID(), Equals, b)
}
tk.MustExec("insert into t values ();")
tk.MustQuery("select a from t;").Check(testkit.Rows("1", "2"))

// cannot set next global ID to 0.
tk.MustGetErrCode("alter table t force auto_increment = 0;", errno.ErrAutoincReadFailed)
}

tangenta marked this conversation as resolved.
Show resolved Hide resolved
func (s *testIntegrationSuite3) TestIssue20490(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
Expand Down
37 changes: 24 additions & 13 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2532,15 +2532,15 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
}
err = d.ShardRowID(ctx, ident, opt.UintValue)
case ast.TableOptionAutoIncrement:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.RowIDAllocType)
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.RowIDAllocType, opt.BoolValue)
case ast.TableOptionAutoIdCache:
if opt.UintValue > uint64(math.MaxInt64) {
// TODO: Refine this error.
return errors.New("table option auto_id_cache overflows int64")
}
err = d.AlterTableAutoIDCache(ctx, ident, int64(opt.UintValue))
case ast.TableOptionAutoRandomBase:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.AutoRandomType)
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue), autoid.AutoRandomType, opt.BoolValue)
case ast.TableOptionComment:
spec.Comment = opt.StrValue
err = d.AlterTableComment(ctx, ident, spec)
Expand Down Expand Up @@ -2596,7 +2596,7 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return nil
}

func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType) error {
func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2625,31 +2625,42 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
actionType = model.ActionRebaseAutoID
}

if alloc := t.Allocators(ctx).Get(tp); alloc != nil {
autoID, err := alloc.NextGlobalAutoID(t.Meta().ID)
if !force {
newBase, err = adjustNewBaseToNextGlobalID(ctx, t, tp, newBase)
if err != nil {
return errors.Trace(err)
return err
}
// If newBase < autoID, we need to do a rebase before returning.
// Assume there are 2 TiDB servers: TiDB-A with allocator range of 0 ~ 30000; TiDB-B with allocator range of 30001 ~ 60000.
// If the user sends SQL `alter table t1 auto_increment = 100` to TiDB-B,
// and TiDB-B finds 100 < 30001 but returns without any handling,
// then TiDB-A may still allocate 99 for auto_increment column. This doesn't make sense for the user.
newBase = int64(mathutil.MaxUint64(uint64(newBase), uint64(autoID)))
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: actionType,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{newBase},
Args: []interface{}{newBase, force},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func adjustNewBaseToNextGlobalID(ctx sessionctx.Context, t table.Table, tp autoid.AllocatorType, newBase int64) (int64, error) {
alloc := t.Allocators(ctx).Get(tp)
if alloc == nil {
return newBase, nil
}
autoID, err := alloc.NextGlobalAutoID(t.Meta().ID)
if err != nil {
return newBase, errors.Trace(err)
}
// If newBase < autoID, we need to do a rebase before returning.
// Assume there are 2 TiDB servers: TiDB-A with allocator range of 0 ~ 30000; TiDB-B with allocator range of 30001 ~ 60000.
// If the user sends SQL `alter table t1 auto_increment = 100` to TiDB-B,
// and TiDB-B finds 100 < 30001 but returns without any handling,
// then TiDB-A may still allocate 99 for auto_increment column. This doesn't make sense for the user.
return int64(mathutil.MaxUint64(uint64(newBase), uint64(autoID))), nil
}

// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits.
func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint64) error {
schema, t, err := d.getSchemaAndTableByIdent(ctx, tableIdent)
Expand Down
14 changes: 11 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,11 @@ func onRebaseAutoRandomType(store kv.Storage, t *meta.Meta, job *model.Job) (ver

func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) {
schemaID := job.SchemaID
var newBase int64
err := job.DecodeArgs(&newBase)
var (
newBase int64
force bool
)
err := job.DecodeArgs(&newBase, &force)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -581,8 +584,13 @@ func onRebaseAutoID(store kv.Storage, t *meta.Meta, job *model.Job, tp autoid.Al
if alloc := tbl.Allocators(nil).Get(tp); alloc != nil {
// The next value to allocate is `newBase`.
newEnd := newBase - 1
err = alloc.Rebase(tblInfo.ID, newEnd, false)
if force {
err = alloc.ForceRebase(tblInfo.ID, newEnd)
} else {
err = alloc.Rebase(tblInfo.ID, newEnd, false)
}
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}
Expand Down
40 changes: 38 additions & 2 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ type Allocator interface {
// If allocIDs is false, it will not allocate IDs.
Rebase(tableID, newBase int64, allocIDs bool) error

// ForceRebase set the next global auto ID to newBase.
ForceRebase(tableID, newBase int64) error

// RebaseSeq rebases the sequence value in number axis with tableID and the new base value.
RebaseSeq(table, newBase int64) (int64, bool, error)

Expand Down Expand Up @@ -370,16 +373,49 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error
if tableID == 0 {
return errInvalidTableID.GenWithStack("Invalid tableID")
}

alloc.mu.Lock()
defer alloc.mu.Unlock()

if alloc.isUnsigned {
return alloc.rebase4Unsigned(tableID, uint64(requiredBase), allocIDs)
}
return alloc.rebase4Signed(tableID, requiredBase, allocIDs)
}

// ForceRebase implements autoid.Allocator ForceRebase interface.
func (alloc *allocator) ForceRebase(tableID, requiredBase int64) error {
if tableID <= 0 {
return errInvalidTableID.GenWithStack("Invalid tableID")
}
if requiredBase == -1 {
return ErrAutoincReadFailed.GenWithStack("Cannot force rebase the next global ID to '0'")
}
alloc.mu.Lock()
defer alloc.mu.Unlock()
startTime := time.Now()
err := kv.RunInNewTxn(context.Background(), alloc.store, true, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
currentEnd, err1 := GetAutoID(m, alloc.dbID, tableID, alloc.allocType)
if err1 != nil {
return err1
}
var step int64
if !alloc.isUnsigned {
step = requiredBase - currentEnd
} else {
uRequiredBase, uCurrentEnd := uint64(requiredBase), uint64(currentEnd)
step = int64(uRequiredBase - uCurrentEnd)
}
_, err1 = GenerateAutoID(m, alloc.dbID, tableID, step, alloc.allocType)
return err1
})
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
}
alloc.base, alloc.end = requiredBase, requiredBase
return nil
}

// Rebase implements autoid.Allocator RebaseSeq interface.
// The return value is quite same as expression function, bool means whether it should be NULL,
// here it will be used in setval expression function (true meaning the set value has been satisfied, return NULL).
Expand Down
6 changes: 6 additions & 0 deletions meta/autoid/memid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ func (alloc *inMemoryAllocator) Rebase(tableID, requiredBase int64, allocIDs boo
return nil
}

// ForceRebase implements autoid.Allocator ForceRebase interface.
func (alloc *inMemoryAllocator) ForceRebase(tableID, requiredBase int64) error {
alloc.base = requiredBase
return nil
}

func (alloc *inMemoryAllocator) alloc4Signed(n uint64, increment, offset int64) (int64, int64, error) {
// Check offset rebase if necessary.
if offset-1 > alloc.base {
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,14 @@ func (s *testBinlogSuite) TestAddSpecialComment(c *C) {
"create table t1 (id int, a varchar(255) key clustered);",
"create table t1 (id int, a varchar(255) key /*T![clustered_index] clustered */ );",
},
{
"alter table t force auto_increment = 12;",
"alter table t /*T![force_inc] force */ auto_increment = 12;",
},
{
"alter table t force, auto_increment = 12;",
"alter table t force, auto_increment = 12;",
},
}
for _, ca := range testCase {
re := binloginfo.AddSpecialComment(ca.input)
Expand Down
4 changes: 4 additions & 0 deletions types/parser_driver/special_cmt_ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func init() {
parser.SpecialCommentsController.Register(string(FeatureIDAutoIDCache))
parser.SpecialCommentsController.Register(string(FeatureIDAutoRandomBase))
parser.SpecialCommentsController.Register(string(FeatureClusteredIndex))
parser.SpecialCommentsController.Register(string(FeatureForceAutoInc))
}

// SpecialCommentVersionPrefix is the prefix of TiDB executable comments.
Expand All @@ -61,6 +62,8 @@ const (
FeatureIDAutoRandomBase featureID = "auto_rand_base"
// FeatureClusteredIndex is the `clustered_index` feature.
FeatureClusteredIndex featureID = "clustered_index"
// FeatureForceAutoInc is the `force auto_increment` feature.
FeatureForceAutoInc featureID = "force_inc"
)

// FeatureIDPatterns is used to record special comments patterns.
Expand All @@ -69,4 +72,5 @@ var FeatureIDPatterns = map[featureID]*regexp.Regexp{
FeatureIDAutoIDCache: regexp.MustCompile(`(?P<REPLACE>(?i)AUTO_ID_CACHE\s*=?\s*\d+\s*)`),
FeatureIDAutoRandomBase: regexp.MustCompile(`(?P<REPLACE>(?i)AUTO_RANDOM_BASE\s*=?\s*\d+\s*)`),
FeatureClusteredIndex: regexp.MustCompile(`(?i)(PRIMARY)?\s+KEY(\s*\(.*\))?\s+(?P<REPLACE>(NON)?CLUSTERED\b)`),
FeatureForceAutoInc: regexp.MustCompile(`(?P<REPLACE>(?i)FORCE)\b\s*AUTO_INCREMENT\s*`),
}