Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor tn migration 5 #20764

Merged
merged 22 commits into from
Dec 16, 2024
5 changes: 5 additions & 0 deletions pkg/vm/engine/engine_util/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,11 @@ func (sinker *Sinker) Write(
}

func (sinker *Sinker) Sync(ctx context.Context) error {
select {
case <-ctx.Done():
return context.Cause(ctx)
default:
}
if len(sinker.staged.persisted) == 0 && len(sinker.staged.inMemory) == 0 {
return nil
}
Expand Down
56 changes: 39 additions & 17 deletions pkg/vm/engine/engine_util/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"testing"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
Expand Down Expand Up @@ -93,34 +94,46 @@ func TestNewSinker(t *testing.T) {
require.Equal(t, 0, int(proc.Mp().CurrNB()))
}

func TestNewSinker2(t *testing.T) {
proc := testutil.NewProc()
fs, err := fileservice.Get[fileservice.FileService](
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema := catalog.MockSchema(3, 2)
seqnums := make([]uint16, len(schema.Attrs()))
for i := range schema.Attrs() {
seqnums[i] = schema.GetSeqnum(schema.Attrs()[i])
func makeTestSinker(
inSchema *catalog.Schema,
mp *mpool.MPool,
fs fileservice.FileService,
) (outSchema *catalog.Schema, sinker *Sinker) {
outSchema = inSchema
if outSchema == nil {
outSchema = catalog.MockSchema(3, 2)
}
seqnums := make([]uint16, len(outSchema.Attrs()))
for i := range outSchema.Attrs() {
seqnums[i] = outSchema.GetSeqnum(outSchema.Attrs()[i])
}

factory := NewFSinkerImplFactory(
seqnums,
schema.GetPrimaryKey().Idx,
outSchema.GetPrimaryKey().Idx,
true,
false,
schema.Version,
outSchema.Version,
)

sinker := NewSinker(
schema.GetPrimaryKey().Idx,
schema.Attrs(),
schema.Types(),
sinker = NewSinker(
outSchema.GetPrimaryKey().Idx,
outSchema.Attrs(),
outSchema.Types(),
factory,
proc.Mp(),
mp,
fs,
)
return
}

func TestNewSinker2(t *testing.T) {
proc := testutil.NewProc()
fs, err := fileservice.Get[fileservice.FileService](
proc.GetFileService(), defines.SharedFileServiceName)
require.NoError(t, err)

schema, sinker := makeTestSinker(nil, proc.Mp(), fs)

for i := 0; i < 5; i++ {
bat := catalog.MockBatch(schema, 8192*2)
Expand Down Expand Up @@ -156,3 +169,12 @@ func TestNewSinker2(t *testing.T) {

require.Equal(t, 0, int(proc.Mp().CurrNB()))
}

func TestSinkerCancel(t *testing.T) {
var sinker Sinker
ctx, cancel := context.WithCancelCause(context.Background())
expectErr := moerr.NewInternalErrorNoCtx("tt")
cancel(expectErr)
err := sinker.Sync(ctx)
require.ErrorContains(t, err, expectErr.Error())
}
43 changes: 41 additions & 2 deletions pkg/vm/engine/tae/db/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
start time.Time = time.Now()
)

ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute)
defer cancel()

logger := logutil.Info
logger(
"DB-SwitchToReplay-Start",
Expand All @@ -145,7 +148,15 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
// TODO

// 2. switch the checkpoint|diskcleaner to replay mode
// TODO

// 2.1 remove GC disk cron job. no new GC job will be issued from now on
RemoveCronJob(c.db, CronJobs_Name_GCDisk)
RemoveCronJob(c.db, CronJobs_Name_GCCheckpoint)
if err = c.db.DiskCleaner.SwitchToReplayMode(ctx); err != nil {
// Rollback
return
}
// 2.x TODO: checkpoint runner

// 3. build forward write request tunnel to the new write candidate
// TODO
Expand Down Expand Up @@ -175,6 +186,10 @@ func (c *Controller) handleToReplayCmd(cmd *controlCmd) {
// 10. forward the write requests to the new write candidate
// TODO

if err = CheckCronJobs(c.db, DBTxnMode_Replay); err != nil {
// rollback
return
}
// 11. replay the log entries from the logservice
// 11.1 switch the txn mode to replay mode
c.db.TxnMgr.ToReplayMode()
Expand All @@ -199,6 +214,9 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
start time.Time = time.Now()
)

ctx, cancel := context.WithTimeout(cmd.ctx, 10*time.Minute)
defer cancel()

logger := logutil.Info
logger(
"DB-SwitchToWrite-Start",
Expand Down Expand Up @@ -234,7 +252,28 @@ func (c *Controller) handleToWriteCmd(cmd *controlCmd) {
// TODO

// 5. start merge scheduler|checkpoint|diskcleaner
// TODO
// 5.1 switch the diskcleaner to write mode
if err = c.db.DiskCleaner.SwitchToWriteMode(ctx); err != nil {
// Rollback
return
}
if err = AddCronJob(
c.db, CronJobs_Name_GCDisk, true,
); err != nil {
// Rollback
return
}
if err = AddCronJob(
c.db, CronJobs_Name_GCCheckpoint, true,
); err != nil {
// Rollback
return
}
if err = CheckCronJobs(c.db, DBTxnMode_Write); err != nil {
// Rollback
return
}
// 5.x TODO

WithTxnMode(DBTxnMode_Write)(c.db)
}
Expand Down
Loading
Loading