Skip to content

Commit

Permalink
Make tidb_bound_staleness deterministic
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Apr 28, 2021
1 parent c9309ba commit 7d8c27f
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 24 deletions.
3 changes: 3 additions & 0 deletions expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -7185,6 +7185,9 @@ func (b *builtinTiDBBoundStalenessSig) evalInt(row chunk.Row) (int64, bool, erro
injectTS := val.(int)
minResolveTS = uint64(injectTS)
})
// Try to get from the stmt cache to make sure this function is deterministic.
stmtCtx := b.ctx.GetSessionVars().StmtCtx
minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64)
// For a resolved TS t and a time range [t1, t2]:
// 1. If t < t1, we will use t1 as the result,
// and with it, a read request may fail because it's an unreached resolved TS.
Expand Down
26 changes: 24 additions & 2 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func (s *testEvaluatorSuite) TestTime(c *C) {
}

func resetStmtContext(ctx sessionctx.Context) {
ctx.GetSessionVars().StmtCtx.ResetNowTs()
ctx.GetSessionVars().StmtCtx.ResetStmtCache()
}

func (s *testEvaluatorSuite) TestNowAndUTCTimestamp(c *C) {
Expand Down Expand Up @@ -2941,8 +2941,30 @@ func (s *testEvaluatorSuite) TestTiDBBoundStaleness(c *C) {
} else {
c.Assert(d.GetInt64(), Equals, test.expect)
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS")
resetStmtContext(s.ctx)
}

// Test whether it's deterministic.
resolveTS1 := oracle.ComposeTS(t2.Add(-1*time.Second).Unix()*1000, 0)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS",
fmt.Sprintf("return(%v)", resolveTS1)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
c.Assert(d.GetInt64(), Equals, int64(resolveTS1))
// ResolveTS updated.
resolveTS2 := oracle.ComposeTS(t2.Add(1*time.Second).Unix()*1000, 0)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS",
fmt.Sprintf("return(%v)", resolveTS2)), IsNil)
f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
c.Assert(err, IsNil)
d, err = evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
// Still resolveTS1
c.Assert(d.GetInt64(), Equals, int64(resolveTS1))
resetStmtContext(s.ctx)
failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS")
}

func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) {
Expand Down
20 changes: 12 additions & 8 deletions expression/builtin_time_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -882,6 +883,17 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch
args0 := buf0.Times()
args1 := buf1.Times()
i64s := result.Int64s()
var minResolveTS uint64
if store := b.ctx.GetStore(); store != nil {
minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope())
}
failpoint.Inject("injectResolveTS", func(val failpoint.Value) {
injectTS := val.(int)
minResolveTS = uint64(injectTS)
})
// Try to get from the stmt cache to make sure this function is deterministic.
stmtCtx := b.ctx.GetSessionVars().StmtCtx
minResolveTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtResolveTsCacheKey, minResolveTS).(uint64)
for i := 0; i < n; i++ {
if result.IsNull(i) {
continue
Expand Down Expand Up @@ -916,14 +928,6 @@ func (b *builtinTiDBBoundStalenessSig) vecEvalInt(input *chunk.Chunk, result *ch
continue
}
minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0)
var minResolveTS uint64
if store := b.ctx.GetStore(); store != nil {
minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope())
}
failpoint.Inject("injectResolveTS", func(val failpoint.Value) {
injectTS := val.(int)
minResolveTS = uint64(injectTS)
})
if minResolveTS < minTS {
i64s[i] = int64(minTS)
} else if minTS <= minResolveTS && minResolveTS <= maxTS {
Expand Down
1 change: 1 addition & 0 deletions expression/builtin_time_vec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{
geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)},
},
},
// Todo: how to inject the resolveTS for better testing.
ast.TiDBBoundStaleness: {
{
retEvalType: types.ETInt,
Expand Down
3 changes: 2 additions & 1 deletion expression/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -155,5 +156,5 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
return time.Unix(timestamp, 0), nil
}
stmtCtx := ctx.GetSessionVars().StmtCtx
return stmtCtx.GetNowTsCached(), nil
return stmtCtx.GetOrStoreStmtCache(stmtctx.StmtNowTsCacheKey, time.Now()).(time.Time), nil
}
2 changes: 1 addition & 1 deletion expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2343,8 +2343,8 @@ func (s *testIntegrationSuite2) TestTimeBuiltin(c *C) {
} else {
result.Check(testkit.Rows(fmt.Sprintf("%d", test.expect)))
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS")
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS")

// fix issue 10308
result = tk.MustQuery("select time(\"- -\");")
Expand Down
41 changes: 29 additions & 12 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ type StatementContext struct {
RuntimeStatsColl *execdetails.RuntimeStatsColl
TableIDs []int64
IndexNames []string
nowTs time.Time // use this variable for now/current_timestamp calculation/cache for one stmt
stmtTimeCached bool
StmtType string
OriginalSQL string
digestMemo struct {
Expand All @@ -164,6 +162,9 @@ type StatementContext struct {
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
TaskMapBakTS uint64 // counter for

// stmtCache is used to store some statement-related values.
stmtCache map[StmtCacheKey]interface{}
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -195,19 +196,35 @@ func (sh *StmtHints) TaskMapNeedBackUp() bool {
return sh.ForceNthPlan != -1
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
func (sc *StatementContext) GetNowTsCached() time.Time {
if !sc.stmtTimeCached {
now := time.Now()
sc.nowTs = now
sc.stmtTimeCached = true
// StmtCacheKey represents the key type in the StmtCache.
type StmtCacheKey int

const (
// StmtNowTsCacheKey is a variable for now/current_timestamp calculation/cache for one stmt.
StmtNowTsCacheKey StmtCacheKey = iota
// StmtNowTsCacheKey is a variable for resolveTs calculation/cache for one stmt.
StmtResolveTsCacheKey
)

// GetFromStmtCache gets the cached value of the given key if it exists, otherwise will store the value.
func (sc *StatementContext) GetOrStoreStmtCache(key StmtCacheKey, value interface{}) interface{} {
if sc.stmtCache == nil {
sc.stmtCache = make(map[StmtCacheKey]interface{})
}
if _, ok := sc.stmtCache[key]; !ok {
sc.stmtCache[key] = value
}
return sc.nowTs
return sc.stmtCache[key]
}

// ResetInStmtCache resets the cache of given key.
func (sc *StatementContext) ResetInStmtCache(key StmtCacheKey) {
delete(sc.stmtCache, key)
}

// ResetNowTs resetter for nowTs, clear cached time flag
func (sc *StatementContext) ResetNowTs() {
sc.stmtTimeCached = false
// ResetStmtCache resets all cached values.
func (sc *StatementContext) ResetStmtCache() {
sc.stmtCache = make(map[StmtCacheKey]interface{})
}

// SQLDigest gets normalized and digest for provided sql.
Expand Down

0 comments on commit 7d8c27f

Please sign in to comment.