Skip to content

Commit

Permalink
fix setval in another session return num when it is already satisfied…
Browse files Browse the repository at this point in the history
… in meta
  • Loading branch information
AilinKid committed Mar 30, 2020
1 parent 08c4c66 commit 22d501e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
13 changes: 13 additions & 0 deletions ddl/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,19 @@ func (s *testSequenceSuite) TestSequenceFunction(c *C) {
s.tk.MustQuery("select setval(seq, -10)").Check(testkit.Rows("-10"))
s.tk.MustQuery("select setval(seq, -5)").Check(testkit.Rows("<nil>"))
s.tk.MustExec("drop sequence seq")

// test the current value already satisfied setval in other session.
s.tk.MustExec("create sequence seq")
s.tk.MustQuery("select setval(seq, 100)").Check(testkit.Rows("100"))
se, err := session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
tk1 := testkit.NewTestKit(c, s.store)
tk1.Se = se
tk1.MustExec("use test")
tk1.MustQuery("select setval(seq, 50)").Check(testkit.Rows("<nil>"))
tk1.MustQuery("select setval(seq, 100)").Check(testkit.Rows("<nil>"))
tk1.MustQuery("select setval(seq, 101)").Check(testkit.Rows("101"))
s.tk.MustExec("drop sequence seq")
}

func (s *testSequenceSuite) TestInsertSequence(c *C) {
Expand Down
35 changes: 29 additions & 6 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type Allocator interface {
// If allocIDs is true, it will allocate some IDs and save to the cache.
// If allocIDs is false, it will not allocate IDs.
Rebase(tableID, newBase int64, allocIDs bool) error

// RebaseSeq rebases the sequence value in number axis with tableID and the new base value.
RebaseSeq(table, newBase int64) (int64, bool, error)

// Base return the current base of Allocator.
Base() int64
// End is only used for test.
Expand Down Expand Up @@ -270,8 +274,9 @@ func (alloc *allocator) rebase4Signed(tableID, requiredBase int64, allocIDs bool
}

// rebase4Sequence won't alloc batch immediately, cause it won't cache value in allocator.
func (alloc *allocator) rebase4Sequence(tableID, requiredBase int64) error {
func (alloc *allocator) rebase4Sequence(tableID, requiredBase int64) (int64, bool, error) {
startTime := time.Now()
alreadySatisfied := false
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
currentEnd, err := getAutoIDByAllocType(m, alloc.dbID, tableID, alloc.allocType)
Expand All @@ -281,11 +286,13 @@ func (alloc *allocator) rebase4Sequence(tableID, requiredBase int64) error {
if alloc.sequence.Increment > 0 {
if currentEnd >= requiredBase {
// Required base satisfied, we don't need to update KV.
alreadySatisfied = true
return nil
}
} else {
if currentEnd <= requiredBase {
// Required base satisfied, we don't need to update KV.
alreadySatisfied = true
return nil
}
}
Expand All @@ -299,9 +306,12 @@ func (alloc *allocator) rebase4Sequence(tableID, requiredBase int64) error {
// TODO: sequence metrics
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDRebase, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
if err != nil {
return err
return 0, false, err
}
return nil
if alreadySatisfied {
return 0, true, nil
}
return requiredBase, false, nil
}

// Rebase implements autoid.Allocator Rebase interface.
Expand All @@ -315,15 +325,28 @@ func (alloc *allocator) Rebase(tableID, requiredBase int64, allocIDs bool) error
alloc.mu.Lock()
defer alloc.mu.Unlock()

if alloc.allocType == SequenceType {
return alloc.rebase4Sequence(tableID, requiredBase)
}
if alloc.isUnsigned {
return alloc.rebase4Unsigned(tableID, uint64(requiredBase), allocIDs)
}
return alloc.rebase4Signed(tableID, requiredBase, allocIDs)
}

// Rebase implements autoid.Allocator RebaseSeq interface.
// the return value is quite same as expression function, bool means whether it should be NULL,
// here it will be used in setval expression function (true meaning the set value has been satisfied, return NULL).
// case1:When requiredBase is satisfied with current value, it will return (0, true, nil),
// case2:When requiredBase is successfully set in, it will return (requiredBase, false, nil).
// If some error occurs in the process, return it immediately.
func (alloc *allocator) RebaseSeq(tableID, requiredBase int64) (int64, bool, error) {
if tableID == 0 {
return 0, false, errInvalidTableID.GenWithStack("Invalid tableID")
}

alloc.mu.Lock()
defer alloc.mu.Unlock()
return alloc.rebase4Sequence(tableID, requiredBase)
}

func (alloc *allocator) GetType() AllocatorType {
return alloc.allocType
}
Expand Down
23 changes: 15 additions & 8 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,10 +1295,15 @@ func (t *TableCommon) GetSequenceNextVal(ctx interface{}, dbName, seqName string
if err1 != nil {
return err1
}
seq.base, seq.end, seq.round, err1 = sequenceAlloc.AllocSeqCache(t.tableID)
var base, end, round int64
base, end, round, err1 = sequenceAlloc.AllocSeqCache(t.tableID)
if err1 != nil {
return err1
}
// Only update local cache when alloc succeed.
seq.base = base
seq.end = end
seq.round = round
// write sequence binlog to the pumpClient.
if ctx.(sessionctx.Context).GetSessionVars().BinlogClient != nil {
err = writeSequenceUpdateValueBinlog(ctx.(sessionctx.Context), dbName, seqName, seq.end)
Expand Down Expand Up @@ -1367,15 +1372,17 @@ func (t *TableCommon) SetSequenceVal(ctx interface{}, newVal int64, dbName, seqN
if err != nil {
return 0, false, err
}
err = sequenceAlloc.Rebase(t.tableID, newVal, false)
res, alreadySatisfied, err := sequenceAlloc.RebaseSeq(t.tableID, newVal)
if err != nil {
return 0, false, err
}
// write sequence binlog to the pumpClient.
if ctx.(sessionctx.Context).GetSessionVars().BinlogClient != nil {
err = writeSequenceUpdateValueBinlog(ctx.(sessionctx.Context), dbName, seqName, seq.end)
if err != nil {
return 0, false, err
if !alreadySatisfied {
// write sequence binlog to the pumpClient.
if ctx.(sessionctx.Context).GetSessionVars().BinlogClient != nil {
err = writeSequenceUpdateValueBinlog(ctx.(sessionctx.Context), dbName, seqName, seq.end)
if err != nil {
return 0, false, err
}
}
}
// Record the current end after setval succeed.
Expand All @@ -1384,7 +1391,7 @@ func (t *TableCommon) SetSequenceVal(ctx interface{}, newVal int64, dbName, seqN
// setval(seq, 100) setval(seq, 50)
// Because no cache (base, end keep 0), so the second setval won't return NULL.
t.sequence.base, t.sequence.end = newVal, newVal
return newVal, false, nil
return res, alreadySatisfied, nil
}

// getOffset is used in under GetSequenceNextVal & SetSequenceVal, which mu is locked.
Expand Down

0 comments on commit 22d501e

Please sign in to comment.