Skip to content

Commit

Permalink
cherry pick pingcap#17271 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <[email protected]>
  • Loading branch information
AilinKid authored and sre-bot committed Jun 1, 2020
1 parent 30209e3 commit 3c1b30b
Show file tree
Hide file tree
Showing 2 changed files with 542 additions and 16 deletions.
243 changes: 227 additions & 16 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error

// NextStep return new auto id step according to previous step and consuming time.
func NextStep(curStep int64, consumeDur time.Duration) int64 {
failpoint.Inject("mockAutoIDCustomize", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(3)
}
})
failpoint.Inject("mockAutoIDChange", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(step)
Expand Down Expand Up @@ -387,21 +392,19 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep <= n1 {
nextStep = mathutil.MinInt64(n1*2, maxStep)
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
newBase, err1 = m.GetAutoTableID(alloc.dbID, tableID)
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on global base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := mathutil.MinInt64(math.MaxInt64-newBase, nextStep)
// The global rest is not enough for alloc.
if tmpStep < n1 {
Expand All @@ -414,6 +417,10 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if newBase == math.MaxInt64 {
return 0, 0, ErrAutoincReadFailed
Expand Down Expand Up @@ -454,21 +461,19 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
consumeDur := startTime.Sub(alloc.lastAllocTime)
nextStep = NextStep(alloc.step, consumeDur)
}
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep <= n1 {
nextStep = mathutil.MinInt64(n1*2, maxStep)
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
var err1 error
newBase, err1 = m.GetAutoTableID(alloc.dbID, tableID)
if err1 != nil {
return err1
}
// CalcNeededBatchSize calculates the total batch size needed on new base.
n1 = CalcNeededBatchSize(newBase, int64(n), increment, offset, alloc.isUnsigned)
// Although the step is customized by user, we still need to make sure nextStep is big enough for insert batch.
if nextStep < n1 {
nextStep = n1
}
tmpStep := int64(mathutil.MinUint64(math.MaxUint64-uint64(newBase), uint64(nextStep)))
// The global rest is not enough for alloc.
if tmpStep < n1 {
Expand All @@ -481,6 +486,10 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
if err != nil {
return 0, 0, err
}
// Store the step for non-customized-step allocator to calculate next dynamic step.
if !alloc.customStep {
alloc.step = nextStep
}
alloc.lastAllocTime = time.Now()
if uint64(newBase) == math.MaxUint64 {
return 0, 0, ErrAutoincReadFailed
Expand All @@ -497,3 +506,205 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offse
alloc.base = int64(uint64(alloc.base) + uint64(n1))
return min, alloc.base, nil
}
<<<<<<< HEAD
=======

// alloc4Sequence is used to alloc value for sequence, there are several aspects different from autoid logic.
// 1: sequence allocation don't need check rebase.
// 2: sequence allocation don't need auto step.
// 3: sequence allocation may have negative growth.
// 4: sequence allocation batch length can be dissatisfied.
// 5: sequence batch allocation will be consumed immediately.
func (alloc *allocator) alloc4Sequence(tableID int64) (min int64, max int64, round int64, err error) {
increment := alloc.sequence.Increment
offset := alloc.sequence.Start
minValue := alloc.sequence.MinValue
maxValue := alloc.sequence.MaxValue
cacheSize := alloc.sequence.CacheValue
if !alloc.sequence.Cache {
cacheSize = 1
}

var newBase, newEnd int64
startTime := time.Now()
err = kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
var (
err1 error
seqStep int64
)
// Get the real offset if the sequence is in cycle.
// round is used to count cycle times in sequence with cycle option.
if alloc.sequence.Cycle {
// GetSequenceCycle is used to get the flag `round`, which indicates whether the sequence is already in cycle.
round, err1 = m.GetSequenceCycle(alloc.dbID, tableID)
if err1 != nil {
return err1
}
if round > 0 {
if increment > 0 {
offset = alloc.sequence.MinValue
} else {
offset = alloc.sequence.MaxValue
}
}
}

// Get the global new base.
newBase, err1 = getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType)
if err1 != nil {
return err1
}

// CalcNeededBatchSize calculates the total batch size needed.
seqStep, err1 = CalcSequenceBatchSize(newBase, cacheSize, increment, offset, minValue, maxValue)

if err1 != nil && err1 == ErrAutoincReadFailed {
if !alloc.sequence.Cycle {
return err1
}
// Reset the sequence base and offset.
if alloc.sequence.Increment > 0 {
newBase = alloc.sequence.MinValue - 1
offset = alloc.sequence.MinValue
} else {
newBase = alloc.sequence.MaxValue + 1
offset = alloc.sequence.MaxValue
}
err1 = m.SetSequenceValue(alloc.dbID, tableID, newBase)
if err1 != nil {
return err1
}

// Reset sequence round state value.
round++
// SetSequenceCycle is used to store the flag `round` which indicates whether the sequence is already in cycle.
// round > 0 means the sequence is already in cycle, so the offset should be minvalue / maxvalue rather than sequence.start.
// TiDB is a stateless node, it should know whether the sequence is already in cycle when restart.
err1 = m.SetSequenceCycle(alloc.dbID, tableID, round)
if err1 != nil {
return err1
}

// Recompute the sequence next batch size.
seqStep, err1 = CalcSequenceBatchSize(newBase, cacheSize, increment, offset, minValue, maxValue)
if err1 != nil {
return err1
}
}
var delta int64
if alloc.sequence.Increment > 0 {
delta = seqStep
} else {
delta = -seqStep
}
newEnd, err1 = generateAutoIDByAllocType(m, alloc.dbID, tableID, delta, alloc.allocType)
return err1
})

// TODO: sequence metrics
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return 0, 0, 0, err
}
logutil.Logger(context.TODO()).Debug("alloc sequence value",
zap.Uint64(" from value", uint64(newBase)),
zap.Uint64("to value", uint64(newEnd)),
zap.Int64("table ID", tableID),
zap.Int64("database ID", alloc.dbID))
return newBase, newEnd, round, nil
}

func getAutoIDByAllocType(m *meta.Meta, dbID, tableID int64, allocType AllocatorType) (int64, error) {
switch allocType {
// Currently, row id allocator and auto-increment value allocator shares the same key-value pair.
case RowIDAllocType, AutoIncrementType:
return m.GetAutoTableID(dbID, tableID)
case AutoRandomType:
return m.GetAutoRandomID(dbID, tableID)
case SequenceType:
return m.GetSequenceValue(dbID, tableID)
default:
return 0, ErrInvalidAllocatorType.GenWithStackByArgs()
}
}

func generateAutoIDByAllocType(m *meta.Meta, dbID, tableID, step int64, allocType AllocatorType) (int64, error) {
switch allocType {
case RowIDAllocType, AutoIncrementType:
return m.GenAutoTableID(dbID, tableID, step)
case AutoRandomType:
return m.GenAutoRandomID(dbID, tableID, step)
case SequenceType:
return m.GenSequenceValue(dbID, tableID, step)
default:
return 0, ErrInvalidAllocatorType.GenWithStackByArgs()
}
}

const signMask uint64 = 0x8000000000000000

// EncodeIntToCmpUint make int v to comparable uint type
func EncodeIntToCmpUint(v int64) uint64 {
return uint64(v) ^ signMask
}

// DecodeCmpUintToInt decodes the u that encoded by EncodeIntToCmpUint
func DecodeCmpUintToInt(u uint64) int64 {
return int64(u ^ signMask)
}

// TestModifyBaseAndEndInjection exported for testing modifying the base and end.
func TestModifyBaseAndEndInjection(alloc Allocator, base, end int64) {
alloc.(*allocator).mu.Lock()
alloc.(*allocator).base = base
alloc.(*allocator).end = end
alloc.(*allocator).mu.Unlock()
}

// AutoRandomIDLayout is used to calculate the bits length of different section in auto_random id.
// The primary key with auto_random can only be `bigint` column, the total layout length of auto random is 64 bits.
// These are two type of layout:
// 1. Signed bigint:
// | [sign_bit] | [shard_bits] | [incremental_bits] |
// sign_bit(1 fixed) + shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// 2. Unsigned bigint:
// | [shard_bits] | [incremental_bits] |
// shard_bits(15 max) + incremental_bits(the rest) = total_layout_bits(64 fixed)
// Please always use NewAutoRandomIDLayout() to instantiate.
type AutoRandomIDLayout struct {
FieldType *types.FieldType
ShardBits uint64
// Derived fields.
TypeBitsLength uint64
IncrementalBits uint64
HasSignBit bool
}

// NewAutoRandomIDLayout create an instance of AutoRandomIDLayout.
func NewAutoRandomIDLayout(fieldType *types.FieldType, shardBits uint64) *AutoRandomIDLayout {
typeBitsLength := uint64(mysql.DefaultLengthOfMysqlTypes[mysql.TypeLonglong] * 8)
incrementalBits := typeBitsLength - shardBits
hasSignBit := !mysql.HasUnsignedFlag(fieldType.Flag)
if hasSignBit {
incrementalBits -= 1
}
return &AutoRandomIDLayout{
FieldType: fieldType,
ShardBits: shardBits,
TypeBitsLength: typeBitsLength,
IncrementalBits: incrementalBits,
HasSignBit: hasSignBit,
}
}

// IncrementalBitsCapacity returns the max capacity of incremental section of the current layout.
func (l *AutoRandomIDLayout) IncrementalBitsCapacity() uint64 {
return uint64(math.Pow(2, float64(l.IncrementalBits)) - 1)
}

// IncrementalMask returns 00..0[11..1], where [xxx] is the incremental section of the current layout.
func (l *AutoRandomIDLayout) IncrementalMask() int64 {
return (1 << l.IncrementalBits) - 1
}
>>>>>>> 9162cfa... meta: fix the allocator batch size compute logic (#17271)
Loading

0 comments on commit 3c1b30b

Please sign in to comment.