Skip to content

Commit

Permalink
cherry pick pingcap#17271 to release-2.1
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <[email protected]>
  • Loading branch information
AilinKid authored and sre-bot committed Jun 1, 2020
1 parent 5a50d6a commit 321ff26
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 0 deletions.
165 changes: 165 additions & 0 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,99 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error
return alloc.rebase4Signed(tableID, requiredBase, allocIDs)
}

<<<<<<< HEAD
=======
// Rebase implements autoid.Allocator RebaseSeq interface.
// The return value is quite same as expression function, bool means whether it should be NULL,
// here it will be used in setval expression function (true meaning the set value has been satisfied, return NULL).
// case1:When requiredBase is satisfied with current value, it will return (0, true, nil),
// case2:When requiredBase is successfully set in, it will return (requiredBase, false, nil).
// If some error occurs in the process, return it immediately.
func (alloc *allocator) RebaseSeq(tableID, requiredBase int64) (int64, bool, error) {
if tableID == 0 {
return 0, false, errInvalidTableID.GenWithStack("Invalid tableID")
}

alloc.mu.Lock()
defer alloc.mu.Unlock()
return alloc.rebase4Sequence(tableID, requiredBase)
}

func (alloc *allocator) GetType() AllocatorType {
return alloc.allocType
}

// NextStep return new auto id step according to previous step and consuming time.
func NextStep(curStep int64, consumeDur time.Duration) int64 {
failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(3)
}
})
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(step)
}
})

consumeRate := defaultConsumeTime.Seconds() / consumeDur.Seconds()
res := int64(float64(curStep) * consumeRate)
if res < minStep {
return minStep
} else if res > maxStep {
return maxStep
}
return res
}

// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(store kv.Storage, dbID int64, isUnsigned bool, allocType AllocatorType, opts ...AllocOption) Allocator {
alloc := &allocator{
store: store,
dbID: dbID,
isUnsigned: isUnsigned,
step: step,
lastAllocTime: time.Now(),
allocType: allocType,
}
for _, fn := range opts {
fn.ApplyOn(alloc)
}
return alloc
}

// NewSequenceAllocator returns a new sequence value generator on the store.
func NewSequenceAllocator(store kv.Storage, dbID int64, info *model.SequenceInfo) Allocator {
return &allocator{
store: store,
dbID: dbID,
// Sequence allocator is always signed.
isUnsigned: false,
lastAllocTime: time.Now(),
allocType: SequenceType,
sequence: info,
}
}

// NewAllocatorsFromTblInfo creates an array of allocators of different types with the information of model.TableInfo.
func NewAllocatorsFromTblInfo(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) Allocators {
var allocs []Allocator
dbID := tblInfo.GetDBID(schemaID)
if tblInfo.AutoIdCache > 0 {
allocs = append(allocs, NewAllocator(store, dbID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType, CustomAutoIncCacheOption(tblInfo.AutoIdCache)))
} else {
allocs = append(allocs, NewAllocator(store, dbID, tblInfo.IsAutoIncColUnsigned(), RowIDAllocType))
}
if tblInfo.ContainsAutoRandomBits() {
allocs = append(allocs, NewAllocator(store, dbID, tblInfo.IsAutoRandomBitColUnsigned(), AutoRandomType))
}
if tblInfo.IsSequence() {
allocs = append(allocs, NewSequenceAllocator(store, dbID, tblInfo.Sequence))
}
return NewAllocators(allocs...)
}

