From 12e5b3173112d74a815bdf5739447a23f529f519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Tue, 17 Sep 2024 15:20:03 +0800 Subject: [PATCH] lighting: replace lightning inner contexts with manged ones (#56038) ref pingcap/tidb#53388 --- lightning/pkg/importer/precheck_impl.go | 26 +- pkg/executor/import_into.go | 5 +- pkg/executor/importer/kv_encode.go | 2 - pkg/lightning/backend/kv/BUILD.bazel | 19 +- pkg/lightning/backend/kv/base.go | 12 +- pkg/lightning/backend/kv/context.go | 260 ++++++++++++ pkg/lightning/backend/kv/context_test.go | 396 ++++++++++++++++++ pkg/lightning/backend/kv/kv2sql.go | 8 +- pkg/lightning/backend/kv/kv2sql_test.go | 6 +- pkg/lightning/backend/kv/session.go | 92 ++-- .../backend/kv/session_internal_test.go | 49 +++ pkg/lightning/backend/kv/sql2kv.go | 5 - 12 files changed, 817 insertions(+), 63 deletions(-) create mode 100644 pkg/lightning/backend/kv/context.go create mode 100644 pkg/lightning/backend/kv/context_test.go diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index b0fc797eff2e7..90625a74556f3 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -1283,9 +1283,16 @@ outer: theResult.Severity = precheck.Warn if hasUniqueField && len(rows) > 1 { theResult.Severity = precheck.Critical - } else if !checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0], log.FromContext(ctx)) { - // if there are only 1 csv file or there is not unique key, try to check if all columns are compatible with string value - theResult.Severity = precheck.Critical + } else { + ok, err := checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0], log.FromContext(ctx)) + if err != nil { + return nil, err + } + + if !ok { + // if there are only 1 csv file or there is not unique key, try to check if all columns are compatible with string value + theResult.Severity = precheck.Critical + } } return theResult, nil } @@ -1295,10 +1302,14 @@ func checkFieldCompatibility( ignoreCols map[string]struct{}, values []types.Datum, logger log.Logger, -) bool { - se := kv.NewSession(&encode.SessionOptions{ +) (bool, error) { + se, err := kv.NewSession(&encode.SessionOptions{ SQLMode: mysql.ModeStrictTransTables, }, logger) + if err != nil { + return false, errors.Trace(err) + } + for i, col := range tbl.Columns { // do not check ignored columns if _, ok := ignoreCols[col.Name.L]; ok { @@ -1311,11 +1322,10 @@ func checkFieldCompatibility( if err != nil { logger.Error("field value is not consistent with column type", zap.String("value", values[i].GetString()), zap.Any("column_info", col), zap.Error(err)) - return false + return false, nil } } - - return true + return true, nil } type tableEmptyCheckItem struct { diff --git a/pkg/executor/import_into.go b/pkg/executor/import_into.go index e1b8f592cd27c..b82b15585ab4f 100644 --- a/pkg/executor/import_into.go +++ b/pkg/executor/import_into.go @@ -146,10 +146,13 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) // For example, the function `tidb_is_ddl_owner()` requires the optional eval properties which are not // provided by the encoding context, so we should avoid using it in the column assignment expressions. func ValidateImportIntoColAssignmentsWithEncodeCtx(plan *importer.Plan, assigns []*ast.Assignment) error { - encodeCtx := litkv.NewSession(&encode.SessionOptions{ + encodeCtx, err := litkv.NewSession(&encode.SessionOptions{ SQLMode: plan.SQLMode, SysVars: plan.ImportantSysVars, }, log.L()) + if err != nil { + return err + } providedProps := encodeCtx.GetExprCtx().GetEvalCtx().GetOptionalPropSet() for i, assign := range assigns { diff --git a/pkg/executor/importer/kv_encode.go b/pkg/executor/importer/kv_encode.go index 49d57c1bfae9a..88d43c33409f8 100644 --- a/pkg/executor/importer/kv_encode.go +++ b/pkg/executor/importer/kv_encode.go @@ -62,8 +62,6 @@ func NewTableKVEncoder( if err != nil { return nil, err } - // we need a non-nil TxnCtx to avoid panic when evaluating set clause - baseKVEncoder.SessionCtx.SetTxnCtxNotNil() colAssignExprs, _, err := ti.CreateColAssignSimpleExprs(baseKVEncoder.SessionCtx.GetExprCtx()) if err != nil { return nil, err diff --git a/pkg/lightning/backend/kv/BUILD.bazel b/pkg/lightning/backend/kv/BUILD.bazel index b407e962252c9..7adcaae908e76 100644 --- a/pkg/lightning/backend/kv/BUILD.bazel +++ b/pkg/lightning/backend/kv/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "allocator.go", "base.go", + "context.go", "kv2sql.go", "session.go", "sql2kv.go", @@ -17,6 +18,7 @@ go_library( "//pkg/expression", "//pkg/expression/context", "//pkg/expression/contextsession", + "//pkg/expression/contextstatic", "//pkg/infoschema/context", "//pkg/kv", "//pkg/lightning/backend/encode", @@ -30,6 +32,7 @@ go_library( "//pkg/parser/mysql", "//pkg/planner/context", "//pkg/sessionctx", + "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/table", "//pkg/table/context", @@ -39,8 +42,11 @@ go_library( "//pkg/types", "//pkg/util/chunk", "//pkg/util/codec", + "//pkg/util/context", + "//pkg/util/intest", "//pkg/util/mathutil", "//pkg/util/redact", + "//pkg/util/timeutil", "//pkg/util/topsql/stmtstats", "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", @@ -55,6 +61,7 @@ go_test( srcs = [ "allocator_test.go", "base_test.go", + "context_test.go", "kv2sql_test.go", "session_internal_test.go", "sql2kv_test.go", @@ -62,9 +69,12 @@ go_test( embed = [":kv"], flaky = True, race = "on", - shard_count = 18, + shard_count = 21, deps = [ "//pkg/ddl", + "//pkg/errctx", + "//pkg/expression/context", + "//pkg/expression/contextstatic", "//pkg/kv", "//pkg/lightning/backend/encode", "//pkg/lightning/common", @@ -77,13 +87,20 @@ go_test( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/planner/core", + "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/table", + "//pkg/table/context", "//pkg/table/tables", "//pkg/tablecodec", "//pkg/types", + "//pkg/util/context", + "//pkg/util/deeptest", "//pkg/util/mock", + "//pkg/util/rowcodec", + "//pkg/util/timeutil", "@com_github_docker_go_units//:go-units", + "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", diff --git a/pkg/lightning/backend/kv/base.go b/pkg/lightning/backend/kv/base.go index b2f6148066acc..743e821371e21 100644 --- a/pkg/lightning/backend/kv/base.go +++ b/pkg/lightning/backend/kv/base.go @@ -136,8 +136,10 @@ type BaseKVEncoder struct { func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) { meta := config.Table.Meta() cols := config.Table.Cols() - se := NewSession(&config.SessionOptions, config.Logger) - // Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord + se, err := NewSession(&config.SessionOptions, config.Logger) + if err != nil { + return nil, err + } var autoRandomColID int64 autoIDFn := func(id int64) int64 { return id } @@ -287,12 +289,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu case isBadNullValue: err = col.HandleBadNull(errCtx, &value, 0) default: - // copy from the following GetColDefaultValue function, when this is true it will use getColDefaultExprValue - if col.DefaultIsExpr { - // the expression rewriter requires a non-nil TxnCtx. - deferFn := e.SessionCtx.SetTxnCtxNotNil() - defer deferFn() - } value, err = table.GetColDefaultValue(e.SessionCtx.GetExprCtx(), col.ToInfo()) } return value, err diff --git a/pkg/lightning/backend/kv/context.go b/pkg/lightning/backend/kv/context.go new file mode 100644 index 0000000000000..42f82a8f27157 --- /dev/null +++ b/pkg/lightning/backend/kv/context.go @@ -0,0 +1,260 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "maps" + "math/rand" + "sync" + "time" + + "github.com/pingcap/tidb/pkg/errctx" + exprctx "github.com/pingcap/tidb/pkg/expression/context" + "github.com/pingcap/tidb/pkg/expression/contextstatic" + "github.com/pingcap/tidb/pkg/meta/autoid" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/table" + tbctx "github.com/pingcap/tidb/pkg/table/context" + "github.com/pingcap/tidb/pkg/types" + contextutil "github.com/pingcap/tidb/pkg/util/context" + "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/timeutil" +) + +var _ exprctx.ExprContext = &litExprContext{} + +// litExprContext implements the `exprctx.ExprContext` interface for lightning import. +// It provides the context to build and evaluate expressions, furthermore, it allows to set user variables +// for `IMPORT INTO ...` statements. +type litExprContext struct { + *contextstatic.StaticExprContext + userVars *variable.UserVars +} + +// NewExpressionContext creates a new `*StaticExprContext` for lightning import. +func newLitExprContext(sqlMode mysql.SQLMode, sysVars map[string]string, timestamp int64) (*litExprContext, error) { + flags := types.DefaultStmtFlags. + WithTruncateAsWarning(!sqlMode.HasStrictMode()). + WithIgnoreInvalidDateErr(sqlMode.HasAllowInvalidDatesMode()). + WithIgnoreZeroInDate(!sqlMode.HasStrictMode() || sqlMode.HasAllowInvalidDatesMode() || + !sqlMode.HasNoZeroInDateMode() || !sqlMode.HasNoZeroDateMode()) + + errLevels := stmtctx.DefaultStmtErrLevels + errLevels[errctx.ErrGroupTruncate] = errctx.ResolveErrLevel(flags.IgnoreTruncateErr(), flags.TruncateAsWarning()) + errLevels[errctx.ErrGroupBadNull] = errctx.ResolveErrLevel(false, !sqlMode.HasStrictMode()) + errLevels[errctx.ErrGroupDividedByZero] = + errctx.ResolveErrLevel(!sqlMode.HasErrorForDivisionByZeroMode(), !sqlMode.HasStrictMode()) + + userVars := variable.NewUserVars() + evalCtx := contextstatic.NewStaticEvalContext( + contextstatic.WithSQLMode(sqlMode), + contextstatic.WithTypeFlags(flags), + contextstatic.WithLocation(timeutil.SystemLocation()), + contextstatic.WithErrLevelMap(errLevels), + contextstatic.WithUserVarsReader(userVars), + ) + + // no need to build as plan cache. + planCacheTracker := contextutil.NewPlanCacheTracker(contextutil.IgnoreWarn) + intest.Assert(!planCacheTracker.UseCache()) + ctx := contextstatic.NewStaticExprContext( + contextstatic.WithEvalCtx(evalCtx), + contextstatic.WithPlanCacheTracker(&planCacheTracker), + ) + + if len(sysVars) > 0 { + var err error + ctx, err = ctx.LoadSystemVars(sysVars) + if err != nil { + return nil, err + } + evalCtx = ctx.GetStaticEvalCtx() + } + + currentTime := func() (time.Time, error) { return time.Now(), nil } + if timestamp > 0 { + currentTime = func() (time.Time, error) { return time.Unix(timestamp, 0), nil } + } + + evalCtx = evalCtx.Apply(contextstatic.WithCurrentTime(currentTime)) + ctx = ctx.Apply(contextstatic.WithEvalCtx(evalCtx)) + + return &litExprContext{ + StaticExprContext: ctx, + userVars: userVars, + }, nil +} + +// setUserVarVal sets the value of a user variable. +func (ctx *litExprContext) setUserVarVal(name string, dt types.Datum) { + ctx.userVars.SetUserVarVal(name, dt) +} + +// UnsetUserVar unsets a user variable. +func (ctx *litExprContext) unsetUserVar(varName string) { + ctx.userVars.UnsetUserVar(varName) +} + +var _ table.MutateContext = &litTableMutateContext{} + +// litTableMutateContext implements the `table.MutateContext` interface for lightning import. +type litTableMutateContext struct { + exprCtx *litExprContext + encodingConfig tbctx.RowEncodingConfig + mutateBuffers *tbctx.MutateBuffers + shardID *variable.RowIDShardGenerator + reservedRowIDAlloc stmtctx.ReservedRowIDAlloc + enableMutationChecker bool + assertionLevel variable.AssertionLevel + tableDelta struct { + sync.Mutex + // tblID -> (colID -> deltaSize) + m map[int64]map[int64]int64 + } +} + +// AlternativeAllocators implements the `table.MutateContext` interface. +func (*litTableMutateContext) AlternativeAllocators(*model.TableInfo) (autoid.Allocators, bool) { + // lightning does not support temporary tables, so we don't need to provide alternative allocators. + return autoid.Allocators{}, false +} + +// GetExprCtx implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetExprCtx() exprctx.ExprContext { + return ctx.exprCtx +} + +// ConnectionID implements the `table.MutateContext` interface. +func (*litTableMutateContext) ConnectionID() uint64 { + // Just return 0 because lightning import does not in any connection. + return 0 +} + +// InRestrictedSQL implements the `table.MutateContext` interface. +func (*litTableMutateContext) InRestrictedSQL() bool { + // Just return false because lightning import does not in any SQL. + return false +} + +// TxnAssertionLevel implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) TxnAssertionLevel() variable.AssertionLevel { + return ctx.assertionLevel +} + +// EnableMutationChecker implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) EnableMutationChecker() bool { + return ctx.enableMutationChecker +} + +// GetRowEncodingConfig implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetRowEncodingConfig() tbctx.RowEncodingConfig { + return ctx.encodingConfig +} + +// GetMutateBuffers implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetMutateBuffers() *tbctx.MutateBuffers { + return ctx.mutateBuffers +} + +// GetRowIDShardGenerator implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetRowIDShardGenerator() *variable.RowIDShardGenerator { + return ctx.shardID +} + +// GetReservedRowIDAlloc implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetReservedRowIDAlloc() (*stmtctx.ReservedRowIDAlloc, bool) { + return &ctx.reservedRowIDAlloc, true +} + +// GetBinlogSupport implements the `table.MutateContext` interface. +func (*litTableMutateContext) GetBinlogSupport() (tbctx.BinlogSupport, bool) { + // lightning import does not support binlog. + return nil, false +} + +// GetStatisticsSupport implements the `table.MutateContext` interface. +func (ctx *litTableMutateContext) GetStatisticsSupport() (tbctx.StatisticsSupport, bool) { + return ctx, true +} + +// UpdatePhysicalTableDelta implements the `table.StatisticsSupport` interface. +func (ctx *litTableMutateContext) UpdatePhysicalTableDelta( + physicalTableID int64, _ int64, + _ int64, cols variable.DeltaCols, +) { + ctx.tableDelta.Lock() + defer ctx.tableDelta.Unlock() + if ctx.tableDelta.m == nil { + ctx.tableDelta.m = make(map[int64]map[int64]int64) + } + tableMap := ctx.tableDelta.m + colSize := tableMap[physicalTableID] + tableMap[physicalTableID] = cols.UpdateColSizeMap(colSize) +} + +// GetColumnSize returns the colum size map (colID -> deltaSize) for the given table ID. +func (ctx *litTableMutateContext) GetColumnSize(tblID int64) (ret map[int64]int64) { + ctx.tableDelta.Lock() + defer ctx.tableDelta.Unlock() + return maps.Clone(ctx.tableDelta.m[tblID]) +} + +// GetCachedTableSupport implements the `table.MutateContext` interface. +func (*litTableMutateContext) GetCachedTableSupport() (tbctx.CachedTableSupport, bool) { + // lightning import does not support cached table. + return nil, false +} + +func (*litTableMutateContext) GetTemporaryTableSupport() (tbctx.TemporaryTableSupport, bool) { + // lightning import does not support temporary table. + return nil, false +} + +func (*litTableMutateContext) GetExchangePartitionDMLSupport() (tbctx.ExchangePartitionDMLSupport, bool) { + // lightning import is not in a DML query, we do not need to support it. + return nil, false +} + +// newLitTableMutateContext creates a new `*litTableMutateContext` for lightning import. +func newLitTableMutateContext(exprCtx *litExprContext, sysVars map[string]string) (*litTableMutateContext, error) { + intest.AssertNotNil(exprCtx) + sessVars := variable.NewSessionVars(nil) + for k, v := range sysVars { + if err := sessVars.SetSystemVar(k, v); err != nil { + return nil, err + } + } + + return &litTableMutateContext{ + exprCtx: exprCtx, + encodingConfig: tbctx.RowEncodingConfig{ + IsRowLevelChecksumEnabled: sessVars.IsRowLevelChecksumEnabled(), + RowEncoder: &sessVars.RowEncoder, + }, + mutateBuffers: tbctx.NewMutateBuffers(sessVars.GetWriteStmtBufs()), + // Though the row ID is generated by lightning itself, and `GetRowIDShardGenerator` is useless, + // still return a valid object to make the context complete and avoid some potential panic + // if there are some changes in the future. + shardID: variable.NewRowIDShardGenerator( + rand.New(rand.NewSource(time.Now().UnixNano())), // #nosec G404 + int(sessVars.ShardAllocateStep), + ), + enableMutationChecker: sessVars.EnableMutationChecker, + assertionLevel: sessVars.AssertionLevel, + }, nil +} diff --git a/pkg/lightning/backend/kv/context_test.go b/pkg/lightning/backend/kv/context_test.go new file mode 100644 index 0000000000000..4f26782dc97db --- /dev/null +++ b/pkg/lightning/backend/kv/context_test.go @@ -0,0 +1,396 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "strconv" + "strings" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/errctx" + exprctx "github.com/pingcap/tidb/pkg/expression/context" + "github.com/pingcap/tidb/pkg/expression/contextstatic" + "github.com/pingcap/tidb/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + tbctx "github.com/pingcap/tidb/pkg/table/context" + "github.com/pingcap/tidb/pkg/types" + contextutil "github.com/pingcap/tidb/pkg/util/context" + "github.com/pingcap/tidb/pkg/util/deeptest" + "github.com/pingcap/tidb/pkg/util/rowcodec" + "github.com/pingcap/tidb/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestLitExprContext(t *testing.T) { + cases := []struct { + sqlMode mysql.SQLMode + sysVars map[string]string + timestamp int64 + checkFlags types.Flags + checkErrLevel errctx.LevelMap + check func(types.Flags, errctx.LevelMap) + }{ + { + sqlMode: mysql.ModeNone, + timestamp: 1234567, + checkFlags: types.DefaultStmtFlags | types.FlagTruncateAsWarning | types.FlagIgnoreZeroInDateErr, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelWarn + m[errctx.ErrGroupBadNull] = errctx.LevelWarn + m[errctx.ErrGroupDividedByZero] = errctx.LevelIgnore + return m + }(), + sysVars: map[string]string{ + "max_allowed_packet": "10240", + "div_precision_increment": "5", + "time_zone": "Europe/Berlin", + "default_week_format": "2", + "block_encryption_mode": "aes-128-ofb", + "group_concat_max_len": "2048", + }, + }, + { + sqlMode: mysql.ModeStrictTransTables | mysql.ModeNoZeroDate | mysql.ModeNoZeroInDate | + mysql.ModeErrorForDivisionByZero, + checkFlags: types.DefaultStmtFlags, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelError + m[errctx.ErrGroupBadNull] = errctx.LevelError + m[errctx.ErrGroupDividedByZero] = errctx.LevelError + return m + }(), + }, + { + sqlMode: mysql.ModeNoZeroDate | mysql.ModeNoZeroInDate | mysql.ModeErrorForDivisionByZero, + checkFlags: types.DefaultStmtFlags | types.FlagTruncateAsWarning | types.FlagIgnoreZeroInDateErr, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelWarn + m[errctx.ErrGroupBadNull] = errctx.LevelWarn + m[errctx.ErrGroupDividedByZero] = errctx.LevelWarn + return m + }(), + }, + { + sqlMode: mysql.ModeStrictTransTables | mysql.ModeNoZeroInDate, + checkFlags: types.DefaultStmtFlags | types.FlagIgnoreZeroInDateErr, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelError + m[errctx.ErrGroupBadNull] = errctx.LevelError + m[errctx.ErrGroupDividedByZero] = errctx.LevelIgnore + return m + }(), + }, + { + sqlMode: mysql.ModeStrictTransTables | mysql.ModeNoZeroDate, + checkFlags: types.DefaultStmtFlags | types.FlagIgnoreZeroInDateErr, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelError + m[errctx.ErrGroupBadNull] = errctx.LevelError + m[errctx.ErrGroupDividedByZero] = errctx.LevelIgnore + return m + }(), + }, + { + sqlMode: mysql.ModeStrictTransTables | mysql.ModeAllowInvalidDates, + checkFlags: types.DefaultStmtFlags | types.FlagIgnoreZeroInDateErr | types.FlagIgnoreInvalidDateErr, + checkErrLevel: func() errctx.LevelMap { + m := stmtctx.DefaultStmtErrLevels + m[errctx.ErrGroupTruncate] = errctx.LevelError + m[errctx.ErrGroupBadNull] = errctx.LevelError + m[errctx.ErrGroupDividedByZero] = errctx.LevelIgnore + return m + }(), + }, + } + + // We need to compare the new introduced `*litExprContext` the same behavior with the old `session`. + // After refactoring finished, we can remove the old session and this test. + compareWithLegacySession := func(t *testing.T, ctx *litExprContext, opts *encode.SessionOptions) { + if opts.SysVars == nil { + opts.SysVars = make(map[string]string) + } + if _, ok := opts.SysVars["div_precision_increment"]; !ok { + // It seems that `DefDivPrecisionIncrement` is not set as a default value in `newSession` and its + // default value is 0. + // We should set it manually to make test pass. + // The legacy code has no bug for this default value because the `DefaultImportantVariables` + // will be loaded every time to override this variable: + // https://github.com/pingcap/tidb/blob/2e457b394f09165e23fa5121fcfd89c6e8a6e835/pkg/lightning/common/common.go#L33-L42 + opts.SysVars["div_precision_increment"] = strconv.Itoa(variable.DefDivPrecisionIncrement) + } + if _, ok := opts.SysVars["block_encryption_mode"]; !ok { + // same reason with `DivPrecisionIncrement`, we need to set `block_encryption_mode` manually to pass test. + opts.SysVars["block_encryption_mode"] = variable.DefBlockEncryptionMode + } + se := newSession(opts, log.L()) + seCtx := contextstatic.MakeExprContextStatic(se.exprCtx.SessionExprContext) + deeptest.AssertDeepClonedEqual(t, seCtx, ctx.StaticExprContext, deeptest.WithIgnorePath([]string{ + "$.staticExprCtxState.evalCtx.id", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.typeCtx.loc", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.warnHandler", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.typeCtx.warnHandler", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.errCtx.warnHandler", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.currentTime", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.requestVerificationFn", + "$.staticExprCtxState.evalCtx.staticEvalCtxState.requestDynamicVerificationFn", + "$.staticExprCtxState.rng", + "$.staticExprCtxState.planCacheTracker", + })) + currentTime, err := seCtx.GetEvalCtx().CurrentTime() + require.NoError(t, err) + seTime, err := seCtx.GetEvalCtx().CurrentTime() + require.NoError(t, err) + if opts.Timestamp == 0 { + require.InDelta(t, seTime.Unix(), currentTime.Unix(), 2) + } else { + require.Equal(t, opts.Timestamp*1000000000, currentTime.UnixNano()) + require.Equal(t, seTime.UnixNano(), currentTime.UnixNano()) + } + require.Equal(t, seCtx.GetEvalCtx().Location().String(), ctx.GetEvalCtx().Location().String()) + } + + for i, c := range cases { + t.Run("case-"+strconv.Itoa(i), func(t *testing.T) { + ctx, err := newLitExprContext(c.sqlMode, c.sysVars, c.timestamp) + require.NoError(t, err) + + compareWithLegacySession(t, ctx, &encode.SessionOptions{ + SQLMode: c.sqlMode, + SysVars: c.sysVars, + Timestamp: c.timestamp, + }) + + evalCtx := ctx.GetEvalCtx() + require.Equal(t, c.sqlMode, evalCtx.SQLMode()) + tc, ec := evalCtx.TypeCtx(), evalCtx.ErrCtx() + require.Same(t, evalCtx.Location(), tc.Location()) + require.Equal(t, c.checkFlags, tc.Flags()) + require.Equal(t, c.checkErrLevel, ec.LevelMap()) + + // shares the same warning handler + warns := []contextutil.SQLWarn{ + {Level: contextutil.WarnLevelWarning, Err: errors.New("mockErr1")}, + {Level: contextutil.WarnLevelWarning, Err: errors.New("mockErr2")}, + {Level: contextutil.WarnLevelWarning, Err: errors.New("mockErr3")}, + } + require.Equal(t, 0, evalCtx.WarningCount()) + evalCtx.AppendWarning(warns[0].Err) + tc.AppendWarning(warns[1].Err) + ec.AppendWarning(warns[2].Err) + require.Equal(t, warns, evalCtx.CopyWarnings(nil)) + + // system vars + timeZone := "SYSTEM" + expectedMaxAllowedPacket := variable.DefMaxAllowedPacket + expectedDivPrecisionInc := variable.DefDivPrecisionIncrement + expectedDefaultWeekFormat := variable.DefDefaultWeekFormat + expectedBlockEncryptionMode := variable.DefBlockEncryptionMode + expectedGroupConcatMaxLen := variable.DefGroupConcatMaxLen + for k, v := range c.sysVars { + switch strings.ToLower(k) { + case "time_zone": + timeZone = v + case "max_allowed_packet": + expectedMaxAllowedPacket, err = strconv.ParseUint(v, 10, 64) + case "div_precision_increment": + expectedDivPrecisionInc, err = strconv.Atoi(v) + case "default_week_format": + expectedDefaultWeekFormat = v + case "block_encryption_mode": + expectedBlockEncryptionMode = v + case "group_concat_max_len": + expectedGroupConcatMaxLen, err = strconv.ParseUint(v, 10, 64) + } + require.NoError(t, err) + } + if strings.ToLower(timeZone) == "system" { + require.Same(t, timeutil.SystemLocation(), evalCtx.Location()) + } else { + require.Equal(t, timeZone, evalCtx.Location().String()) + } + require.Equal(t, expectedMaxAllowedPacket, evalCtx.GetMaxAllowedPacket()) + require.Equal(t, expectedDivPrecisionInc, evalCtx.GetDivPrecisionIncrement()) + require.Equal(t, expectedDefaultWeekFormat, evalCtx.GetDefaultWeekFormatMode()) + require.Equal(t, expectedBlockEncryptionMode, ctx.GetBlockEncryptionMode()) + require.Equal(t, expectedGroupConcatMaxLen, ctx.GetGroupConcatMaxLen()) + + now := time.Now() + tm, err := evalCtx.CurrentTime() + require.NoError(t, err) + require.Same(t, evalCtx.Location(), tm.Location()) + if c.timestamp == 0 { + // timestamp == 0 means use the current time. + require.InDelta(t, now.Unix(), tm.Unix(), 2) + } else { + require.Equal(t, c.timestamp*1000000000, tm.UnixNano()) + } + // CurrentTime returns the same value + tm2, err := evalCtx.CurrentTime() + require.NoError(t, err) + require.Equal(t, tm.Nanosecond(), tm2.Nanosecond()) + require.Same(t, tm.Location(), tm2.Location()) + + // currently we don't support optional properties + require.Equal(t, exprctx.OptionalEvalPropKeySet(0), evalCtx.GetOptionalPropSet()) + // not build for plan cache + require.False(t, ctx.IsUseCache()) + // rng not nil + require.NotNil(t, ctx.Rng()) + // ConnectionID + require.Equal(t, uint64(0), ctx.ConnectionID()) + // user vars + userVars := evalCtx.GetUserVarsReader() + _, ok := userVars.GetUserVarVal("a") + require.False(t, ok) + ctx.setUserVarVal("a", types.NewIntDatum(123)) + d, ok := userVars.GetUserVarVal("a") + require.True(t, ok) + require.Equal(t, types.NewIntDatum(123), d) + ctx.unsetUserVar("a") + _, ok = userVars.GetUserVarVal("a") + require.False(t, ok) + }) + } +} + +func TestLitTableMutateContext(t *testing.T) { + exprCtx, err := newLitExprContext(mysql.ModeNone, nil, 0) + require.NoError(t, err) + + checkCommon := func(t *testing.T, tblCtx *litTableMutateContext) { + require.Same(t, exprCtx, tblCtx.GetExprCtx()) + _, ok := tblCtx.AlternativeAllocators(&model.TableInfo{ID: 1}) + require.False(t, ok) + require.Equal(t, uint64(0), tblCtx.ConnectionID()) + require.Equal(t, tblCtx.GetExprCtx().ConnectionID(), tblCtx.ConnectionID()) + require.False(t, tblCtx.InRestrictedSQL()) + require.NotNil(t, tblCtx.GetMutateBuffers()) + require.NotNil(t, tblCtx.GetMutateBuffers().GetWriteStmtBufs()) + alloc, ok := tblCtx.GetReservedRowIDAlloc() + require.True(t, ok) + require.NotNil(t, alloc) + require.Equal(t, &stmtctx.ReservedRowIDAlloc{}, alloc) + require.True(t, alloc.Exhausted()) + _, ok = tblCtx.GetBinlogSupport() + require.False(t, ok) + _, ok = tblCtx.GetCachedTableSupport() + require.False(t, ok) + _, ok = tblCtx.GetTemporaryTableSupport() + require.False(t, ok) + stats, ok := tblCtx.GetStatisticsSupport() + require.True(t, ok) + // test for `UpdatePhysicalTableDelta` and `GetColumnSize` + stats.UpdatePhysicalTableDelta(123, 5, 2, variable.DeltaColsMap{1: 2, 3: 4}) + r := tblCtx.GetColumnSize(123) + require.Equal(t, map[int64]int64{1: 2, 3: 4}, r) + stats.UpdatePhysicalTableDelta(123, 8, 2, variable.DeltaColsMap{3: 5, 4: 3}) + r = tblCtx.GetColumnSize(123) + require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, r) + // the result should be a cloned value + r[1] = 100 + require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, tblCtx.GetColumnSize(123)) + // test gets a non-existed table + require.Empty(t, tblCtx.GetColumnSize(456)) + } + + // We need to compare the new introduced `*litTableMutateContext` the same behavior with the old `session`. + // After refactoring finished, we can remove the old session and this test. + compareWithLegacySession := func(ctx *litTableMutateContext, vars map[string]string) { + se := newSession(&encode.SessionOptions{ + SQLMode: mysql.ModeNone, + SysVars: vars, + }, log.L()) + // make sure GetRowIDShardGenerator() internal assertion pass + se.GetSessionVars().TxnCtx = &variable.TransactionContext{} + se.GetSessionVars().TxnCtx.StartTS = 123 + seCtx := se.GetTableCtx() + require.Equal(t, seCtx.ConnectionID(), ctx.ConnectionID()) + require.Equal(t, seCtx.InRestrictedSQL(), ctx.InRestrictedSQL()) + require.Equal(t, seCtx.TxnAssertionLevel(), ctx.TxnAssertionLevel()) + require.Equal(t, seCtx.GetMutateBuffers(), ctx.GetMutateBuffers()) + require.Equal(t, seCtx.EnableMutationChecker(), ctx.EnableMutationChecker()) + require.Equal(t, seCtx.GetRowEncodingConfig(), ctx.GetRowEncodingConfig()) + require.Equal(t, seCtx.GetRowIDShardGenerator().GetShardStep(), ctx.GetRowIDShardGenerator().GetShardStep()) + seAlloc, ok := seCtx.GetReservedRowIDAlloc() + require.True(t, ok) + alloc, ok := ctx.GetReservedRowIDAlloc() + require.True(t, ok) + require.Equal(t, seAlloc, alloc) + } + + // test for default + tblCtx, err := newLitTableMutateContext(exprCtx, nil) + require.NoError(t, err) + checkCommon(t, tblCtx) + require.Equal(t, variable.AssertionLevelOff, tblCtx.TxnAssertionLevel()) + require.Equal(t, variable.DefTiDBEnableMutationChecker, tblCtx.EnableMutationChecker()) + require.False(t, tblCtx.EnableMutationChecker()) + require.Equal(t, tbctx.RowEncodingConfig{ + IsRowLevelChecksumEnabled: false, + RowEncoder: &rowcodec.Encoder{Enable: false}, + }, tblCtx.GetRowEncodingConfig()) + g := tblCtx.GetRowIDShardGenerator() + require.NotNil(t, g) + require.Equal(t, variable.DefTiDBShardAllocateStep, g.GetShardStep()) + compareWithLegacySession(tblCtx, nil) + + // test for load vars + sysVars := map[string]string{ + "tidb_txn_assertion_level": "STRICT", + "tidb_enable_mutation_checker": "ON", + "tidb_row_format_version": "2", + "tidb_shard_allocate_step": "1234567", + } + tblCtx, err = newLitTableMutateContext(exprCtx, sysVars) + require.NoError(t, err) + checkCommon(t, tblCtx) + require.Equal(t, variable.AssertionLevelStrict, tblCtx.TxnAssertionLevel()) + require.True(t, tblCtx.EnableMutationChecker()) + require.Equal(t, tbctx.RowEncodingConfig{ + IsRowLevelChecksumEnabled: false, + RowEncoder: &rowcodec.Encoder{Enable: true}, + }, tblCtx.GetRowEncodingConfig()) + g = tblCtx.GetRowIDShardGenerator() + require.NotNil(t, g) + require.NotEqual(t, variable.DefTiDBShardAllocateStep, g.GetShardStep()) + require.Equal(t, 1234567, g.GetShardStep()) + compareWithLegacySession(tblCtx, sysVars) + + // test for `RowEncodingConfig.IsRowLevelChecksumEnabled` which should be loaded from global variable. + require.False(t, variable.EnableRowLevelChecksum.Load()) + defer variable.EnableRowLevelChecksum.Store(false) + variable.EnableRowLevelChecksum.Store(true) + sysVars = map[string]string{ + "tidb_row_format_version": "2", + } + tblCtx, err = newLitTableMutateContext(exprCtx, sysVars) + require.NoError(t, err) + require.Equal(t, tbctx.RowEncodingConfig{ + IsRowLevelChecksumEnabled: true, + RowEncoder: &rowcodec.Encoder{Enable: true}, + }, tblCtx.GetRowEncodingConfig()) + compareWithLegacySession(tblCtx, sysVars) +} diff --git a/pkg/lightning/backend/kv/kv2sql.go b/pkg/lightning/backend/kv/kv2sql.go index 502eea5cc2748..0a535166b1902 100644 --- a/pkg/lightning/backend/kv/kv2sql.go +++ b/pkg/lightning/backend/kv/kv2sql.go @@ -129,10 +129,12 @@ func NewTableKVDecoder( options *encode.SessionOptions, logger log.Logger, ) (*TableKVDecoder, error) { - se := NewSession(options, logger) - cols := tbl.Cols() - // Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord + se, err := NewSession(options, logger) + if err != nil { + return nil, err + } + cols := tbl.Cols() genCols, err := CollectGeneratedColumns(se, tbl.Meta(), cols) if err != nil { return nil, err diff --git a/pkg/lightning/backend/kv/kv2sql_test.go b/pkg/lightning/backend/kv/kv2sql_test.go index 9c5ca613d4c15..cf6d40341eb6a 100644 --- a/pkg/lightning/backend/kv/kv2sql_test.go +++ b/pkg/lightning/backend/kv/kv2sql_test.go @@ -52,7 +52,8 @@ func TestIterRawIndexKeysClusteredPK(t *testing.T) { decoder, err := kv.NewTableKVDecoder(tbl, "`test`.`c1`", sessionOpts, log.L()) require.NoError(t, err) - sctx := kv.NewSession(sessionOpts, log.L()) + sctx, err := kv.NewSession(sessionOpts, log.L()) + require.NoError(t, err) txn := sctx.Txn() handle, err := tbl.AddRecord(sctx.GetTableCtx(), txn, []types.Datum{types.NewIntDatum(1), types.NewIntDatum(2)}) require.NoError(t, err) @@ -92,7 +93,8 @@ func TestIterRawIndexKeysIntPK(t *testing.T) { decoder, err := kv.NewTableKVDecoder(tbl, "`test`.`c1`", sessionOpts, log.L()) require.NoError(t, err) - sctx := kv.NewSession(sessionOpts, log.L()) + sctx, err := kv.NewSession(sessionOpts, log.L()) + require.NoError(t, err) txn := sctx.Txn() require.NoError(t, err) handle, err := tbl.AddRecord(sctx.GetTableCtx(), txn, []types.Datum{types.NewIntDatum(1), types.NewIntDatum(2)}) diff --git a/pkg/lightning/backend/kv/session.go b/pkg/lightning/backend/kv/session.go index 100bd67a2d339..6d263f5661bc0 100644 --- a/pkg/lightning/backend/kv/session.go +++ b/pkg/lightning/backend/kv/session.go @@ -20,13 +20,11 @@ import ( "context" "errors" "fmt" - "maps" "strconv" "sync" "github.com/docker/go-units" "github.com/pingcap/tidb/pkg/errctx" - "github.com/pingcap/tidb/pkg/expression" exprctx "github.com/pingcap/tidb/pkg/expression/context" exprctximpl "github.com/pingcap/tidb/pkg/expression/contextsession" infoschema "github.com/pingcap/tidb/pkg/infoschema/context" @@ -287,7 +285,7 @@ func (*transaction) MayFlush() error { // sessExprContext implements the ExprContext interface // It embedded an `ExprContext` and a `sessEvalContext` to provide no optional properties. type sessExprContext struct { - exprctx.ExprContext + *exprctximpl.SessionExprContext evalCtx *sessEvalContext } @@ -312,6 +310,7 @@ func (*sessEvalContext) GetOptionalPropProvider(exprctx.OptionalEvalPropKey) (ex return nil, false } +// Deprecated: `session` will be removed soon. // session is a trimmed down Session type which only wraps our own trimmed-down // transaction type and provides the session variables to the TiDB library // optimized for Lightning. @@ -328,6 +327,7 @@ type session struct { values map[fmt.Stringer]any } +// Deprecated: this function will be removed soon. // newSession creates a new trimmed down Session matching the options. func newSession(options *encode.SessionOptions, logger log.Logger) *session { s := &session{ @@ -382,7 +382,7 @@ func newSession(options *encode.SessionOptions, logger log.Logger) *session { // The exprCtx should be an expression context providing no optional properties in `EvalContext`. // That is to make sure it only allows expressions that require basic context. s.exprCtx = &sessExprContext{ - ExprContext: exprCtx, + SessionExprContext: exprCtx, evalCtx: &sessEvalContext{ EvalContext: exprCtx.GetEvalCtx(), }, @@ -438,34 +438,75 @@ func (*session) GetStmtStats() *stmtstats.StatementStats { // Session is used to provide context for lightning. type Session struct { - sctx *session + txn transaction + exprCtx *litExprContext + tblCtx *litTableMutateContext } // NewSession creates a new Session. -func NewSession(options *encode.SessionOptions, logger log.Logger) *Session { - return &Session{ - sctx: newSession(options, logger), +func NewSession(options *encode.SessionOptions, logger log.Logger) (*Session, error) { + sysVars := make(map[string]string, len(options.SysVars)) + if options.SysVars != nil { + // This sessVars is only used to do validations. + sessVars := variable.NewSessionVars(nil) + // To keep compatible with the old versions, we should to skip errors caused by illegal system variables. + for k, v := range options.SysVars { + // since 6.3(current master) tidb checks whether we can set a system variable + // lc_time_names is a read-only variable for now, but might be implemented later, + // so we not remove it from defaultImportantVariables and check it in below way. + if sv := variable.GetSysVar(k); sv == nil { + logger.DPanic("unknown system var", zap.String("key", k)) + continue + } else if sv.ReadOnly { + logger.Debug("skip read-only variable", zap.String("key", k)) + continue + } + if err := sessVars.SetSystemVar(k, v); err != nil { + logger.DPanic("new session: failed to set system var", + log.ShortError(err), + zap.String("key", k)) + continue + } + sysVars[k] = v + } + } + + exprCtx, err := newLitExprContext(options.SQLMode, sysVars, options.Timestamp) + if err != nil { + return nil, err + } + + tblCtx, err := newLitTableMutateContext(exprCtx, sysVars) + if err != nil { + return nil, err } + + s := &Session{ + exprCtx: exprCtx, + tblCtx: tblCtx, + } + s.txn.kvPairs = &Pairs{} + return s, nil } // GetExprCtx returns the expression context -func (s *Session) GetExprCtx() expression.BuildContext { - return s.sctx.GetExprCtx() +func (s *Session) GetExprCtx() exprctx.ExprContext { + return s.exprCtx } // Txn returns the internal txn. func (s *Session) Txn() kv.Transaction { - return &s.sctx.txn + return &s.txn } // GetTableCtx returns the table MutateContext. func (s *Session) GetTableCtx() tbctx.MutateContext { - return s.sctx.tblctx + return s.tblCtx } // TakeKvPairs returns the current Pairs and resets the buffer. func (s *Session) TakeKvPairs() *Pairs { - memBuf := &s.sctx.txn.MemBuf + memBuf := &s.txn.MemBuf pairs := memBuf.kvPairs if pairs.BytesBuf != nil { pairs.MemBuf = memBuf @@ -475,39 +516,24 @@ func (s *Session) TakeKvPairs() *Pairs { return pairs } -// SetTxnCtxNotNil sets the internal SessionVars.TxnCtx to a non-nil value to avoid some panics. -// TODO: remove it after code refactoring. -func (s *Session) SetTxnCtxNotNil() func() { - s.sctx.Vars.TxnCtx = new(variable.TransactionContext) - return func() { - s.sctx.Vars.TxnCtx = nil - } -} - // SetUserVarVal sets the value of a user variable. func (s *Session) SetUserVarVal(name string, dt types.Datum) { - s.sctx.Vars.SetUserVarVal(name, dt) + s.exprCtx.setUserVarVal(name, dt) } // UnsetUserVar unsets a user variable. func (s *Session) UnsetUserVar(varName string) { - s.sctx.Vars.UnsetUserVar(varName) + s.exprCtx.unsetUserVar(varName) } // GetColumnSize returns the size of each column. func (s *Session) GetColumnSize(tblID int64) (ret map[int64]int64) { - vars := s.sctx.Vars - vars.TxnCtxMu.Lock() - defer vars.TxnCtxMu.Unlock() - if txnCtx := s.sctx.Vars.TxnCtx; txnCtx != nil { - return maps.Clone(txnCtx.TableDeltaMap[tblID].ColSize) - } - return ret + return s.tblCtx.GetColumnSize(tblID) } -// Close implements the sessionctx.Context interface +// Close closes the session func (s *Session) Close() { - memBuf := &s.sctx.txn.MemBuf + memBuf := &s.txn.MemBuf if memBuf.buf != nil { memBuf.buf.destroy() memBuf.buf = nil diff --git a/pkg/lightning/backend/kv/session_internal_test.go b/pkg/lightning/backend/kv/session_internal_test.go index d70f2827a7916..e5cd32377249d 100644 --- a/pkg/lightning/backend/kv/session_internal_test.go +++ b/pkg/lightning/backend/kv/session_internal_test.go @@ -18,6 +18,12 @@ import ( "testing" "github.com/docker/go-units" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/lightning/backend/encode" + "github.com/pingcap/tidb/pkg/lightning/common" + "github.com/pingcap/tidb/pkg/lightning/log" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -124,3 +130,46 @@ func TestKVMemBufBatchAllocAndRecycle(t *testing.T) { } require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs)) } + +func TestSessionInternalState(t *testing.T) { + se, err := NewSession(&encode.SessionOptions{ + SQLMode: mysql.ModeStrictAllTables, + SysVars: map[string]string{ + "max_allowed_packet": "40960", + "div_precision_increment": "9", + "time_zone": "SYSTEM", + // readonly variables should be allowed for compatibility + "lc_time_names": "en_US", + "default_week_format": "1", + "block_encryption_mode": "aes-256-ecb", + "group_concat_max_len": "2048", + "tidb_backoff_weight": "6", + "tidb_row_format_version": "2", + }, + Timestamp: 123456, + }, log.L()) + require.NoError(t, err) + // some system vars should be loaded + require.Equal(t, uint64(40960), se.GetExprCtx().GetEvalCtx().GetMaxAllowedPacket()) + require.Equal(t, 9, se.GetExprCtx().GetEvalCtx().GetDivPrecisionIncrement()) + require.Same(t, timeutil.SystemLocation(), se.GetExprCtx().GetEvalCtx().Location()) + require.Equal(t, "1", se.GetExprCtx().GetEvalCtx().GetDefaultWeekFormatMode()) + require.Equal(t, "aes-256-ecb", se.GetExprCtx().GetBlockEncryptionMode()) + require.Equal(t, uint64(2048), se.GetExprCtx().GetGroupConcatMaxLen()) + require.True(t, se.GetTableCtx().GetRowEncodingConfig().RowEncoder.Enable) + tm, err := se.GetExprCtx().GetEvalCtx().CurrentTime() + require.NoError(t, err) + require.Equal(t, int64(123456), tm.Unix()) + + // kv pairs + require.NoError(t, se.Txn().Set(kv.Key("k1"), []byte("v1"))) + require.NoError(t, se.Txn().Set(kv.Key("k2"), []byte("v2"))) + pairs := se.TakeKvPairs() + require.Equal(t, []common.KvPair{ + {Key: kv.Key("k1"), Val: []byte("v1")}, + {Key: kv.Key("k2"), Val: []byte("v2")}, + }, pairs.Pairs) + // internal contexts + require.NotNil(t, se.GetExprCtx()) + require.NotNil(t, se.GetTableCtx()) +} diff --git a/pkg/lightning/backend/kv/sql2kv.go b/pkg/lightning/backend/kv/sql2kv.go index e303d682398b7..2457db832ad81 100644 --- a/pkg/lightning/backend/kv/sql2kv.go +++ b/pkg/lightning/backend/kv/sql2kv.go @@ -81,11 +81,6 @@ func CollectGeneratedColumns(se *Session, meta *model.TableInfo, cols []*table.C return nil, nil } - // the expression rewriter requires a non-nil TxnCtx. - // TODO: remove it after code refactoring. - deferFn := se.SetTxnCtxNotNil() - defer deferFn() - // not using TableInfo2SchemaAndNames to avoid parsing all virtual generated columns again. exprColumns := make([]*expression.Column, 0, len(cols)) names := make(types.NameSlice, 0, len(cols))