Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement tidb_bounded_staleness built-in function #24328

Merged
merged 20 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,10 @@ func (s *testSuite5) TestShowBuiltin(c *C) {
res := tk.MustQuery("show builtins;")
c.Assert(res, NotNil)
rows := res.Rows()
c.Assert(268, Equals, len(rows))
const builtinFuncNum = 269
c.Assert(builtinFuncNum, Equals, len(rows))
c.Assert("abs", Equals, rows[0][0].(string))
c.Assert("yearweek", Equals, rows[267][0].(string))
c.Assert("yearweek", Equals, rows[builtinFuncNum-1][0].(string))
}

func (s *testSuite5) TestShowClusterConfig(c *C) {
Expand Down
4 changes: 3 additions & 1 deletion expression/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,9 @@ var funcs = map[string]functionClass{
ast.Year: &yearFunctionClass{baseFunctionClass{ast.Year, 1, 1}},
ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}},
ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}},
// TSO functions
ast.ReadTSIn: &readTSInFunctionClass{baseFunctionClass{ast.ReadTSIn, 2, 2}},
ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}},

// string functions
ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}},
Expand Down Expand Up @@ -881,7 +884,6 @@ var funcs = map[string]functionClass{
// This function is used to show tidb-server version info.
ast.TiDBVersion: &tidbVersionFunctionClass{baseFunctionClass{ast.TiDBVersion, 0, 0}},
ast.TiDBIsDDLOwner: &tidbIsDDLOwnerFunctionClass{baseFunctionClass{ast.TiDBIsDDLOwner, 0, 0}},
ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}},
ast.TiDBDecodePlan: &tidbDecodePlanFunctionClass{baseFunctionClass{ast.TiDBDecodePlan, 1, 1}},

// TiDB Sequence function.
Expand Down
75 changes: 75 additions & 0 deletions expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -7113,3 +7114,77 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) {
}
return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String()))
}

// readTSInFunctionClass reads a time window [a, b] and compares it with the latest resolvedTS
// to determine which TS to use in a read only transaction.
type readTSInFunctionClass struct {
baseFunctionClass
}

func (c *readTSInFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) {
if err := c.verifyArgs(args); err != nil {
return nil, err
}
bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt, types.ETDatetime, types.ETDatetime)
if err != nil {
return nil, err
}
sig := &builtinReadTSInSig{bf}
return sig, nil
}

type builtinReadTSInSig struct {
baseBuiltinFunc
}

func (b *builtinReadTSInSig) Clone() builtinFunc {
newSig := &builtinTidbParseTsoSig{}
newSig.cloneFrom(&b.baseBuiltinFunc)
return newSig
}

func (b *builtinReadTSInSig) evalInt(row chunk.Row) (int64, bool, error) {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row)
if isNull || err != nil {
return 0, true, handleInvalidTimeError(b.ctx, err)
}
rightTime, isNull, err := b.args[1].EvalTime(b.ctx, row)
if isNull || err != nil {
return 0, true, handleInvalidTimeError(b.ctx, err)
}
if invalidLeftTime, invalidRightTime := leftTime.InvalidZero(), rightTime.InvalidZero(); invalidLeftTime || invalidRightTime {
if invalidLeftTime {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, leftTime.String()))
}
if invalidRightTime {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, rightTime.String()))
}
return 0, true, err
}
minTime, err := leftTime.GoTime(getTimeZone(b.ctx))
if err != nil {
return 0, true, err
}
maxTime, err := rightTime.GoTime(getTimeZone(b.ctx))
if err != nil {
return 0, true, err
}
if minTime.After(maxTime) {
return 0, true, handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time"))
}
minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0)
var minResolveTS uint64
if store := b.ctx.GetStore(); store != nil {
minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope())
}
failpoint.Inject("injectResolveTS", func(val failpoint.Value) {
injectTS := val.(int)
minResolveTS = uint64(injectTS)
})
if minResolveTS < minTS {
return int64(minTS), false, nil
} else if min <= minResolveTS && minResolveTS <= maxTS {
return int64(minResolveTS), false, nil
}
return int64(maxTS), false, nil
}
83 changes: 83 additions & 0 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@
package expression

import (
"fmt"
"math"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -2862,6 +2865,86 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) {
}
}

