Skip to content

Commit

Permalink
refactor tn migration 5 (#20764)
Browse files Browse the repository at this point in the history
code refactor for the tn migration

Approved by: @LeftHandCold
  • Loading branch information
XuPeng-SH authored Dec 16, 2024
1 parent fa32fcb commit 25b0a18
Show file tree
Hide file tree
Showing 21 changed files with 1,240 additions and 196 deletions.
5 changes: 5 additions & 0 deletions pkg/vm/engine/engine_util/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,11 @@ func (sinker *Sinker) Write(
}

func (sinker *Sinker) Sync(ctx context.Context) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
if len(sinker.staged.persisted) == 0 && len(sinker.staged.inMemory) == 0 {
return nil
}
Expand Down
56 changes: 39 additions & 17 deletions pkg/vm/engine/engine_util/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"testing"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
Expand Down Expand Up @@ -93,34 +94,46 @@ func TestNewSinker(t *testing.T) {
require.Equal(t, 0, int(proc.Mp().CurrNB()))
}

func TestNewSinker2(t *testing.T) {
proc := testutil.NewProc()
fs, err := fileservice.Get[fileservice.FileService](
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema := catalog.MockSchema(3, 2)
seqnums := make([]uint16, len(schema.Attrs()))
for i := range schema.Attrs() {
seqnums[i] = schema.GetSeqnum(schema.Attrs()[i])
func makeTestSinker(
inSchema *catalog.Schema,
mp *mpool.MPool,
fs fileservice.FileService,
) (outSchema *catalog.Schema, sinker *Sinker) {
outSchema = inSchema
if outSchema == nil {
outSchema = catalog.MockSchema(3, 2)
}
seqnums := make([]uint16, len(outSchema.Attrs()))
for i := range outSchema.Attrs() {
seqnums[i] = outSchema.GetSeqnum(outSchema.Attrs()[i])
}

factory := NewFSinkerImplFactory(
seqnums,
schema.GetPrimaryKey().Idx,
outSchema.GetPrimaryKey().Idx,
true,
false,
schema.Version,
outSchema.Version,
)

sinker := NewSinker(
schema.GetPrimaryKey().Idx,
schema.Attrs(),
schema.Types(),
sinker = NewSinker(
outSchema.GetPrimaryKey().Idx,
outSchema.Attrs(),
outSchema.Types(),
factory,
proc.Mp(),
mp,
fs,
)
return
}

func TestNewSinker2(t *testing.T) {
proc := testutil.NewProc()
fs, err := fileservice.Get[fileservice.FileService](
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema, sinker := makeTestSinker(nil, proc.Mp(), fs)

for i := 0; i < 5; i++ {
bat := catalog.MockBatch(schema, 8192*2)
Expand Down Expand Up @@ -156,3 +169,12 @@ func TestNewSinker2(t *testing.T) {

require.Equal(t, 0, int(proc.Mp().CurrNB()))
}

func TestSinkerCancel(t *testing.T) {
var sinker Sinker
ctx, cancel := context.WithCancelCause(context.Background())
expectErr := moerr.NewInternalErrorNoCtx("tt")
cancel(expectErr)
err := sinker.Sync(ctx)
require.ErrorContains(t, err, expectErr.Error())
}
43 changes: 41 additions & 2 deletions pkg/vm/engine/tae/db/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
start time.Time = time.Now()
)

ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute)
defer cancel()

logger := logutil.Info
logger(
"DB-SwitchToReplay-Start",
Expand All @@ -145,7 +148,15 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
// TODO

// 2. switch the checkpoint|diskcleaner to replay mode
// TODO

// 2.1 remove GC disk cron job. no new GC job will be issued from now on
RemoveCronJob(c.db, CronJobs_Name_GCDisk)
RemoveCronJob(c.db, CronJobs_Name_GCCheckpoint)
if err = c.db.DiskCleaner.SwitchToReplayMode(ctx); err != nil {
// Rollback
return
}
// 2.x TODO: checkpoint runner

// 3. build forward write request tunnel to the new write candidate
// TODO
Expand Down Expand Up @@ -175,6 +186,10 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
// 10. forward the write requests to the new write candidate
// TODO

if err = CheckCronJobs(c.db, DBTxnMode_Replay); err != nil {
// rollback
return
}
// 11. replay the log entries from the logservice
// 11.1 switch the txn mode to replay mode
c.db.TxnMgr.ToReplayMode()
Expand All @@ -199,6 +214,9 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
start time.Time = time.Now()
)

ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute)
defer cancel()

logger := logutil.Info
logger(
"DB-SwitchToWrite-Start",
Expand Down Expand Up @@ -234,7 +252,28 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
// TODO

// 5. start merge scheduler|checkpoint|diskcleaner
// TODO
// 5.1 switch the diskcleaner to write mode
if err = c.db.DiskCleaner.SwitchToWriteMode(ctx); err != nil {
// Rollback
return
}
if err = AddCronJob(
c.db, CronJobs_Name_GCDisk, true,
); err != nil {
// Rollback
return
}
if err = AddCronJob(
c.db, CronJobs_Name_GCCheckpoint, true,
); err != nil {
// Rollback
return
}
if err = CheckCronJobs(c.db, DBTxnMode_Write); err != nil {
// Rollback
return
}
// 5.x TODO

WithTxnMode(DBTxnMode_Write)(c.db)
}
Expand Down
Loading

0 comments on commit 25b0a18

Please sign in to comment.