Skip to content

Commit

Permalink
session: fix the duplicate binding case when updating bind info (ping…
Browse files Browse the repository at this point in the history
…cap#22367) (pingcap#22368)

Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
ti-srebot authored Jan 12, 2021
1 parent 98820cf commit dbade8c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 27 deletions.
73 changes: 48 additions & 25 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,11 +1133,20 @@ func insertBuiltinBindInfoRow(s Session) {
mustExecute(s, sql)
}

type bindInfo struct {
bindSQL string
status string
createTime types.Time
charset string
collation string
source string
}

func upgradeToVer51(s Session, ver int64) {
if ver >= version51 {
return
}
bindMap := make(map[string]types.Time)
bindMap := make(map[string]bindInfo)
h := &bindinfo.BindHandle{}
var err error
mustExecute(s, "BEGIN PESSIMISTIC")
Expand All @@ -1152,7 +1161,11 @@ func upgradeToVer51(s Session, ver int64) {
}()
mustExecute(s, h.LockBindInfoSQL())
var recordSets []sqlexec.RecordSet
recordSets, err = s.ExecuteInternal(context.Background(), "SELECT original_sql, bind_sql, default_db, charset, collation, update_time from mysql.bind_info where source != 'builtin'")
recordSets, err = s.ExecuteInternal(context.Background(),
`SELECT bind_sql, default_db, status, create_time, charset, collation, source
FROM mysql.bind_info
WHERE source != 'builtin'
ORDER BY update_time DESC`)
if err != nil {
logutil.BgLogger().Fatal("upgradeToVer61 error", zap.Error(err))
}
Expand All @@ -1171,42 +1184,52 @@ func upgradeToVer51(s Session, ver int64) {
if req.NumRows() == 0 {
break
}
updateBindInfo(s, iter, p, now.String(), bindMap)
updateBindInfo(iter, p, bindMap)
}

mustExecute(s, "DELETE FROM mysql.bind_info where source != 'builtin'")
for original, bind := range bindMap {
mustExecute(s, fmt.Sprintf("INSERT INTO mysql.bind_info VALUES(%s, %s, '', %s, %s, %s, %s, %s, %s)",
expression.Quote(original),
expression.Quote(bind.bindSQL),
expression.Quote(bind.status),
expression.Quote(bind.createTime.String()),
expression.Quote(now.String()),
expression.Quote(bind.charset),
expression.Quote(bind.collation),
expression.Quote(bind.source),
))
}
}

func updateBindInfo(s Session, iter *chunk.Iterator4Chunk, p *parser.Parser, now string, bindMap map[string]types.Time) {
func updateBindInfo(iter *chunk.Iterator4Chunk, p *parser.Parser, bindMap map[string]bindInfo) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
original := row.GetString(0)
bind := row.GetString(1)
db := row.GetString(2)
charset := row.GetString(3)
collation := row.GetString(4)
updateTime := row.GetTime(5)
bind := row.GetString(0)
db := row.GetString(1)
charset := row.GetString(4)
collation := row.GetString(5)
stmt, err := p.ParseOneStmt(bind, charset, collation)
if err != nil {
logutil.BgLogger().Fatal("updateBindInfo error", zap.Error(err))
}
originWithDB := parser.Normalize(utilparser.RestoreWithDefaultDB(stmt, db))
if latest, ok := bindMap[originWithDB]; ok {
// In the following cases, duplicate originWithDB may occur
if _, ok := bindMap[originWithDB]; ok {
// The results are sorted in descending order of time.
// And in the following cases, duplicate originWithDB may occur
// originalText |bindText |DB
// `select * from t` |`select /*+ use_index(t, idx) */ * from t` |`test`
// `select * from test.t` |`select /*+ use_index(t, idx) */ * from test.t`|``
// If repeated, we will keep the latest binding.
if updateTime.Compare(latest) <= 0 {
continue
}
mustExecute(s, fmt.Sprintf(`DELETE FROM mysql.bind_info WHERE original_sql=%s`, expression.Quote(original)))
original = originWithDB
// Therefore, if repeated, we can skip to keep the latest binding.
continue
}
bindMap[originWithDB] = bindInfo{
bindSQL: utilparser.RestoreWithDefaultDB(stmt, db),
status: row.GetString(2),
createTime: row.GetTime(3),
charset: charset,
collation: collation,
source: row.GetString(6),
}
bindMap[originWithDB] = updateTime
bindWithDB := utilparser.RestoreWithDefaultDB(stmt, db)
mustExecute(s, fmt.Sprintf(`UPDATE mysql.bind_info SET original_sql=%s, bind_sql=%s, default_db='', update_time=%s where original_sql=%s`,
expression.Quote(originWithDB),
expression.Quote(bindWithDB),
expression.Quote(now),
expression.Quote(original)))
}
}

Expand Down
5 changes: 3 additions & 2 deletions session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,11 @@ func (s *testBootstrapSuite) TestUpdateDuplicateBindInfo(c *C) {
se := newSession(c, store, s.dbName)
mustExecSQL(c, se, `insert into mysql.bind_info values('select * from t', 'select /*+ use_index(t, idx_a)*/ * from t', 'test', 'using', '2021-01-04 14:50:58.257', '2021-01-04 14:50:58.257', 'utf8', 'utf8_general_ci', 'manual')`)
// The latest one.
mustExecSQL(c, se, `insert into mysql.bind_info values('select * from test.t', 'select /*+ use_index(t, idx_b)*/ * from test.t', 'test', 'using', '2021-01-04 14:50:58.257', '2021-01-09 14:50:58.257', 'utf8', 'utf8_general_ci', 'manual')`)
mustExecSQL(c, se, `insert into mysql.bind_info values('select * from test . t', 'select /*+ use_index(t, idx_b)*/ * from test.t', 'test', 'using', '2021-01-04 14:50:58.257', '2021-01-09 14:50:58.257', 'utf8', 'utf8_general_ci', 'manual')`)

upgradeToVer51(se, version50)

r := mustExecSQL(c, se, `select original_sql, bind_sql, default_db, status from mysql.bind_info where source != 'builtin'`)
r := mustExecSQL(c, se, `select original_sql, bind_sql, default_db, status, create_time from mysql.bind_info where source != 'builtin'`)
req := r.NewChunk()
c.Assert(r.Next(ctx, req), IsNil)
c.Assert(req.NumRows(), Equals, 1)
Expand All @@ -519,6 +519,7 @@ func (s *testBootstrapSuite) TestUpdateDuplicateBindInfo(c *C) {
c.Assert(row.GetString(1), Equals, "SELECT /*+ use_index(t idx_b)*/ * FROM test.t")
c.Assert(row.GetString(2), Equals, "")
c.Assert(row.GetString(3), Equals, "using")
c.Assert(row.GetTime(4).String(), Equals, "2021-01-04 14:50:58.257")
c.Assert(r.Close(), IsNil)
mustExecSQL(c, se, "delete from mysql.bind_info where original_sql = 'select * from test . t'")
}

0 comments on commit dbade8c

Please sign in to comment.