Skip to content

Commit

Permalink
lighting: replace lightning inner contexts with manged ones (#56038)
Browse files Browse the repository at this point in the history
ref #53388
  • Loading branch information
lcwangchao authored Sep 17, 2024
1 parent 26443da commit 12e5b31
Show file tree
Hide file tree
Showing 12 changed files with 817 additions and 63 deletions.
26 changes: 18 additions & 8 deletions lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,9 +1283,16 @@ outer:
theResult.Severity = precheck.Warn
if hasUniqueField && len(rows) > 1 {
theResult.Severity = precheck.Critical
} else if !checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0], log.FromContext(ctx)) {
// if there are only 1 csv file or there is not unique key, try to check if all columns are compatible with string value
theResult.Severity = precheck.Critical
} else {
ok, err := checkFieldCompatibility(tableInfo.Core, ignoreColsSet, rows[0], log.FromContext(ctx))
if err != nil {
return nil, err
}

if !ok {
// if there are only 1 csv file or there is not unique key, try to check if all columns are compatible with string value
theResult.Severity = precheck.Critical
}
}
return theResult, nil
}
Expand All @@ -1295,10 +1302,14 @@ func checkFieldCompatibility(
ignoreCols map[string]struct{},
values []types.Datum,
logger log.Logger,
) bool {
se := kv.NewSession(&encode.SessionOptions{
) (bool, error) {
se, err := kv.NewSession(&encode.SessionOptions{
SQLMode: mysql.ModeStrictTransTables,
}, logger)
if err != nil {
return false, errors.Trace(err)
}

for i, col := range tbl.Columns {
// do not check ignored columns
if _, ok := ignoreCols[col.Name.L]; ok {
Expand All @@ -1311,11 +1322,10 @@ func checkFieldCompatibility(
if err != nil {
logger.Error("field value is not consistent with column type", zap.String("value", values[i].GetString()),
zap.Any("column_info", col), zap.Error(err))
return false
return false, nil
}
}

return true
return true, nil
}

type tableEmptyCheckItem struct {
Expand Down
5 changes: 4 additions & 1 deletion pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error)
// For example, the function `tidb_is_ddl_owner()` requires the optional eval properties which are not
// provided by the encoding context, so we should avoid using it in the column assignment expressions.
func ValidateImportIntoColAssignmentsWithEncodeCtx(plan *importer.Plan, assigns []*ast.Assignment) error {
encodeCtx := litkv.NewSession(&encode.SessionOptions{
encodeCtx, err := litkv.NewSession(&encode.SessionOptions{
SQLMode: plan.SQLMode,
SysVars: plan.ImportantSysVars,
}, log.L())
if err != nil {
return err
}

providedProps := encodeCtx.GetExprCtx().GetEvalCtx().GetOptionalPropSet()
for i, assign := range assigns {
Expand Down
2 changes: 0 additions & 2 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func NewTableKVEncoder(
if err != nil {
return nil, err
}
// we need a non-nil TxnCtx to avoid panic when evaluating set clause
baseKVEncoder.SessionCtx.SetTxnCtxNotNil()
colAssignExprs, _, err := ti.CreateColAssignSimpleExprs(baseKVEncoder.SessionCtx.GetExprCtx())
if err != nil {
return nil, err
Expand Down
19 changes: 18 additions & 1 deletion pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"allocator.go",
"base.go",
"context.go",
"kv2sql.go",
"session.go",
"sql2kv.go",
Expand All @@ -17,6 +18,7 @@ go_library(
"//pkg/expression",
"//pkg/expression/context",
"//pkg/expression/contextsession",
"//pkg/expression/contextstatic",
"//pkg/infoschema/context",
"//pkg/kv",
"//pkg/lightning/backend/encode",
Expand All @@ -30,6 +32,7 @@ go_library(
"//pkg/parser/mysql",
"//pkg/planner/context",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/table/context",
Expand All @@ -39,8 +42,11 @@ go_library(
"//pkg/types",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/context",
"//pkg/util/intest",
"//pkg/util/mathutil",
"//pkg/util/redact",
"//pkg/util/timeutil",
"//pkg/util/topsql/stmtstats",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
Expand All @@ -55,16 +61,20 @@ go_test(
srcs = [
"allocator_test.go",
"base_test.go",
"context_test.go",
"kv2sql_test.go",
"session_internal_test.go",
"sql2kv_test.go",
],
embed = [":kv"],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 21,
deps = [
"//pkg/ddl",
"//pkg/errctx",
"//pkg/expression/context",
"//pkg/expression/contextstatic",
"//pkg/kv",
"//pkg/lightning/backend/encode",
"//pkg/lightning/common",
Expand All @@ -77,13 +87,20 @@ go_test(
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/table",
"//pkg/table/context",
"//pkg/table/tables",
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util/context",
"//pkg/util/deeptest",
"//pkg/util/mock",
"//pkg/util/rowcodec",
"//pkg/util/timeutil",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
12 changes: 4 additions & 8 deletions pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ type BaseKVEncoder struct {
func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) {
meta := config.Table.Meta()
cols := config.Table.Cols()
se := NewSession(&config.SessionOptions, config.Logger)
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
se, err := NewSession(&config.SessionOptions, config.Logger)
if err != nil {
return nil, err
}

var autoRandomColID int64
autoIDFn := func(id int64) int64 { return id }
Expand Down Expand Up @@ -287,12 +289,6 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu
case isBadNullValue:
err = col.HandleBadNull(errCtx, &value, 0)
default:
// copy from the following GetColDefaultValue function, when this is true it will use getColDefaultExprValue
if col.DefaultIsExpr {
// the expression rewriter requires a non-nil TxnCtx.
deferFn := e.SessionCtx.SetTxnCtxNotNil()
defer deferFn()
}
value, err = table.GetColDefaultValue(e.SessionCtx.GetExprCtx(), col.ToInfo())
}
return value, err
Expand Down
Loading

0 comments on commit 12e5b31

Please sign in to comment.