Skip to content

Commit

Permalink
refactor for the future tn migration 2 (#20571)
Browse files Browse the repository at this point in the history
refactor for the future tn migration

Approved by: @zhangxu19830126
  • Loading branch information
XuPeng-SH authored Dec 5, 2024
1 parent 065e02a commit 585f0c4
Show file tree
Hide file tree
Showing 8 changed files with 633 additions and 25 deletions.
2 changes: 2 additions & 0 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ const (
ErrPrevCheckpointNotFinished uint16 = 20635
ErrCantDelGCChecker uint16 = 20636
ErrTxnUnknown uint16 = 20637
ErrTxnControl uint16 = 20638

// Group 7: lock service
// ErrDeadLockDetected lockservice has detected a deadlock and should abort the transaction if it receives this error
Expand Down Expand Up @@ -468,6 +469,7 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrCantCompileForPrepare: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can not compile for prepare"},
ErrCantDelGCChecker: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can't delete gc checker"},
ErrTxnUnknown: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "txn commit status is unknown: %s"},
ErrTxnControl: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "txn control error: %s"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ func NewCantDelGCCheckerNoCtx() *Error {
return newError(Context(), ErrCantDelGCChecker)
}

// NewTxnControlErrorNoCtxf builds an ErrTxnControl error whose detail text is
// produced by formatting args according to format. It uses the package default
// context returned by Context().
func NewTxnControlErrorNoCtxf(format string, args ...any) *Error {
	detail := fmt.Sprintf(format, args...)
	return newError(Context(), ErrTxnControl, detail)
}

// NewNotFoundNoCtx builds an ErrNotFound error using the package default
// context returned by Context().
func NewNotFoundNoCtx() *Error {
	ctx := Context()
	return newError(ctx, ErrNotFound)
}
Expand Down
270 changes: 270 additions & 0 deletions pkg/vm/engine/tae/db/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package db

import (
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
"go.uber.org/zap"
)

// ControlCmdType enumerates the commands that the Controller dispatches on.
type ControlCmdType uint32

// The numeric values are spelled out explicitly; they match the sequence
// 0, 1, 2 in declaration order.
const (
	// ControlCmd_Noop has no dedicated handler; the dispatcher rejects it as
	// an unknown command type.
	ControlCmd_Noop ControlCmdType = 0
	// ControlCmd_ToReplayMode requests a switch of the DB to replay mode.
	ControlCmd_ToReplayMode ControlCmdType = 1
	// ControlCmd_ToWriteMode requests a switch of the DB to write mode.
	ControlCmd_ToWriteMode ControlCmdType = 2
)

// newControlCmd creates a command of the given type with a fresh UUIDv7 id.
// The command's WaitGroup is armed with a count of one, so waitDone blocks
// until setError is called exactly once.
func newControlCmd(
	ctx context.Context,
	typ ControlCmdType,
	sarg string,
) *controlCmd {
	cmd := new(controlCmd)
	cmd.id = uuid.Must(uuid.NewV7())
	cmd.typ = typ
	cmd.ctx = ctx
	cmd.sarg = sarg
	// Armed here; released by setError.
	cmd.wg.Add(1)
	return cmd
}

// controlCmd is one request submitted to the Controller queue. The submitter
// blocks in waitDone until a handler reports the outcome through setError.
type controlCmd struct {
// id is a UUIDv7 assigned in newControlCmd; it appears in String() output.
id uuid.UUID
// typ selects the handler in Controller.onCmd.
typ ControlCmdType
// err is the outcome stored by setError and read by the submitter after
// waitDone returns.
err error
// ctx is the submitter's context; the replay handler passes it to
// TxnMgr.SwitchToReadonly.
ctx context.Context
// wg is incremented once in newControlCmd and released once by setError.
wg sync.WaitGroup
// sarg is an opaque string argument; within this file it is only carried
// along and printed by String().
sarg string
}

// String renders the command's id, numeric type and string argument for
// logging.
func (c *controlCmd) String() string {
	const layout = "ControlCmd{id:%s, typ:%d, sarg:%s}"
	return fmt.Sprintf(layout, c.id, c.typ, c.sarg)
}

// setError records the command outcome (nil on success) and releases the
// waiter blocked in waitDone. It must be called exactly once per command:
// a second call would drive the WaitGroup counter negative.
func (c *controlCmd) setError(err error) {
	// Done runs after the assignment, preserving store-then-signal order.
	defer c.wg.Done()
	c.err = err
}

// waitDone blocks until setError has been called on this command.
func (c *controlCmd) waitDone() {
	done := &c.wg
	done.Wait()
}

// Controller processes control commands (txn mode switches) for a DB through
// a queue whose consumer is Controller.onCmd.
type Controller struct {
// queue is created by sm.NewSafeQueue in NewController and delivers
// enqueued commands to onCmd.
queue sm.Queue
// db is the database whose txn mode the handlers inspect and change.
db *DB
}

// NewController builds a Controller bound to db. Its command queue is created
// with sm.NewSafeQueue(1, 1, ...) and onCmd as the consumer. The queue is not
// started here; call Start.
func NewController(db *DB) *Controller {
	ctl := new(Controller)
	ctl.db = db
	ctl.queue = sm.NewSafeQueue(1, 1, ctl.onCmd)
	return ctl
}

// onCmd is the queue consumer. Each item must be a *controlCmd; it is routed
// to the handler matching its type, and any other type is completed with an
// internal error.
func (c *Controller) onCmd(cmds ...any) {
	for i := range cmds {
		command := cmds[i].(*controlCmd)
		if command.typ == ControlCmd_ToReplayMode {
			c.handleToReplayCmd(command)
			continue
		}
		if command.typ == ControlCmd_ToWriteMode {
			c.handleToWriteCmd(command)
			continue
		}
		unknown := moerr.NewInternalErrorNoCtxf("unknown command type %d", command.typ)
		command.setError(unknown)
	}
}

// handleToReplayCmd switches the DB from write mode to replay mode and
// completes cmd with the outcome.
//
//   - If the DB is already in replay mode the command succeeds immediately.
//   - If the DB is in any mode other than write, the command fails with a
//     txn-control error and nothing is changed.
//   - Otherwise the switch procedure below runs; its result is logged and
//     reported through cmd.setError by the deferred function.
//
// cmd.setError must be called exactly once on every path, because each call
// invokes WaitGroup.Done on a counter that was armed with 1.
func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
	switch mode := c.db.GetTxnMode(); mode {
	case DBTxnMode_Replay:
		// Already in the target mode: nothing to do.
		cmd.setError(nil)
		return
	case DBTxnMode_Write:
		// Valid source mode: continue with the switch below.
	default:
		cmd.setError(
			moerr.NewTxnControlErrorNoCtxf("bad db txn mode %d to replay", mode),
		)
		// Must return here: falling through would run the switch procedure
		// from an invalid mode and make the deferred setError call Done a
		// second time, which panics and clobbers the error set above.
		return
	}
	// write mode -> replay mode switch steps:
	// TODO: error handling

	var (
		err   error
		start = time.Now()
	)

	logger := logutil.Info
	logger(
		"DB-SwitchToReplay-Start",
		zap.String("cmd", cmd.String()),
	)

	// Single completion point for the switch procedure: log at error level
	// on failure, then release the waiter with the final err.
	defer func() {
		if err != nil {
			logger = logutil.Error
		}
		logger(
			"DB-SwitchToReplay-Done",
			zap.String("cmd", cmd.String()),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
		cmd.setError(err)
	}()

	// 1. stop the merge scheduler
	// TODO

	// 2. switch the checkpoint|diskcleaner to replay mode
	// TODO

	// 3. build forward write request tunnel to the new write candidate
	// TODO

	// 4. build logtail tunnel to the new write candidate
	// TODO

	// 5. freeze the write requests consumer
	// TODO

	// 6. switch the txn mode to readonly mode
	if err = c.db.TxnMgr.SwitchToReadonly(cmd.ctx); err != nil {
		// Roll the txn manager back to write mode; err is reported by the
		// deferred function.
		c.db.TxnMgr.ToWriteMode()
		// TODO: recover the previous state
		return
	}

	// 7. wait the logtail push queue to be flushed
	// TODO

	// 8. send change-writer-config txn to the logservice
	// TODO

	// 9. change the logtail push queue source to the write candidate tunnel
	// TODO

	// 10. forward the write requests to the new write candidate
	// TODO

	// 11. replay the log entries from the logservice
	// 11.1 switch the txn mode to replay mode
	c.db.TxnMgr.ToReplayMode()
	// 11.2 TODO: replay the log entries

	// Publish the new mode on the DB last, after the txn manager switched.
	WithTxnMode(DBTxnMode_Replay)(c.db)
}

// handleToWriteCmd switches the DB from replay mode to write mode and
// completes cmd with the outcome.
//
//   - If the DB is already in write mode the command succeeds immediately.
//   - If the DB is in any mode other than replay, the command fails with a
//     txn-control error and nothing is changed.
//   - Otherwise the switch procedure below runs; its result is logged and
//     reported through cmd.setError by the deferred function.
//
// cmd.setError must be called exactly once on every path, because each call
// invokes WaitGroup.Done on a counter that was armed with 1.
func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
	switch mode := c.db.GetTxnMode(); mode {
	case DBTxnMode_Write:
		// Already in the target mode: nothing to do.
		cmd.setError(nil)
		return
	case DBTxnMode_Replay:
		// Valid source mode: continue with the switch below.
	default:
		cmd.setError(
			moerr.NewTxnControlErrorNoCtxf("bad db txn mode %d to write", mode),
		)
		// Must return here: falling through would run the switch procedure
		// from an invalid mode and make the deferred setError call Done a
		// second time, which panics and clobbers the error set above.
		return
	}

	var (
		err   error
		start = time.Now()
	)

	logger := logutil.Info
	logger(
		"DB-SwitchToWrite-Start",
		zap.String("cmd", cmd.String()),
	)

	// Single completion point for the switch procedure: log at error level
	// on failure, then release the waiter with the final err.
	defer func() {
		if err != nil {
			logger = logutil.Error
		}
		logger(
			"DB-SwitchToWrite-Done",
			zap.String("cmd", cmd.String()),
			zap.Duration("duration", time.Since(start)),
			zap.Error(err),
		)
		cmd.setError(err)
	}()

	// TODO: error handling
	// replay mode -> write mode switch steps:

	// 1. it can only be changed after it receives the change-writer-config txn from the logservice
	// TODO

	// 2. stop replaying the log entries
	// TODO

	// 3. switch the txnmgr to write mode
	c.db.TxnMgr.ToWriteMode()

	// 4. unfreeze the write requests
	// TODO

	// 5. start merge scheduler|checkpoint|diskcleaner
	// TODO

	// Publish the new mode on the DB last, after the txn manager switched.
	WithTxnMode(DBTxnMode_Write)(c.db)
}

