From 7cee7f07239ced8f34e3146259597e089bbcdae4 Mon Sep 17 00:00:00 2001 From: Wei Ziran Date: Thu, 19 Dec 2024 18:38:09 +0800 Subject: [PATCH] enable multiple merges on one table at once --- pkg/vm/engine/tae/db/merge/cnScheduler.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/pkg/vm/engine/tae/db/merge/cnScheduler.go b/pkg/vm/engine/tae/db/merge/cnScheduler.go index 703ecf2fa6ad7..df4bd26860cb2 100644 --- a/pkg/vm/engine/tae/db/merge/cnScheduler.go +++ b/pkg/vm/engine/tae/db/merge/cnScheduler.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/objectio" "github.com/matrixorigin/matrixone/pkg/pb/api" taskpb "github.com/matrixorigin/matrixone/pkg/pb/task" @@ -54,23 +53,13 @@ func (s *CNMergeScheduler) sendMergeTask(ctx context.Context, task *api.MergeTas if !ok { return taskservice.ErrNotReady } - taskIDPrefix := "Merge:" + strconv.Itoa(int(task.TblId)) - asyncTask, err := ts.QueryAsyncTask(ctx, - taskservice.WithTaskMetadataId(taskservice.LIKE, taskIDPrefix+"%"), - taskservice.WithTaskStatusCond(taskpb.TaskStatus_Created, taskpb.TaskStatus_Running)) - if err != nil { - return err - } - if len(asyncTask) != 0 { - return moerr.NewInternalError(ctx, fmt.Sprintf("table %q is merging", task.TableName)) - } b, err := task.Marshal() if err != nil { return err } return ts.CreateAsyncTask(ctx, taskpb.TaskMetadata{ - ID: taskIDPrefix + ":" + strconv.FormatInt(time.Now().Unix(), 10), + ID: "Merge:" + strconv.Itoa(int(task.TblId)) + ":" + strconv.FormatInt(time.Now().Unix(), 10), Executor: taskpb.TaskCode_MergeObject, Context: b, Options: taskpb.TaskOptions{Resource: &taskpb.Resource{Memory: task.EstimatedMemUsage}},