Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: set txn options in txn provider which avoid data race #52304

Merged
merged 11 commits into from
Apr 8, 2024
2 changes: 1 addition & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
if err != nil {
return nil, err
}
sessForJob.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (d *ddl) addBatchDDLJobs(tasks []*limitJobTask) error {
injectModifyJobArgFailPoint(job)
}

se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)

if tasks[0].job.LocalMode {
for _, task := range tasks {
Expand Down Expand Up @@ -609,7 +609,7 @@ func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client) {
sctx, _ := pool.Get()
defer pool.Put(sctx)
se := sess.NewSession(sctx)
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), sql, "delete-mdl-info")
if err != nil {
logutil.BgLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", jobID), zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,10 @@ func (sdr *sessionDelRangeExecWrapper) AppendParamsList(jobID, elemID int64, sta

func (sdr *sessionDelRangeExecWrapper) ConsumeDeleteRange(ctx context.Context, sql string) error {
// set session disk full opt
sdr.sctx.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
sdr.sctx.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := sdr.sctx.GetSQLExecutor().ExecuteInternal(ctx, sql, sdr.paramsList...)
// clear session disk full opt
sdr.sctx.ClearDiskFullOpt()
sdr.sctx.GetSessionVars().ClearDiskFullOpt()
sdr.paramsList = nil
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
}

func (*ddl) markJobProcessing(se *sess.Session, job *model.Job) error {
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), fmt.Sprintf(
"update mysql.tidb_ddl_job set processing = 1 where job_id = %d", job.ID),
"mark_job_processing")
Expand Down Expand Up @@ -552,7 +552,7 @@ func insertDDLJobs2Table(se *sess.Session, updateRawArgs bool, jobs ...*model.Jo
}
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", job.ID, job.MayNeedReorg(), strconv.Quote(job2SchemaIDs(job)), strconv.Quote(job2TableIDs(job)), util.WrapKey2String(b), job.Type, !job.NotStarted())
}
se.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
_, err := se.Execute(ctx, sql.String(), "insert_job")
logutil.BgLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("category", "ddl"), zap.String("sql", sql.String()))
Expand Down
3 changes: 0 additions & 3 deletions pkg/session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ go_library(
"//pkg/statistics/handle/usage",
"//pkg/statistics/handle/usage/indexusage",
"//pkg/store/driver/error",
"//pkg/store/driver/txn",
"//pkg/store/helper",
"//pkg/store/mockstore",
"//pkg/table",
Expand Down Expand Up @@ -110,7 +109,6 @@ go_library(
"//pkg/util/sqlescape",
"//pkg/util/sqlexec",
"//pkg/util/syncutil",
"//pkg/util/tableutil",
"//pkg/util/timeutil",
"//pkg/util/topsql",
"//pkg/util/topsql/state",
Expand All @@ -124,7 +122,6 @@ go_library(
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
Expand Down
130 changes: 10 additions & 120 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
storeerr "github.com/pingcap/tidb/pkg/store/driver/error"
"github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
tbctx "github.com/pingcap/tidb/pkg/table/context"
Expand All @@ -113,15 +112,12 @@ import (
"github.com/pingcap/tidb/pkg/util/sli"
"github.com/pingcap/tidb/pkg/util/sqlescape"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/tableutil"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/topsql/stmtstats"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/pingcap/tipb/go-binlog"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -202,9 +198,6 @@ type session struct {
// indexUsageCollector collects index usage information.
idxUsageCollector *indexusage.SessionIndexUsageCollector

// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
Expand Down Expand Up @@ -498,7 +491,7 @@ func (s *session) doCommit(ctx context.Context) error {
defer func() {
s.txn.changeToInvalid()
s.sessionVars.SetInTxn(false)
s.ClearDiskFullOpt()
s.sessionVars.ClearDiskFullOpt()
}()
// check if the transaction is read-only
if s.txn.IsReadOnly() {
Expand Down Expand Up @@ -529,93 +522,9 @@ func (s *session) doCommit(ctx context.Context) error {
}
})

if s.sessionVars.BinlogClient != nil {
prewriteValue := binloginfo.GetPrewriteValue(s, false)
if prewriteValue != nil {
prewriteData, err := prewriteValue.Marshal()
if err != nil {
return errors.Trace(err)
}
info := &binloginfo.BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient,
}
s.txn.SetOption(kv.BinlogInfo, info)
}
}

sessVars := s.GetSessionVars()
// Get the related table or partition IDs.
relatedPhysicalTables := sessVars.TxnCtx.TableDeltaMap
// Get accessed temporary tables in the transaction.
temporaryTables := sessVars.TxnCtx.TemporaryTables
physicalTableIDs := make([]int64, 0, len(relatedPhysicalTables))
for id := range relatedPhysicalTables {
// Schema change on global temporary tables doesn't affect transactions.
if _, ok := temporaryTables[id]; ok {
continue
}
physicalTableIDs = append(physicalTableIDs, id)
}
needCheckSchema := true
// Set this option for 2 phase commit to validate schema lease.
if s.GetSessionVars().TxnCtx != nil {
needCheckSchema = !s.GetSessionVars().TxnCtx.EnableMDL
}
if s.txn.IsPipelined() && !s.GetSessionVars().TxnCtx.EnableMDL {
return errors.New("cannot commit pipelined transaction without Metadata Lock: MDL is OFF")
}

s.txn.SetOption(kv.CommitHook, func(info string, _ error) { s.sessionVars.LastTxnInfo = info })
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
// TODO: refactor SetOption usage to avoid race risk, should detect it in test.
// The pipelined txn will may be flushed in background, not touch the options to avoid races.
if !s.txn.IsPipelined() {
// to avoid session set overlap the txn set.
if s.GetDiskFullOpt() != kvrpcpb.DiskFullOpt_NotAllowedOnFull {
s.txn.SetDiskFullOpt(s.GetDiskFullOpt())
}
s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.GetInfoSchema().SchemaMetaVersion(), physicalTableIDs, needCheckSchema))
s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
s.txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
}
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
// because the property is naturally holds:
// We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO.
// An auto-commit transaction fetches its startTS from the TSO so its commitTS > its startTS > the commitTS
// of any previously committed transactions.
s.txn.SetOption(kv.GuaranteeLinearizability,
sessVars.TxnCtx.IsExplicit && sessVars.GuaranteeLinearizability)
}
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
}
}

