Skip to content

Commit

Permalink
txnmgr refactor for tn migration (#20537)
Browse files Browse the repository at this point in the history
It can `ResetHeartbeat` and `StopHeartbeat` for the future `TN Migration`

Approved by: @jiangxinmeng1
  • Loading branch information
XuPeng-SH authored Dec 4, 2024
1 parent 2611646 commit b134e52
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 37 deletions.
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ func (db *DB) AddFaultPoint(ctx context.Context, name string, freq string, actio
return fault.AddFaultPoint(ctx, name, freq, action, iarg, sarg)
}

func (db *DB) ResetTxnHeartbeat() {
db.TxnMgr.ResetHeartbeat()
}

func (db *DB) StopTxnHeartbeat() {
db.TxnMgr.StopHeartbeat()
}

func (db *DB) Close() error {
if err := db.Closed.Load(); err != nil {
panic(err)
Expand Down
43 changes: 33 additions & 10 deletions pkg/vm/engine/tae/tasks/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,6 @@ type JobResult struct {
Res any
}

var SerialJobScheduler = new(simpleJobScheduler)

type simpleJobScheduler struct{}

func (s *simpleJobScheduler) Stop() {}
func (s *simpleJobScheduler) Schedule(job *Job) (err error) {
job.Run()
return
}

type parallelJobScheduler struct {
pool *ants.Pool
}
Expand Down Expand Up @@ -176,3 +166,36 @@ func (job *Job) Init(
job.typ = typ
job.wg.Add(1)
}

type CancelableJob struct {
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
job func(context.Context)
onceStart sync.Once
onceStop sync.Once
}

func NewCancelableJob(job func(context.Context)) *CancelableJob {
ctl := new(CancelableJob)
ctl.job = job
ctl.ctx, ctl.cancel = context.WithCancel(context.Background())
return ctl
}

func (ctl *CancelableJob) Start() {
ctl.onceStart.Do(func() {
ctl.wg.Add(1)
go func() {
defer ctl.wg.Done()
ctl.job(ctl.ctx)
}()
})
}

func (ctl *CancelableJob) Stop() {
ctl.onceStop.Do(func() {
ctl.cancel()
ctl.wg.Wait()
})
}
85 changes: 58 additions & 27 deletions pkg/vm/engine/tae/txn/txnbase/txnmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
)

type TxnCommitListener interface {
Expand Down Expand Up @@ -92,11 +93,10 @@ type TxnManager struct {
TxnFactory TxnFactory
Exception *atomic.Value
CommitListener *batchTxnCommitListener
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
workers *ants.Pool

heartbeatJob atomic.Pointer[tasks.CancelableJob]

ts struct {
mu sync.Mutex
allocator *types.TsAlloctor
Expand All @@ -119,7 +119,6 @@ func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock
TxnFactory: txnFactory,
Exception: new(atomic.Value),
CommitListener: newBatchCommitListener(),
wg: sync.WaitGroup{},
}
mgr.ts.allocator = types.NewTsAlloctor(clock)
mgr.initMaxCommittedTS()
Expand All @@ -128,7 +127,6 @@ func NewTxnManager(txnStoreFactory TxnStoreFactory, txnFactory TxnFactory, clock
mgr.FlushQueue = sm.NewSafeQueue(20000, 1000, mgr.dequeuePrepared)
mgr.PreparingSM = sm.NewStateMachine(new(sync.WaitGroup), mgr, pqueue, prepareWALQueue)

mgr.ctx, mgr.cancel = context.WithCancel(context.Background())
mgr.workers, _ = ants.NewPool(runtime.GOMAXPROCS(0))
return mgr
}
Expand Down Expand Up @@ -240,24 +238,6 @@ func (mgr *TxnManager) EnqueueFlushing(op any) (err error) {
return
}

func (mgr *TxnManager) heartbeat(ctx context.Context) {
defer mgr.wg.Done()
heartbeatTicker := time.NewTicker(time.Millisecond * 2)
for {
select {
case <-mgr.ctx.Done():
return
case <-heartbeatTicker.C:
op := mgr.newHeartbeatOpTxn(ctx)
op.Txn.(*Txn).Add(1)
_, err := mgr.PreparingSM.EnqueueReceived(op)
if err != nil {
panic(err)
}
}
}
}

func (mgr *TxnManager) newHeartbeatOpTxn(ctx context.Context) *OpTxn {
if exp := mgr.Exception.Load(); exp != nil {
err := exp.(error)
Expand Down Expand Up @@ -605,16 +585,67 @@ func (mgr *TxnManager) MinTSForTest() types.TS {
return minTS
}

func (mgr *TxnManager) StopHeartbeat() {
old := mgr.heartbeatJob.Load()
if old == nil {
return
}
old.Stop()
for swapped := mgr.heartbeatJob.CompareAndSwap(old, nil); !swapped; {
if old = mgr.heartbeatJob.Load(); old != nil {
old.Stop()
}
}
}

func (mgr *TxnManager) ResetHeartbeat() {
old := mgr.heartbeatJob.Load()
if old != nil {
old.Stop()
}
newJob := tasks.NewCancelableJob(func(ctx context.Context) {
prevReportTime := time.Now()
ticker := time.NewTicker(time.Millisecond * 2)
defer ticker.Stop()
logutil.Info(
"TxnManager-HB-Start",
zap.Duration("interval", time.Millisecond*2),
)
for {
select {
case <-ctx.Done():
logutil.Info("TxnManager-HB-Exit")
return
case <-ticker.C:
op := mgr.newHeartbeatOpTxn(ctx)
op.Txn.(*Txn).Add(1)
if err := mgr.OnOpTxn(op); err != nil {
if time.Since(prevReportTime) > time.Second*10 {
logutil.Warn(
"TxnManager-HB-Error",
zap.Error(err),
)
}
}
}
}
})
for swapped := mgr.heartbeatJob.CompareAndSwap(old, newJob); !swapped; {
if old = mgr.heartbeatJob.Load(); old != nil {
old.Stop()
}
}
newJob.Start()
}

func (mgr *TxnManager) Start(ctx context.Context) {
mgr.FlushQueue.Start()
mgr.PreparingSM.Start()
mgr.wg.Add(1)
go mgr.heartbeat(ctx)
mgr.ResetHeartbeat()
}

func (mgr *TxnManager) Stop() {
mgr.cancel()
mgr.wg.Wait()
mgr.StopHeartbeat()
mgr.PreparingSM.Stop()
mgr.FlushQueue.Stop()
mgr.OnException(sm.ErrClose)
Expand Down
5 changes: 5 additions & 0 deletions pkg/vm/engine/test/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ func Test_ReaderCanReadRangesBlocksWithoutDeletes(t *testing.T) {
require.NoError(t, err)

disttaeEngine, taeEngine, rpcAgent, mp = testutil.CreateEngines(ctx, testutil.TestOptions{TaeEngineOptions: opt}, t)
hbMonkeyJob := testutil.MakeTxnHeartbeatMonkeyJob(
taeEngine, time.Millisecond*10,
)
hbMonkeyJob.Start()
defer func() {
hbMonkeyJob.Stop()
disttaeEngine.Close(ctx)
taeEngine.Close(true)
rpcAgent.Close()
Expand Down
25 changes: 25 additions & 0 deletions pkg/vm/engine/test/testutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/vm/engine/engine_util"
"golang.org/x/exp/rand"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/disttae"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
)

func GetDefaultTestPath(module string, t *testing.T) string {
Expand Down Expand Up @@ -443,3 +445,26 @@ func EndThisStatement(

return
}

func MakeTxnHeartbeatMonkeyJob(
e *TestTxnStorage,
opInterval time.Duration,
) *tasks.CancelableJob {
taeDB := e.GetDB()
return tasks.NewCancelableJob(func(ctx context.Context) {
ticker := time.NewTicker(opInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if v := rand.Intn(100); v > 50 {
taeDB.StopTxnHeartbeat()
} else {
taeDB.ResetTxnHeartbeat()
}
}
}
})
}

0 comments on commit b134e52

Please sign in to comment.