// Start starts the controller's command queue.
func (c *Controller) Start() {
	q := c.queue
	q.Start()
}

// Stop stops the controller's command queue.
func (c *Controller) Stop() {
	q := c.queue
	q.Stop()
}

// SwitchTxnMode submits a mode-switch command and blocks until it has been
// handled.
//
// iarg selects the target mode: 1 requests replay mode, 2 requests write
// mode; any other value is rejected with a txn-control error before anything
// is enqueued. sarg is attached to the command unchanged. The returned error
// is either the enqueue failure or the outcome stored by the handler.
func (c *Controller) SwitchTxnMode(
	ctx context.Context,
	iarg int,
	sarg string,
) error {
	var typ ControlCmdType
	if iarg == 1 {
		typ = ControlCmd_ToReplayMode
	} else if iarg == 2 {
		typ = ControlCmd_ToWriteMode
	} else {
		return moerr.NewTxnControlErrorNoCtxf("unknown txn mode switch iarg %d", iarg)
	}

	cmd := newControlCmd(ctx, typ, sarg)
	_, err := c.queue.Enqueue(cmd)
	if err != nil {
		return err
	}

	// Block until the handler has called setError on the command.
	cmd.waitDone()
	return cmd.err
}
33 changes: 32 additions & 1 deletion pkg/vm/engine/tae/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,26 @@ var (
ErrClosed = moerr.NewInternalErrorNoCtx("tae: closed")
)

