From 585f0c4590f8cd0552ec95865898c89aea675ecb Mon Sep 17 00:00:00 2001 From: XuPeng-SH Date: Thu, 5 Dec 2024 15:01:23 +0800 Subject: [PATCH] refactor for the future tn migration 2 (#20571) refactor for the future tn migration Approved by: @zhangxu19830126 --- pkg/common/moerr/error.go | 2 + pkg/common/moerr/error_no_ctx.go | 4 + pkg/vm/engine/tae/db/controller.go | 270 ++++++++++++++++++++++++ pkg/vm/engine/tae/db/db.go | 33 ++- pkg/vm/engine/tae/db/open.go | 14 +- pkg/vm/engine/tae/db/test/db_test.go | 18 ++ pkg/vm/engine/tae/txn/txnbase/txnmgr.go | 244 +++++++++++++++++++-- pkg/vm/engine/tae/txn/txnbase/types.go | 73 +++++++ 8 files changed, 633 insertions(+), 25 deletions(-) create mode 100644 pkg/vm/engine/tae/db/controller.go create mode 100644 pkg/vm/engine/tae/txn/txnbase/types.go diff --git a/pkg/common/moerr/error.go b/pkg/common/moerr/error.go index 38b41e3222781..7d945d68057d5 100644 --- a/pkg/common/moerr/error.go +++ b/pkg/common/moerr/error.go @@ -225,6 +225,7 @@ const ( ErrPrevCheckpointNotFinished uint16 = 20635 ErrCantDelGCChecker uint16 = 20636 ErrTxnUnknown uint16 = 20637 + ErrTxnControl uint16 = 20638 // Group 7: lock service // ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error @@ -468,6 +469,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{ ErrCantCompileForPrepare: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not compile for prepare"}, ErrCantDelGCChecker: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can't delete gc checker"}, ErrTxnUnknown: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "txn commit status is unknown: %s"}, + ErrTxnControl: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "txn control error: %s"}, // Group 7: lock service ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"}, diff --git a/pkg/common/moerr/error_no_ctx.go b/pkg/common/moerr/error_no_ctx.go index 2ab51c92068c5..42685213b5061 100644 
--- a/pkg/common/moerr/error_no_ctx.go +++ b/pkg/common/moerr/error_no_ctx.go @@ -334,6 +334,10 @@ func NewCantDelGCCheckerNoCtx() *Error { return newError(Context(), ErrCantDelGCChecker) } +func NewTxnControlErrorNoCtxf(format string, args ...any) *Error { + return newError(Context(), ErrTxnControl, fmt.Sprintf(format, args...)) +} + func NewNotFoundNoCtx() *Error { return newError(Context(), ErrNotFound) } diff --git a/pkg/vm/engine/tae/db/controller.go b/pkg/vm/engine/tae/db/controller.go new file mode 100644 index 0000000000000..533d34e51fcf8 --- /dev/null +++ b/pkg/vm/engine/tae/db/controller.go @@ -0,0 +1,270 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package db
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/matrixorigin/matrixone/pkg/common/moerr"
+	"github.com/matrixorigin/matrixone/pkg/logutil"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
+	"go.uber.org/zap"
+)
+
+// ControlCmdType identifies the kind of command handled by the Controller.
+type ControlCmdType uint32
+
+const (
+	ControlCmd_Noop ControlCmdType = iota
+	ControlCmd_ToReplayMode
+	ControlCmd_ToWriteMode
+)
+
+// newControlCmd builds a command that waits for exactly one setError call.
+func newControlCmd(
+	ctx context.Context,
+	typ ControlCmdType,
+	sarg string,
+) *controlCmd {
+	cmd := &controlCmd{
+		id:   uuid.Must(uuid.NewV7()),
+		typ:  typ,
+		ctx:  ctx,
+		sarg: sarg,
+	}
+	cmd.wg.Add(1)
+	return cmd
+}
+
+// controlCmd is one queued control request together with its result.
+type controlCmd struct {
+	id   uuid.UUID
+	typ  ControlCmdType
+	err  error
+	ctx  context.Context
+	wg   sync.WaitGroup
+	sarg string
+}
+
+func (c *controlCmd) String() string {
+	return fmt.Sprintf("ControlCmd{id:%s, typ:%d, sarg:%s}", c.id, c.typ, c.sarg)
+}
+
+// setError records the result and releases waitDone. A second call on the
+// same command makes the WaitGroup counter negative, which panics.
+func (c *controlCmd) setError(err error) {
+	c.err = err
+	c.wg.Done()
+}
+
+// waitDone blocks until setError has been called.
+func (c *controlCmd) waitDone() {
+	c.wg.Wait()
+}
+
+// Controller executes txn mode switch commands for a DB via a queue.
+type Controller struct {
+	queue sm.Queue
+	db    *DB
+}
+
+// NewController creates a Controller bound to db; onCmd consumes its queue.
+func NewController(db *DB) *Controller {
+	c := &Controller{db: db}
+	c.queue = sm.NewSafeQueue(1, 1, c.onCmd)
+	return c
+}
+
+// onCmd dispatches each dequeued command to the handler for its type.
+func (c *Controller) onCmd(cmds ...any) {
+	for _, cmd := range cmds {
+		command := cmd.(*controlCmd)
+		switch command.typ {
+		case ControlCmd_ToReplayMode:
+			c.handleToReplayCmd(command)
+		case ControlCmd_ToWriteMode:
+			c.handleToWriteCmd(command)
+		default:
+			command.setError(moerr.NewInternalErrorNoCtxf("unknown command type %d", command.typ))
+		}
+	}
+}
+
+// handleToReplayCmd switches the DB from write mode to replay mode and
+// completes cmd exactly once with the outcome.
+func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
+	switch c.db.GetTxnMode() {
+	case DBTxnMode_Replay:
+		cmd.setError(nil)
+		return
+	case DBTxnMode_Write:
+	default:
+		cmd.setError(
+			moerr.NewTxnControlErrorNoCtxf("bad db txn mode %d to replay", c.db.GetTxnMode()),
+		)
+		// cmd is already completed: stop before the deferred setError is armed.
+		return
+	}
+	// write mode -> replay mode switch steps (TODO: error handling)
+
+	var (
+		err   error
+		start time.Time = time.Now()
+	)
+
+	logger := logutil.Info
+	logger(
+		"DB-SwitchToReplay-Start",
+		zap.String("cmd", cmd.String()),
+	)
+
+	defer func() {
+		if err != nil {
+			logger = logutil.Error
+		}
+		logger(
+			"DB-SwitchToReplay-Done",
+			zap.String("cmd", cmd.String()),
+			zap.Duration("duration", time.Since(start)),
+			zap.Error(err),
+		)
+		cmd.setError(err)
+	}()
+
+	// 1. TODO: stop the merge scheduler
+
+	// 2. TODO: switch the checkpoint|diskcleaner to replay mode
+
+	// 3. TODO: build forward write request tunnel to the new write candidate
+
+	// 4. TODO: build logtail tunnel to the new write candidate
+
+	// 5. TODO: freeze the write requests consumer
+
+	// 6. switch the txn mode to readonly mode
+	if err = c.db.TxnMgr.SwitchToReadonly(cmd.ctx); err != nil {
+		c.db.TxnMgr.ToWriteMode()
+		// TODO: recover the previous state
+		return
+	}
+
+	// 7. TODO: wait the logtail push queue to be flushed
+
+	// 8. TODO: send change-writer-config txn to the logservice
+
+	// 9. TODO: change the logtail push queue sourcer to the write candidate tunnel
+
+	// 10. TODO: forward the write requests to the new write candidate
+
+	// 11. replay the log entries from the logservice
+	// 11.1 switch the txn mode to replay mode
+	c.db.TxnMgr.ToReplayMode()
+	// 11.2 TODO: replay the log entries
+
+	WithTxnMode(DBTxnMode_Replay)(c.db)
+}
+
+// handleToWriteCmd switches the DB from replay mode to write mode and
+// completes cmd exactly once with the outcome.
+func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
+	switch c.db.GetTxnMode() {
+	case DBTxnMode_Write:
+		cmd.setError(nil)
+		return
+	case DBTxnMode_Replay:
+	default:
+		cmd.setError(
+			moerr.NewTxnControlErrorNoCtxf("bad db txn mode %d to write", c.db.GetTxnMode()),
+		)
+		// cmd is already completed: stop before the deferred setError is armed.
+		return
+	}
+	var (
+		err   error
+		start time.Time = time.Now()
+	)
+
+	logger := logutil.Info
+	logger(
+		"DB-SwitchToWrite-Start",
+		zap.String("cmd", cmd.String()),
+	)
+
+	defer func() {
+		if err != nil {
+			logger = logutil.Error
+		}
+		logger(
+			"DB-SwitchToWrite-Done",
+			zap.String("cmd", cmd.String()),
+			zap.Duration("duration", time.Since(start)),
+			zap.Error(err),
+		)
+		cmd.setError(err)
+	}()
+
+	// replay mode -> write mode switch steps (TODO: error handling)
+
+	// 1. TODO: only switch after receiving the change-writer-config txn from the logservice
+
+	// 2. TODO: stop replaying the log entries
+
+	// 3. switch the txnmgr to write mode
+	c.db.TxnMgr.ToWriteMode()
+
+	// 4. TODO: unfreeze the write requests
+
+	// 5. TODO: start merge scheduler|checkpoint|diskcleaner
+
+	WithTxnMode(DBTxnMode_Write)(c.db)
+}
+
+// Start starts the command queue.
+func (c *Controller) Start() {
+	c.queue.Start()
+}
+
+// Stop stops the command queue.
+func (c *Controller) Stop() {
+	c.queue.Stop()
+}
+
+// SwitchTxnMode enqueues a switch to replay mode (iarg 1) or write mode
+// (iarg 2) and blocks until the command has been handled.
+func (c *Controller) SwitchTxnMode(
+	ctx context.Context,
+	iarg int,
+	sarg string,
+) error {
+	var typ ControlCmdType
+	switch iarg {
+	case 1:
+		typ = ControlCmd_ToReplayMode
+	case 2:
+		typ = ControlCmd_ToWriteMode
+	default:
+		return moerr.NewTxnControlErrorNoCtxf("unknown txn mode switch iarg %d", iarg)
+	}
+	cmd := newControlCmd(ctx, typ, sarg)
+	if _, err := c.queue.Enqueue(cmd); err != nil {
+		return err
+	}
+	cmd.waitDone()
+	return cmd.err
+}
diff --git a/pkg/vm/engine/tae/db/db.go b/pkg/vm/engine/tae/db/db.go index dfa5862356ab7..edf2b180ad7f7 100644 --- a/pkg/vm/engine/tae/db/db.go +++ b/pkg/vm/engine/tae/db/db.go @@ -47,8 +47,26 @@ var ( ErrClosed = moerr.NewInternalErrorNoCtx("tae: closed") ) +type DBTxnMode uint32 + +const ( + DBTxnMode_Write DBTxnMode = iota + DBTxnMode_Replay +) + +type DBOption func(*DB) + +func WithTxnMode(mode DBTxnMode) DBOption { + return func(db *DB) { + db.TxnMode.Store(uint32(mode)) + } +} + type DB struct { - Dir string + Dir string + TxnMode atomic.Uint32 + Controller *Controller + Opts *options.Options usageMemo *logtail.TNUsageMemo @@ -75,6 +93,18 @@ type DB struct { Closed *atomic.Value } +func (db *DB) GetTxnMode() DBTxnMode { + return DBTxnMode(db.TxnMode.Load()) +} + +func (db *DB) SwitchTxnMode( + ctx context.Context, + iarg int, + sarg string, +) error { + return db.Controller.SwitchTxnMode(ctx, iarg, sarg) +} + func (db *DB) GetUsageMemo() *logtail.TNUsageMemo { return db.usageMemo } @@ -239,6 +269,7 @@ func (db *DB) Close() error { panic(err) } db.Closed.Store(ErrClosed) + db.Controller.Stop() db.GCManager.Stop() db.BGScanner.Stop() db.BGCheckpointRunner.Stop() diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index 3ad4eb740f7bb..49a55771f38f2 100644 ---
a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -68,7 +68,12 @@ func fillRuntimeOptions(opts *options.Options) { } } -func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, err error) { +func Open( + ctx context.Context, + dirname string, + opts *options.Options, + dbOpts ...DBOption, +) (db *DB, err error) { dbLocker, err := createDBLock(dirname) logutil.Info( @@ -119,6 +124,10 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e Closed: new(atomic.Value), usageMemo: logtail.NewTNUsageMemo(nil), } + for _, opt := range dbOpts { + opt(db) + } + fs := objectio.NewObjectFS(opts.Fs, serviceDir) localFs := objectio.NewObjectFS(opts.LocalFs, serviceDir) transferTable, err := model.NewTransferTable[*model.TransferHashPage](ctx, opts.LocalFs) @@ -354,6 +363,9 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e db.GCManager.Start() + db.Controller = NewController(db) + db.Controller.Start() + go TaeMetricsTask(ctx) // For debug or test diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index ed387a1b17118..cf1a4bb10cca1 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10404,3 +10404,21 @@ func TestDedup5(t *testing.T) { assert.Error(t, err) assert.NoError(t, insertTxn.Commit(ctx)) } + +func Test_BasicTxnModeSwitch(t *testing.T) { + ctx := context.Background() + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer tae.Close() + + assert.Equal(t, db.DBTxnMode_Write, tae.GetTxnMode()) + err := tae.SwitchTxnMode(ctx, 1, "todo") + assert.NoError(t, err) + assert.Equal(t, db.DBTxnMode_Replay, tae.GetTxnMode()) + assert.True(t, tae.TxnMgr.IsRelayMode()) + + err = tae.SwitchTxnMode(ctx, 2, "todo") + assert.NoError(t, err) + assert.Equal(t, db.DBTxnMode_Write, tae.GetTxnMode()) + assert.True(t, tae.TxnMgr.IsWriteMode()) +} diff --git 
a/pkg/vm/engine/tae/txn/txnbase/txnmgr.go b/pkg/vm/engine/tae/txn/txnbase/txnmgr.go index 3dc6055b46f1d..36f2d7221d386 100644 --- a/pkg/vm/engine/tae/txn/txnbase/txnmgr.go +++ b/pkg/vm/engine/tae/txn/txnbase/txnmgr.go @@ -37,6 +37,43 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" ) +type TxnManagerOption func(*TxnManager) + +// WithTxnSkipFlag set the TxnSkipFlag +// 0 or TxnSkipFlag_None: skip nothing +// TxnFlag_Normal: skip normal txn +// TxnFlag_Replay|TxnFlag_Normal: skip normal and replay txn +// TxnFlag_Heartbeat|TxnFlag_Normal|TxnFlag_Replay or TxnSkipFlag_All: skip all txn +func WithTxnSkipFlag(flag TxnFlag) TxnManagerOption { + return func(m *TxnManager) { + prevFlag := TxnFlag(m.txns.skipFlags.Load()) + m.txns.skipFlags.Store(uint64(flag)) + logutil.Info( + "TxnManager-TxnSkipFlag-Change", + zap.String("prev", prevFlag.String()), + zap.String("current", flag.String()), + ) + } +} + +// Here define the write mode: +// TxnSkipFlag_None: skip nothing +func WithWriteMode(mgr *TxnManager) { + WithTxnSkipFlag(TxnSkipFlag_None)(mgr) +} + +// Here define the replay mode: +// TxnFlag_Normal|TxnFlag_Heartbeat: skip normal and heartbeat txn +func WithReplayMode(mgr *TxnManager) { + WithTxnSkipFlag(TxnFlag_Normal | TxnFlag_Heartbeat)(mgr) +} + +// Here define the readonly mode: +// TxnFlag_Normal|TxnFlag_Heartbeat|TxnFlag_Replay: skip all txn +func WithReadonlyMode(mgr *TxnManager) { + WithTxnSkipFlag(TxnFlag_Normal | TxnFlag_Heartbeat | TxnFlag_Replay)(mgr) +} + type TxnCommitListener interface { OnBeginPrePrepare(txnif.AsyncTxn) OnEndPrePrepare(txnif.AsyncTxn) @@ -86,7 +123,6 @@ type TxnManager struct { sm.ClosedState PreparingSM sm.StateMachine FlushQueue sm.Queue - IDMap *sync.Map IdAlloc *common.TxnIDAllocator MaxCommittedTS atomic.Pointer[types.TS] TxnStoreFactory TxnStoreFactory @@ -97,6 +133,22 @@ type TxnManager struct { heartbeatJob atomic.Pointer[tasks.CancelableJob] + txns struct { + // store all txns + store *sync.Map + + // wg 
is used to wait all txns to be done + wg sync.WaitGroup + + // TxnSkipFlag to skip some txn type + // 0: skip nothing + // TxnFlag_Normal: skip normal txn + // TxnFlag_Replay: skip replay txn + // TxnFlag_Heartbeat: skip heartbeat txn + // TxnFlag_Normal | TxnFlag_Replay: skip normal and replay txn + skipFlags atomic.Uint64 + } + ts struct { mu sync.Mutex allocator *types.TsAlloctor @@ -108,18 +160,27 @@ type TxnManager struct { prevPrepareTSInPrepareWAL types.TS } -func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock clock.Clock) *TxnManager { +func NewTxnManager( + txnStoreFactory TxnStoreFactory, + txnFactory TxnFactory, + clock clock.Clock, + opts ...TxnManagerOption, +) *TxnManager { if txnFactory == nil { txnFactory = DefaultTxnFactory } mgr := &TxnManager{ - IDMap: new(sync.Map), IdAlloc: common.NewTxnIDAllocator(), TxnStoreFactory: txnStoreFactory, TxnFactory: txnFactory, Exception: new(atomic.Value), CommitListener: newBatchCommitListener(), } + mgr.txns.store = new(sync.Map) + mgr.txns.wg = sync.WaitGroup{} + for _, opt := range opts { + opt(mgr) + } mgr.ts.allocator = types.NewTsAlloctor(clock) mgr.initMaxCommittedTS() pqueue := sm.NewSafeQueue(20000, 1000, mgr.dequeuePreparing) @@ -145,6 +206,59 @@ func (mgr *TxnManager) Now() types.TS { return mgr.ts.allocator.Alloc() } +func (mgr *TxnManager) ToWriteMode() { + WithWriteMode(mgr) + mgr.ResetHeartbeat() +} + +func (mgr *TxnManager) IsRelayMode() bool { + skipFlags := mgr.GetTxnSkipFlags() + if skipFlags&TxnFlag_Replay == 0 && skipFlags&TxnFlag_Normal != 0 && skipFlags&TxnFlag_Heartbeat != 0 { + return true + } + return false +} + +func (mgr *TxnManager) IsWriteMode() bool { + skipFlags := mgr.GetTxnSkipFlags() + return skipFlags == TxnSkipFlag_None +} + +// it is only safe to call this function in the readonly mode +func (mgr *TxnManager) ToReplayMode() { + WithReplayMode(mgr) +} + +func (mgr *TxnManager) SwitchToReadonly(ctx context.Context) (err error) { + now := time.Now() 
+ defer func() { + logutil.Info( + "Wait-TxnManager-To-ReplayMode", + zap.Duration("duration", time.Since(now)), + ) + }() + + // 1. do not accept new txn + WithReadonlyMode(mgr) + + // 2. try to abort slow txn: big-tombstone-txn and merge-txn + mgr.txns.store.Range(func(key, value any) bool { + // TODO + return true + }) + + // 3. wait all txn to be done. + // Note: + // the heartbeats may be still running. The controller + // should wait all logtail to be flushed + err = mgr.WaitEmpty(ctx) + return +} + +func (mgr *TxnManager) GetTxnSkipFlags() TxnSkipFlag { + return TxnSkipFlag(mgr.txns.skipFlags.Load()) +} + func (mgr *TxnManager) Init(prevTs types.TS) error { logutil.Infof("init ts to %v", prevTs.ToString()) mgr.ts.allocator.SetStart(prevTs) @@ -154,7 +268,7 @@ func (mgr *TxnManager) Init(prevTs types.TS) error { // Note: Replay should always runs in a single thread func (mgr *TxnManager) OnReplayTxn(txn txnif.AsyncTxn) (err error) { - mgr.IDMap.Store(txn.GetID(), txn) + mgr.storeTxn(txn, TxnFlag_Replay) return } @@ -171,7 +285,7 @@ func (mgr *TxnManager) StartTxn(info []byte) (txn txnif.AsyncTxn, err error) { store := mgr.TxnStoreFactory() txn = mgr.TxnFactory(mgr, store, txnId, startTs, types.TS{}) store.BindTxn(txn) - mgr.IDMap.Store(util.UnsafeBytesToString(txnId), txn) + mgr.storeTxn(txn, TxnFlag_Normal) return } @@ -188,36 +302,119 @@ func (mgr *TxnManager) StartTxnWithStartTSAndSnapshotTS( txnId := mgr.IdAlloc.Alloc() txn = mgr.TxnFactory(mgr, store, txnId, startTS, snapshotTS) store.BindTxn(txn) - mgr.IDMap.Store(util.UnsafeBytesToString(txnId), txn) + err = mgr.storeTxn(txn, TxnFlag_Normal) + return +} + +func (mgr *TxnManager) WaitEmpty(ctx context.Context) (err error) { + c := make(chan struct{}) + go func() { + mgr.txns.wg.Wait() + close(c) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-c: + return + } +} + +func (mgr *TxnManager) loadTxn( + id string, +) (txnif.AsyncTxn, bool) { + if res, ok := mgr.txns.store.Load(id); ok { + 
return res.(txnif.AsyncTxn), true + } + return nil, false +} + +func (mgr *TxnManager) loadAndDeleteTxn( + id string, +) (txnif.AsyncTxn, bool) { + if res, ok := mgr.txns.store.LoadAndDelete(id); ok { + mgr.txns.wg.Done() + return res.(txnif.AsyncTxn), true + } + return nil, false +} + +// flag: specify the txn type. only one bit is set +func (mgr *TxnManager) storeTxn( + newTxn txnif.AsyncTxn, flag TxnFlag, +) (err error) { + mgr.txns.wg.Add(1) + + skipFlags := TxnSkipFlag(mgr.txns.skipFlags.Load()) + if skipFlags.Skip(flag) { + mgr.txns.wg.Done() + return moerr.NewTxnControlErrorNoCtxf( + "%s Skip %s", + skipFlags.String(), + flag.String(), + ) + } + + mgr.txns.store.Store(newTxn.GetID(), newTxn) return } +// flag: specify the txn type. only one bit is set +func (mgr *TxnManager) loadOrStoreTxn( + newTxn txnif.AsyncTxn, flag TxnFlag, +) (txnif.AsyncTxn, bool, error) { + mgr.txns.wg.Add(1) + + skipFlags := TxnSkipFlag(mgr.txns.skipFlags.Load()) + if skipFlags.Skip(flag) { + mgr.txns.wg.Done() + return nil, false, moerr.NewTxnControlErrorNoCtxf( + "%s Skip %s", + skipFlags.String(), + flag.String(), + ) + } + + if actual, loaded := mgr.txns.store.LoadOrStore( + newTxn.GetID(), newTxn, + ); loaded { + mgr.txns.wg.Done() + return actual.(txnif.AsyncTxn), true, nil + } + return newTxn, false, nil +} + // GetOrCreateTxnWithMeta Get or create a txn initiated by CN func (mgr *TxnManager) GetOrCreateTxnWithMeta( - info []byte, - id []byte, - ts types.TS) (txn txnif.AsyncTxn, err error) { + info []byte, id []byte, ts types.TS, +) (txn txnif.AsyncTxn, err error) { if exp := mgr.Exception.Load(); exp != nil { err = exp.(error) logutil.Warnf("StartTxn: %v", err) return } - if value, ok := mgr.IDMap.Load(util.UnsafeBytesToString(id)); ok { - txn = value.(txnif.AsyncTxn) - } else { - store := mgr.TxnStoreFactory() - txn = mgr.TxnFactory(mgr, store, id, ts, ts) - store.BindTxn(txn) - mgr.IDMap.Store(util.UnsafeBytesToString(id), txn) + var ok bool + if txn, ok = 
mgr.loadTxn(util.UnsafeBytesToString(id)); ok { + return } + + store := mgr.TxnStoreFactory() + txn = mgr.TxnFactory(mgr, store, id, ts, ts) + store.BindTxn(txn) + txn, _, err = mgr.loadOrStoreTxn(txn, TxnFlag_Normal) return } func (mgr *TxnManager) DeleteTxn(id string) (err error) { - if _, ok := mgr.IDMap.LoadAndDelete(id); !ok { + if _, ok := mgr.loadAndDeleteTxn(id); !ok { err = moerr.NewTxnNotFoundNoCtx() - logutil.Warnf("Txn %s not found", id) - return + } + if err != nil { + logutil.Warn( + "DeleteTxnError", + zap.String("txn", id), + zap.Error(err), + ) } return } @@ -227,10 +424,11 @@ func (mgr *TxnManager) GetTxnByCtx(ctx []byte) txnif.AsyncTxn { } func (mgr *TxnManager) GetTxn(id string) txnif.AsyncTxn { - if res, ok := mgr.IDMap.Load(id); ok { - return res.(txnif.AsyncTxn) + res, ok := mgr.loadTxn(id) + if !ok || res == nil { + return nil } - return nil + return res } func (mgr *TxnManager) EnqueueFlushing(op any) (err error) { @@ -574,7 +772,7 @@ func (mgr *TxnManager) OnException(new error) { // files that have been gc will not be used. func (mgr *TxnManager) MinTSForTest() types.TS { minTS := types.MaxTs() - mgr.IDMap.Range(func(key, value any) bool { + mgr.txns.store.Range(func(key, value any) bool { txn := value.(txnif.AsyncTxn) startTS := txn.GetStartTS() if startTS.LT(&minTS) { diff --git a/pkg/vm/engine/tae/txn/txnbase/types.go b/pkg/vm/engine/tae/txn/txnbase/types.go new file mode 100644 index 0000000000000..99a15ab8ad372 --- /dev/null +++ b/pkg/vm/engine/tae/txn/txnbase/types.go @@ -0,0 +1,73 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package txnbase
+
+import (
+	"bytes"
+)
+
+// TxnFlag is a bit set of txn kinds.
+type TxnFlag uint64
+
+const (
+	TxnFlag_Normal TxnFlag = 1 << iota
+	TxnFlag_Replay
+	TxnFlag_Heartbeat
+)
+
+// TxnSkipFlag is an alias of TxnFlag; its set bits are the txn kinds to skip.
+type TxnSkipFlag = TxnFlag
+
+const (
+	TxnSkipFlag_Normal    = TxnFlag_Normal
+	TxnSkipFlag_Replay    = TxnFlag_Replay
+	TxnSkipFlag_Heartbeat = TxnFlag_Heartbeat
+)
+
+// Masks that skip no txn kind and every txn kind.
+const (
+	TxnSkipFlag_None TxnFlag = 0
+	TxnSkipFlag_All  TxnFlag = TxnFlag_Normal | TxnFlag_Replay | TxnFlag_Heartbeat
+)
+
+// String renders the set bits as "Flag[...]" with N (normal), R (replay) and
+// H (heartbeat) joined by '|', e.g. "Flag[N|H]"; an empty set is "Flag[]".
+func (m TxnFlag) String() string {
+	var w bytes.Buffer
+	w.WriteString("Flag[")
+	// The separator goes before every name except the first one written.
+	first := true
+	add := func(flag TxnFlag, name byte) {
+		if m&flag != flag {
+			return
+		}
+		if !first {
+			w.WriteByte('|')
+		}
+		first = false
+		w.WriteByte(name)
+	}
+	add(TxnFlag_Normal, 'N')
+	add(TxnFlag_Replay, 'R')
+	add(TxnFlag_Heartbeat, 'H')
+	w.WriteByte(']')
+	return w.String()
+}
+
+// Skip reports whether all bits of flag are set in m. flag should contain
+// exactly one bit; a zero flag always reports true.
+func (m TxnSkipFlag) Skip(flag TxnFlag) bool {
+	return m&flag == flag
+}