var txnSource uint64
if val := s.txn.GetOption(kv.TxnSource); val != nil {
txnSource, _ = val.(uint64)
}
// If the transaction is started by CDC, we need to set the CDCWriteSource option.
if sessVars.CDCWriteSource != 0 {
err := kv.SetCDCWriteSource(&txnSource, sessVars.CDCWriteSource)
if err != nil {
return errors.Trace(err)
}

s.txn.SetOption(kv.TxnSource, txnSource)
}

var commitTSChecker func(uint64) bool
if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
c := cachedTableRenewLease{tables: tables}
now := time.Now()
Expand All @@ -625,7 +534,10 @@ func (s *session) doCommit(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
s.txn.SetOption(kv.CommitTSUpperBoundCheck, c.commitTSCheck)
commitTSChecker = c.commitTSCheck
}
if err = sessiontxn.GetTxnManager(s).SetOptionsBeforeCommit(s.txn.Transaction, commitTSChecker); err != nil {
return err
}

err = s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn)
Expand Down Expand Up @@ -832,19 +744,6 @@ func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transac
return nil
}

type temporaryTableKVFilter map[int64]tableutil.TempTable

func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
tid := tablecodec.DecodeTableID(key)
if _, ok := m[tid]; ok {
return true, nil
}

// This is the default filter for all tables.
defaultFilter := txn.TiDBKVFilter{}
return defaultFilter.IsUnnecessaryKeyValue(key, value, flags)
}

// errIsNoisy is used to filter DUPLCATE KEY errors.
// These can observed by users in INFORMATION_SCHEMA.CLIENT_ERRORS_SUMMARY_GLOBAL instead.
//
Expand Down Expand Up @@ -1593,18 +1492,6 @@ func (s *session) getOomAlarmVariablesInfo() util.OOMAlarmVariablesInfo {
}
}

func (s *session) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {
s.diskFullOpt = level
}

func (s *session) GetDiskFullOpt() kvrpcpb.DiskFullOpt {
return s.diskFullOpt
}

func (s *session) ClearDiskFullOpt() {
s.diskFullOpt = kvrpcpb.DiskFullOpt_NotAllowedOnFull
}

func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) {
origin := s.sessionVars.InRestrictedSQL
s.sessionVars.InRestrictedSQL = true
Expand Down Expand Up @@ -2610,7 +2497,7 @@ func (s *session) Close() {
if s.stmtStats != nil {
s.stmtStats.SetFinished()
}
s.ClearDiskFullOpt()
s.sessionVars.ClearDiskFullOpt()
if s.sessionPlanCache != nil {
s.sessionPlanCache.Close()
}
Expand Down Expand Up @@ -4330,6 +4217,9 @@ func (s *session) DecodeSessionStates(ctx context.Context,
func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNode ast.StmtNode) {
if !s.isInternal() {
if txn, _ := s.Txn(false); txn != nil && txn.Valid() {
if txn.IsPipelined() {
stmtLabel = "pdml"
}
txn.SetOption(kv.RequestSourceType, stmtLabel)
}
s.sessionVars.RequestSourceType = stmtLabel
Expand Down
5 changes: 5 additions & 0 deletions pkg/session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,8 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) (s
return nil, errors.Errorf("Invalid txn mode '%s'", txnMode)
}
}

// SetOptionsBeforeCommit sets options before commit.
func (m *txnManager) SetOptionsBeforeCommit(txn kv.Transaction, commitTSChecker func(uint64) bool) error {
return m.ctxProvider.SetOptionsBeforeCommit(txn, commitTSChecker)
}
1 change: 0 additions & 1 deletion pkg/session/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ go_library(
"//pkg/sessionctx/sessionstates",
"//pkg/util",
"//pkg/util/sqlexec",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
],
)
6 changes: 0 additions & 6 deletions pkg/session/types/sesson_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/tls"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -81,11 +80,6 @@ type Session interface {
FieldList(tableName string) (fields []*ast.ResultField, err error)
SetPort(port string)

// set cur session operations allowed when tikv disk full happens.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
GetDiskFullOpt() kvrpcpb.DiskFullOpt
ClearDiskFullOpt()

// SetExtensions sets the `*extension.SessionExtensions` object
SetExtensions(extensions *extension.SessionExtensions)
}
1 change: 0 additions & 1 deletion pkg/sessionctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"//pkg/util/sqlexec",
"//pkg/util/topsql/stmtstats",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_tikv_client_go_v2//oracle",
],
Expand Down
5 changes: 0 additions & 5 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/extension"
Expand Down Expand Up @@ -69,10 +68,6 @@ type Context interface {
SessionStatesHandler
contextutil.ValueStoreContext
tablelock.TableLockContext
// SetDiskFullOpt set the disk full opt when tikv disk full happened.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// ClearDiskFullOpt clear the disk full opt.
ClearDiskFullOpt()
// RollbackTxn rolls back the current transaction.
RollbackTxn(ctx context.Context)
// CommitTxn commits the current transaction.
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ go_library(
"//pkg/util/topsql/state",
"//pkg/util/versioninfo",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
Loading