// DBTxnMode is the transaction mode a DB is operating in.
type DBTxnMode uint32

// The numeric values are spelled out explicitly; write mode is the zero value.
const (
	// DBTxnMode_Write is the mode from which a switch to replay is allowed.
	DBTxnMode_Write DBTxnMode = 0
	// DBTxnMode_Replay is the mode from which a switch to write is allowed.
	DBTxnMode_Replay DBTxnMode = 1
)

// DBOption is a functional option that mutates a DB in place.
type DBOption func(*DB)

// WithTxnMode returns a DBOption that atomically stores mode into the DB's
// TxnMode field.
func WithTxnMode(mode DBTxnMode) DBOption {
	raw := uint32(mode)
	return func(db *DB) {
		db.TxnMode.Store(raw)
	}
}

type DB struct {
Dir string
Dir string
TxnMode atomic.Uint32
Controller *Controller

Opts *options.Options

usageMemo *logtail.TNUsageMemo
Expand All @@ -75,6 +93,18 @@ type DB struct {
Closed *atomic.Value
}

// GetTxnMode atomically loads the DB's current transaction mode.
func (db *DB) GetTxnMode() DBTxnMode {
	raw := db.TxnMode.Load()
	return DBTxnMode(raw)
}

// SwitchTxnMode forwards the mode-switch request to the DB's Controller and
// returns its result unchanged.
func (db *DB) SwitchTxnMode(
	ctx context.Context,
	iarg int,
	sarg string,
) error {
	ctl := db.Controller
	return ctl.SwitchTxnMode(ctx, iarg, sarg)
}

// GetUsageMemo returns the DB's usage memo.
func (db *DB) GetUsageMemo() *logtail.TNUsageMemo {
	memo := db.usageMemo
	return memo
}
Expand Down Expand Up @@ -239,6 +269,7 @@ func (db *DB) Close() error {
panic(err)
}
db.Closed.Store(ErrClosed)
db.Controller.Stop()
db.GCManager.Stop()
db.BGScanner.Stop()
db.BGCheckpointRunner.Stop()
Expand Down
14 changes: 13 additions & 1 deletion pkg/vm/engine/tae/db/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func fillRuntimeOptions(opts *options.Options) {
}
}

func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, err error) {
func Open(
ctx context.Context,
dirname string,
opts *options.Options,
dbOpts ...DBOption,
) (db *DB, err error) {
dbLocker, err := createDBLock(dirname)

logutil.Info(
Expand Down Expand Up @@ -119,6 +124,10 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e
Closed: new(atomic.Value),
usageMemo: logtail.NewTNUsageMemo(nil),
}
for _, opt := range dbOpts {
opt(db)
}

fs := objectio.NewObjectFS(opts.Fs, serviceDir)
localFs := objectio.NewObjectFS(opts.LocalFs, serviceDir)
transferTable, err := model.NewTransferTable[*model.TransferHashPage](ctx, opts.LocalFs)
Expand Down Expand Up @@ -354,6 +363,9 @@ func Open(ctx context.Context, dirname string, opts *options.Options) (db *DB, e

db.GCManager.Start()

db.Controller = NewController(db)
db.Controller.Start()

go TaeMetricsTask(ctx)

// For debug or test
Expand Down
Loading

0 comments on commit 585f0c4

Please sign in to comment.