>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
// Alloc implements autoid.Allocator Alloc interface.
func (alloc *allocator) Alloc(tableID int64, n uint64) (int64, int64, error) {
if tableID == 0 {
Expand All @@ -244,6 +337,7 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, err
if alloc.base+n1 > alloc.end {
var newBase, newEnd int64
startTime := time.Now()
<<<<<<< HEAD
// Although it may skip a segment here, we still think it is consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep := NextStep(alloc.step, consumeDur)
Expand All @@ -252,6 +346,13 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, err
alloc.step = mathutil.MinInt64(n1*2, maxStep)
} else {
alloc.step = nextStep
=======
nextStep := alloc.step
if !alloc.customStep {
// Although it may skip a segment here, we still think it is consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
Expand All @@ -260,7 +361,17 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, err
if err1 != nil {
return errors.Trace(err1)
}
<<<<<<< HEAD
tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, alloc.step)
=======
// CalcNeededBatchSize calculates the total batch size needed on global base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, nextStep)
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
// The global rest is not enough for alloc.
if tmpStep < n1 {
return ErrAutoincReadFailed
Expand All @@ -272,6 +383,10 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, err
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if newBase == math.MaxInt64 {
return 0, 0, ErrAutoincReadFailed
Expand All @@ -298,6 +413,7 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, e
if uint64(alloc.base)+n > uint64(alloc.end) {
var newBase, newEnd int64
startTime := time.Now()
<<<<<<< HEAD
// Although it may skip a segment here, we still treat it as consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep := NextStep(alloc.step, consumeDur)
Expand All @@ -306,6 +422,13 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, e
alloc.step = mathutil.MinInt64(n1*2, maxStep)
} else {
alloc.step = nextStep
=======
nextStep := alloc.step
if !alloc.customStep {
// Although it may skip a segment here, we still treat it as consumed.
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
Expand All @@ -314,7 +437,17 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, e
if err1 != nil {
return errors.Trace(err1)
}
<<<<<<< HEAD
tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(alloc.step)))
=======
// CalcNeededBatchSize calculates the total batch size needed on new base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(nextStep)))
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
// The global rest is not enough for alloc.
if tmpStep < n1 {
return ErrAutoincReadFailed
Expand All @@ -326,6 +459,10 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, e
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if uint64(newBase) == math.MaxUint64 {
return 0, 0, ErrAutoincReadFailed
Expand Down Expand Up @@ -401,10 +538,38 @@ func (alloc *memoryAllocator) NextGlobalAutoID(tableID int64) (int64, error) {
return memID + 1, nil
}

<<<<<<< HEAD
// Rebase implements autoid.Allocator Rebase interface.
func (alloc *memoryAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
// TODO: implement it.
return nil
=======
// TestModifyBaseAndEndInjection exported for testing modifying the base and end.
func TestModifyBaseAndEndInjection(alloc Allocator, base, end int64) {
alloc.(*allocator).mu.Lock()
alloc.(*allocator).base = base
alloc.(*allocator).end = end
alloc.(*allocator).mu.Unlock()
}

// AutoRandomIDLayout is used to calculate the bits length of different section in auto_random id.
// The primary key with auto_random can only be `bigint` column, the total layout length of auto random is 64 bits.
// These are two type of layout:
// 1. Signed bigint:
// | [sign_bit] | [shard_bits] | [incremental_bits] |
// sign_bit(1 fixed) + shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// 2. Unsigned bigint:
// | [shard_bits] | [incremental_bits] |
// shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// Please always use NewAutoRandomIDLayout() to instantiate.
type AutoRandomIDLayout struct {
FieldType *types.FieldType
ShardBits uint64
// Derived fields.
TypeBitsLength uint64
IncrementalBits uint64
HasSignBit bool
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
}

// Alloc implements autoid.Allocator Alloc interface.
Expand Down
51 changes: 51 additions & 0 deletions meta/autoid/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,54 @@ func BenchmarkAllocator_Alloc(b *testing.B) {
alloc.Alloc(2, 1)
}
}

// Fix a computation logic bug in allocator computation.
func (*testSuite) TestAllocComputationIssue(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDCustomize", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/meta/autoid/mockAutoIDCustomize"), IsNil)
}()

store, err := mockstore.NewMockStore()
c.Assert(err, IsNil)
defer store.Close()

err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")})
c.Assert(err, IsNil)
err = m.CreateTableOrView(1, &model.TableInfo{ID: 1, Name: model.NewCIStr("t")})
c.Assert(err, IsNil)
err = m.CreateTableOrView(1, &model.TableInfo{ID: 2, Name: model.NewCIStr("t1")})
c.Assert(err, IsNil)
return nil
})
c.Assert(err, IsNil)

// Since the test here is applicable to any type of allocators, autoid.RowIDAllocType is chosen.
unsignedAlloc := autoid.NewAllocator(store, 1, true, autoid.RowIDAllocType)
c.Assert(unsignedAlloc, NotNil)
signedAlloc := autoid.NewAllocator(store, 1, false, autoid.RowIDAllocType)
c.Assert(signedAlloc, NotNil)

// the next valid two value must be 13 & 16, batch size = 6.
err = unsignedAlloc.Rebase(1, 10, false)
c.Assert(err, IsNil)
// the next valid two value must be 10 & 13, batch size = 6.
err = signedAlloc.Rebase(2, 7, false)
c.Assert(err, IsNil)
// Simulate the rest cache is not enough for next batch, assuming 10 & 13, batch size = 4.
autoid.TestModifyBaseAndEndInjection(unsignedAlloc, 9, 9)
// Simulate the rest cache is not enough for next batch, assuming 10 & 13, batch size = 4.
autoid.TestModifyBaseAndEndInjection(signedAlloc, 4, 6)

// Here will recompute the new allocator batch size base on new base = 10, which will get 6.
min, max, err := unsignedAlloc.Alloc(1, 2, 3, 1)
c.Assert(err, IsNil)
c.Assert(min, Equals, int64(10))
c.Assert(max, Equals, int64(16))
min, max, err = signedAlloc.Alloc(2, 2, 3, 1)
c.Assert(err, IsNil)
c.Assert(min, Equals, int64(7))
c.Assert(max, Equals, int64(13))
}

0 comments on commit 321ff26

Please sign in to comment.