func (s *testEvaluatorSuite) TestReadTSIn(c *C) {
const timeParserLayout = "2006-01-02 15:04:05.000"
t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877")
c.Assert(err, IsNil)
t1Str := t1.Format(timeParserLayout)
ts1 := int64(oracle.ComposeTS(t1.UnixNano()/int64(time.Millisecond), 0))
t2 := time.Now().UTC()
t2Str := t2.Format(timeParserLayout)
ts2 := int64(oracle.ComposeTS(t2.UnixNano()/int64(time.Millisecond), 0))
s.ctx.GetSessionVars().TimeZone = time.UTC
tests := []struct {
leftTime interface{}
rightTime interface{}
injectResolveTS uint64
isNull bool
expect int64
}{
// ResolveTS is in the range.
{
leftTime: t1Str,
rightTime: t2Str,
injectResolveTS: func() uint64 {
phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond)
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: func() int64 {
phy := t2.Add(-1*time.Second).UnixNano() / int64(time.Millisecond)
return int64(oracle.ComposeTS(phy, 0))
}(),
},
// ResolveTS is less than the left time.
{
leftTime: t1Str,
rightTime: t2Str,
injectResolveTS: func() uint64 {
phy := t1.Add(-1*time.Second).UnixNano() / int64(time.Millisecond)
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: ts1,
},
// ResolveTS is bigger than the right time.
{
leftTime: t1Str,
rightTime: t2Str,
injectResolveTS: func() uint64 {
phy := t2.Add(time.Second).UnixNano() / int64(time.Millisecond)
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: ts2,
},
// Wrong time order.
{
leftTime: t2Str,
rightTime: t1Str,
injectResolveTS: 0,
isNull: true,
expect: 0,
},
}

fc := funcs[ast.ReadTSIn]
for _, test := range tests {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectResolveTS",
fmt.Sprintf("return(%v)", test.injectResolveTS)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(test.leftTime), types.NewDatum(test.rightTime)}))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
if test.isNull {
c.Assert(d.IsNull(), IsTrue)
} else {
c.Assert(d.GetInt64(), Equals, test.expect)
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectResolveTS")
}
}

func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) {
du := baseDateArithmitical{}

Expand Down
78 changes: 78 additions & 0 deletions expression/builtin_time_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -854,6 +855,83 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C
return nil
}

func (b *builtinReadTSInSig) vectorized() bool {
return true
}

func (b *builtinReadTSInSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error {
n := input.NumRows()
buf0, err := b.bufAllocator.get(types.ETDatetime, n)
if err != nil {
return err
}
defer b.bufAllocator.put(buf0)
if err = b.args[0].VecEvalTime(b.ctx, input, buf0); err != nil {
return err
}
buf1, err := b.bufAllocator.get(types.ETDatetime, n)
if err != nil {
return err
}
defer b.bufAllocator.put(buf1)
if err = b.args[1].VecEvalTime(b.ctx, input, buf1); err != nil {
return err
}
result.ResizeInt64(n, false)
result.MergeNulls(buf0, buf1)
args0 := buf0.Times()
args1 := buf1.Times()
i64s := result.Int64s()
for i := 0; i < n; i++ {
if result.IsNull(i) {
continue
}
if invalidArg0, invalidArg1 := args0[i].InvalidZero(), args1[i].InvalidZero(); invalidArg0 || invalidArg1 {
if invalidArg0 {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args0[i].String()))
}
if invalidArg1 {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String()))
}
result.SetNull(i, true)
if err != nil {
return err
}
continue
}
minTime, err := args0[i].GoTime(getTimeZone(b.ctx))
if err != nil {
result.SetNull(i, true)
return err
}
maxTime, err := args1[i].GoTime(getTimeZone(b.ctx))
if err != nil {
result.SetNull(i, true)
return err
}
if minTime.After(maxTime) {
result.SetNull(i, true)
return handleInvalidTimeError(b.ctx, types.ErrWrongValue.FastGenByArgs("left time must be less then the right time"))
}
minTS, maxTS := oracle.ComposeTS(minTime.UnixNano()/int64(time.Millisecond), 0), oracle.ComposeTS(maxTime.UnixNano()/int64(time.Millisecond), 0)
var minResolveTS uint64
if store := b.ctx.GetStore(); store != nil {
minResolveTS = store.GetMinResolveTS(b.ctx.GetSessionVars().CheckAndGetTxnScope())
}
failpoint.Inject("injectResolveTS", func(val failpoint.Value) {
injectTS := val.(int)
minResolveTS = uint64(injectTS)
})
if minResolveTS < minTS {
i64s[i] = int64(minTS)
} else if min <= minResolveTS && minResolveTS <= maxTS {
i64s[i] = int64(minResolveTS)
}
i64s[i] = int64(maxTS)
}
return nil
}

func (b *builtinFromDaysSig) vectorized() bool {
return true
}
Expand Down
9 changes: 9 additions & 0 deletions expression/builtin_time_vec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,15 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{
geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)},
},
},
ast.ReadTSIn: {
// Because there is a chance that a time error will cause the test to fail,
// we cannot use the vectorized test framework to test builtinReadTSInSig.
// We test the builtinReadTSInSig in TestReadTSIn function.
// {
// retEvalType: types.ETInt,
// childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime},
// },
},
ast.LastDay: {
{retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}},
},
Expand Down
Loading