Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sessiontxn/staleread: more accurate stale ts (#44272) #45760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,19 @@ type calibrateResourceExec struct {
done bool
}

func (e *calibrateResourceExec) parseCalibrateDuration() (startTime time.Time, endTime time.Time, err error) {
func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
var dur time.Duration
var ts uint64
for _, op := range e.optionList {
switch op.Tp {
case ast.CalibrateStartTime:
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
if err != nil {
return
}
startTime = oracle.GetTimeFromTS(ts)
case ast.CalibrateEndTime:
ts, err = staleread.CalculateAsOfTsExpr(e.ctx, op.Ts)
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.ctx, op.Ts)
if err != nil {
return
}
Expand Down Expand Up @@ -202,7 +202,7 @@ var (
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
startTs, endTs, err := e.parseCalibrateDuration()
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.J
}

func (e *DDLExec) executeFlashBackCluster(s *ast.FlashBackToTimestampStmt) error {
flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS)
flashbackTS, err := staleread.CalculateAsOfTsExpr(context.Background(), e.ctx, s.FlashbackTS)
if err != nil {
return err
}
Expand Down
25 changes: 25 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,3 +1396,28 @@ func TestStalePrepare(t *testing.T) {
tk.MustQuery("execute stmt").Check(expected)
}
}

func TestStaleTSO(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

tk.MustExec("insert into t values(1)")

asOfExprs := []string{
"now(3) - interval 1 second",
"current_time() - interval 1 second",
"curtime() - interval 1 second",
}

nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second))
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO)))
defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO")
for _, expr := range asOfExprs {
// Make sure the now() expr is evaluated from the stale ts provider.
tk.MustQuery("select * from t as of timestamp " + expr + " order by id asc").Check(testkit.Rows("1"))
}
}
12 changes: 12 additions & 0 deletions expression/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

func boolToInt64(v bool) int64 {
Expand Down Expand Up @@ -158,6 +161,15 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
failpoint.Return(v, nil)
})

if ctx != nil {
staleTSO, err := ctx.GetSessionVars().StmtCtx.GetStaleTSO()
if staleTSO != 0 && err == nil {
return oracle.GetTimeFromTS(staleTSO), nil
} else if err != nil {
logutil.BgLogger().Error("get stale tso failed", zap.Error(err))
}
}

now := time.Now()

if ctx == nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3530,7 +3530,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
case *ast.BeginStmt:
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
if raw.AsOf != nil {
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr)
startTS, err := staleread.CalculateAsOfTsExpr(ctx, b.ctx, raw.AsOf.TsExpr)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ const allowedTimeFromNow = 100 * time.Millisecond

// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO()
if currentTS == 0 || err != nil {
currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
}
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
Expand Down
32 changes: 32 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,12 @@ type StatementContext struct {
useChunkAlloc bool
// Check if TiFlash read engine is removed due to strict sql mode.
TiFlashEngineRemovedDueToStrictSQLMode bool
// StaleTSOProvider is used to provide stale timestamp oracle for read-only transactions.
StaleTSOProvider struct {
sync.Mutex
value *uint64
eval func() (uint64, error)
}
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -1196,6 +1202,32 @@ func (sc *StatementContext) DetachMemDiskTracker() {
}
}

// SetStaleTSOProvider sets the stale TSO provider.
func (sc *StatementContext) SetStaleTSOProvider(eval func() (uint64, error)) {
sc.StaleTSOProvider.Lock()
defer sc.StaleTSOProvider.Unlock()
sc.StaleTSOProvider.value = nil
sc.StaleTSOProvider.eval = eval
}

// GetStaleTSO returns the TSO for stale-read usage which calculate from PD's last response.
func (sc *StatementContext) GetStaleTSO() (uint64, error) {
sc.StaleTSOProvider.Lock()
defer sc.StaleTSOProvider.Unlock()
if sc.StaleTSOProvider.value != nil {
return *sc.StaleTSOProvider.value, nil
}
if sc.StaleTSOProvider.eval == nil {
return 0, nil
}
tso, err := sc.StaleTSOProvider.eval()
if err != nil {
return 0, err
}
sc.StaleTSOProvider.value = &tso
return tso, nil
}

// CopTasksDetails collects some useful information of cop-tasks during execution.
type CopTasksDetails struct {
NumCopTasks int
Expand Down
1 change: 1 addition & 0 deletions sessiontxn/staleread/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//types",
"//util/dbterror",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//oracle",
],
)
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
return 0, nil
}

ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr)
ts, err := CalculateAsOfTsExpr(ctx, sctx, asOf.TsExpr)
if err != nil {
return 0, err
}
Expand Down
12 changes: 11 additions & 1 deletion sessiontxn/staleread/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -29,7 +30,16 @@ import (
)

// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
func CalculateAsOfTsExpr(ctx context.Context, sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
sctx.GetSessionVars().StmtCtx.SetStaleTSOProvider(func() (uint64, error) {
failpoint.Inject("mockStaleReadTSO", func(val failpoint.Value) (uint64, error) {
return uint64(val.(int)), nil
})
// this function accepts a context, but we don't need it when there is a valid cached ts.
// in most cases, the stale read ts can be calculated from `cached ts + time since cache - staleness`,
// this can be more accurate than `time.Now() - staleness`, because TiDB's local time can drift.
return sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
})
tsVal, err := expression.EvalAstExpr(sctx, tsExpr)
if err != nil {
return 0, err
Expand Down