Skip to content

Commit

Permalink
enable multiple merges on one table at once (#20847)
Browse files Browse the repository at this point in the history
enable multiple merges on one table at once

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
w-zr authored Dec 23, 2024
1 parent 330ee11 commit e69033e
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 13 deletions.
13 changes: 1 addition & 12 deletions pkg/vm/engine/tae/db/merge/cnScheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
taskpb "github.com/matrixorigin/matrixone/pkg/pb/task"
Expand Down Expand Up @@ -54,23 +53,13 @@ func (s *CNMergeScheduler) sendMergeTask(ctx context.Context, task *api.MergeTas
if !ok {
return taskservice.ErrNotReady
}
taskIDPrefix := "Merge:" + strconv.Itoa(int(task.TblId))
asyncTask, err := ts.QueryAsyncTask(ctx,
taskservice.WithTaskMetadataId(taskservice.LIKE, taskIDPrefix+"%"),
taskservice.WithTaskStatusCond(taskpb.TaskStatus_Created, taskpb.TaskStatus_Running))
if err != nil {
return err
}
if len(asyncTask) != 0 {
return moerr.NewInternalError(ctx, fmt.Sprintf("table %q is merging", task.TableName))
}
b, err := task.Marshal()
if err != nil {
return err
}
return ts.CreateAsyncTask(ctx,
taskpb.TaskMetadata{
ID: taskIDPrefix + ":" + strconv.FormatInt(time.Now().Unix(), 10),
ID: "Merge:" + strconv.Itoa(int(task.TblId)) + ":" + strconv.FormatInt(time.Now().Unix(), 10),
Executor: taskpb.TaskCode_MergeObject,
Context: b,
Options: taskpb.TaskOptions{Resource: &taskpb.Resource{Memory: task.EstimatedMemUsage}},
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/cnScheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestScheduler_CNActiveObjectsString(t *testing.T) {
meta := new(api.MergeTaskEntry)
require.NoError(t, meta.Unmarshal(tasks[0].Metadata.Context))
require.Equal(t, meta.DbName, tbl.GetDB().GetName())
require.Error(t, cnScheduler.sendMergeTask(context.Background(), taskEntry))
require.NoError(t, cnScheduler.sendMergeTask(context.Background(), taskEntry))
}

func TestExecutorCNMerge(t *testing.T) {
Expand Down

0 comments on commit e69033e

Please sign in to comment.