diff --git a/Makefile b/Makefile index 6ec54318f3a33..25d5d99cc7a05 100644 --- a/Makefile +++ b/Makefile @@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen gen_mock: tools/bin/mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go + tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have diff --git a/disttask/framework/BUILD.bazel b/disttask/framework/BUILD.bazel index a49591496b169..fb0eb83f55564 100644 --- a/disttask/framework/BUILD.bazel +++ b/disttask/framework/BUILD.bazel @@ -10,7 +10,7 @@ go_test( "framework_test.go", ], flaky = True, - race = "on", + race = "off", shard_count = 22, deps = [ "//disttask/framework/dispatcher", diff --git a/disttask/framework/mock/BUILD.bazel b/disttask/framework/mock/BUILD.bazel index 901065de29079..6372c8a2708d9 100644 --- a/disttask/framework/mock/BUILD.bazel +++ b/disttask/framework/mock/BUILD.bazel @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "mock", - srcs = ["scheduler_mock.go"], + srcs = [ + "plan_mock.go", + "scheduler_mock.go", + ], importpath = "github.com/pingcap/tidb/disttask/framework/mock", visibility = ["//visibility:public"], deps = [ + "//disttask/framework/planner", "//disttask/framework/proto", "@org_uber_go_mock//gomock", ], diff --git a/disttask/framework/mock/plan_mock.go b/disttask/framework/mock/plan_mock.go new file mode 100644 index 0000000000000..59dd785138615 --- /dev/null +++ b/disttask/framework/mock/plan_mock.go @@ -0,0 +1,117 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: github.com/pingcap/tidb/disttask/framework/planner (interfaces: LogicalPlan,PipelineSpec) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + planner "github.com/pingcap/tidb/disttask/framework/planner" + gomock "go.uber.org/mock/gomock" +) + +// MockLogicalPlan is a mock of LogicalPlan interface. +type MockLogicalPlan struct { + ctrl *gomock.Controller + recorder *MockLogicalPlanMockRecorder +} + +// MockLogicalPlanMockRecorder is the mock recorder for MockLogicalPlan. +type MockLogicalPlanMockRecorder struct { + mock *MockLogicalPlan +} + +// NewMockLogicalPlan creates a new mock instance. +func NewMockLogicalPlan(ctrl *gomock.Controller) *MockLogicalPlan { + mock := &MockLogicalPlan{ctrl: ctrl} + mock.recorder = &MockLogicalPlanMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockLogicalPlan) EXPECT() *MockLogicalPlanMockRecorder { + return m.recorder +} + +// FromTaskMeta mocks base method. +func (m *MockLogicalPlan) FromTaskMeta(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FromTaskMeta", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// FromTaskMeta indicates an expected call of FromTaskMeta. +func (mr *MockLogicalPlanMockRecorder) FromTaskMeta(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FromTaskMeta", reflect.TypeOf((*MockLogicalPlan)(nil).FromTaskMeta), arg0) +} + +// ToPhysicalPlan mocks base method. +func (m *MockLogicalPlan) ToPhysicalPlan(arg0 planner.PlanCtx) (*planner.PhysicalPlan, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ToPhysicalPlan", arg0) + ret0, _ := ret[0].(*planner.PhysicalPlan) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ToPhysicalPlan indicates an expected call of ToPhysicalPlan. 
+func (mr *MockLogicalPlanMockRecorder) ToPhysicalPlan(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ToPhysicalPlan", reflect.TypeOf((*MockLogicalPlan)(nil).ToPhysicalPlan), arg0) +} + +// ToTaskMeta mocks base method. +func (m *MockLogicalPlan) ToTaskMeta() ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ToTaskMeta") + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ToTaskMeta indicates an expected call of ToTaskMeta. +func (mr *MockLogicalPlanMockRecorder) ToTaskMeta() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ToTaskMeta", reflect.TypeOf((*MockLogicalPlan)(nil).ToTaskMeta)) +} + +// MockPipelineSpec is a mock of PipelineSpec interface. +type MockPipelineSpec struct { + ctrl *gomock.Controller + recorder *MockPipelineSpecMockRecorder +} + +// MockPipelineSpecMockRecorder is the mock recorder for MockPipelineSpec. +type MockPipelineSpecMockRecorder struct { + mock *MockPipelineSpec +} + +// NewMockPipelineSpec creates a new mock instance. +func NewMockPipelineSpec(ctrl *gomock.Controller) *MockPipelineSpec { + mock := &MockPipelineSpec{ctrl: ctrl} + mock.recorder = &MockPipelineSpecMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPipelineSpec) EXPECT() *MockPipelineSpecMockRecorder { + return m.recorder +} + +// ToSubtaskMeta mocks base method. +func (m *MockPipelineSpec) ToSubtaskMeta(arg0 planner.PlanCtx) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ToSubtaskMeta", arg0) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ToSubtaskMeta indicates an expected call of ToSubtaskMeta. 
+func (mr *MockPipelineSpecMockRecorder) ToSubtaskMeta(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ToSubtaskMeta", reflect.TypeOf((*MockPipelineSpec)(nil).ToSubtaskMeta), arg0) +} diff --git a/disttask/framework/planner/BUILD.bazel b/disttask/framework/planner/BUILD.bazel new file mode 100644 index 0000000000000..b878b73ba2f72 --- /dev/null +++ b/disttask/framework/planner/BUILD.bazel @@ -0,0 +1,35 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "planner", + srcs = [ + "plan.go", + "planner.go", + ], + importpath = "github.com/pingcap/tidb/disttask/framework/planner", + visibility = ["//visibility:public"], + deps = [ + "//disttask/framework/storage", + "//sessionctx", + ], +) + +go_test( + name = "planner_test", + timeout = "short", + srcs = [ + "plan_test.go", + "planner_test.go", + ], + flaky = True, + deps = [ + ":planner", + "//disttask/framework/mock", + "//disttask/framework/storage", + "//testkit", + "@com_github_ngaut_pools//:pools", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//util", + "@org_uber_go_mock//gomock", + ], +) diff --git a/disttask/framework/planner/plan.go b/disttask/framework/planner/plan.go new file mode 100644 index 0000000000000..2e20be064826d --- /dev/null +++ b/disttask/framework/planner/plan.go @@ -0,0 +1,112 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package planner + +import ( + "context" + + "github.com/pingcap/tidb/sessionctx" +) + +// PlanCtx is the context for planning. +type PlanCtx struct { + Ctx context.Context + + // integrate with the current distributed framework + SessionCtx sessionctx.Context + TaskKey string + TaskType string + ThreadCnt int + + // PreviousSubtaskMetas is a list of subtask metas from the previous step. + // We can remove this field if we find a better way to pass the result between steps. + PreviousSubtaskMetas [][]byte +} + +// LogicalPlan represents a logical plan in the distributed framework. +// A normal flow of the distributed framework is: logical plan -> physical plan -> pipelines. +// To integrate with the current distributed framework, the flow becomes: +// logical plan -> task meta -> physical plan -> subtaskmetas -> pipelines. +type LogicalPlan interface { + ToTaskMeta() ([]byte, error) + FromTaskMeta([]byte) error + ToPhysicalPlan(PlanCtx) (*PhysicalPlan, error) +} + +// PhysicalPlan is a DAG of processors in the distributed framework. +// Each processor is a node that processes the task with a pipeline, +// and receives/passes the result to other processors via input and output links. +type PhysicalPlan struct { + Processors []ProcessorSpec +} + +// AddProcessor adds a node to the DAG. +func (p *PhysicalPlan) AddProcessor(processor ProcessorSpec) { + p.Processors = append(p.Processors, processor) +} + +// ToSubtaskMetas converts the physical plan to a list of subtask metas. 
+func (p *PhysicalPlan) ToSubtaskMetas(ctx PlanCtx, step int64) ([][]byte, error) { + subtaskMetas := make([][]byte, 0, len(p.Processors)) + for _, processor := range p.Processors { + if processor.Step != step { + continue + } + subtaskMeta, err := processor.Pipeline.ToSubtaskMeta(ctx) + if err != nil { + return nil, err + } + subtaskMetas = append(subtaskMetas, subtaskMeta) + } + return subtaskMetas, nil +} + +// ProcessorSpec is the specification of a processor. +// A processor is a node in the DAG. +// It contains input links from other processors, as well as output links to other processors. +// It also contains a pipeline which is the actual logic of the processor. +type ProcessorSpec struct { + ID int + Input InputSpec + Pipeline PipelineSpec + Output OutputSpec + // We can remove this field if we find a better way to pass the result between steps. + Step int64 +} + +// InputSpec is the specification of an input. +type InputSpec struct { + ColumnTypes []byte + Links []LinkSpec +} + +// OutputSpec is the specification of an output. +type OutputSpec struct { + Links []LinkSpec +} + +// LinkSpec is the specification of a link. +// Link connects pipelines between different nodes. +type LinkSpec struct { + ProcessorID int + // Support endpoint for communication between processors. + // Endpoint string +} + +// PipelineSpec is the specification of a pipeline. +type PipelineSpec interface { + // ToSubtaskMeta converts the pipeline to a subtask meta. + ToSubtaskMeta(PlanCtx) ([]byte, error) +} diff --git a/disttask/framework/planner/plan_test.go b/disttask/framework/planner/plan_test.go new file mode 100644 index 0000000000000..565a6a6d0ec7e --- /dev/null +++ b/disttask/framework/planner/plan_test.go @@ -0,0 +1,38 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package planner_test + +import ( + "testing" + + "github.com/pingcap/tidb/disttask/framework/mock" + "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestPhysicalPlan(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockPipelineSpec := mock.NewMockPipelineSpec(ctrl) + + plan := &planner.PhysicalPlan{} + planCtx := planner.PlanCtx{} + plan.AddProcessor(planner.ProcessorSpec{Pipeline: mockPipelineSpec, Step: 1}) + mockPipelineSpec.EXPECT().ToSubtaskMeta(gomock.Any()).Return([]byte("mock"), nil) + subtaskMetas, err := plan.ToSubtaskMetas(planCtx, 1) + require.NoError(t, err) + require.Equal(t, [][]byte{[]byte("mock")}, subtaskMetas) +} diff --git a/disttask/framework/planner/planner.go b/disttask/framework/planner/planner.go new file mode 100644 index 0000000000000..03fd9f5448a23 --- /dev/null +++ b/disttask/framework/planner/planner.go @@ -0,0 +1,46 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package planner + +import "github.com/pingcap/tidb/disttask/framework/storage" + +// Planner represents a distributed plan planner. +type Planner struct{} + +// NewPlanner creates a new planner instance. +func NewPlanner() *Planner { + return &Planner{} +} + +// Run runs the distributed plan. +func (*Planner) Run(planCtx PlanCtx, plan LogicalPlan) (int64, error) { + globalTaskManager, err := storage.GetTaskManager() + if err != nil { + return 0, err + } + + taskMeta, err := plan.ToTaskMeta() + if err != nil { + return 0, err + } + + return globalTaskManager.AddGlobalTaskWithSession( + planCtx.SessionCtx, + planCtx.TaskKey, + planCtx.TaskType, + planCtx.ThreadCnt, + taskMeta, + ) +} diff --git a/disttask/framework/planner/planner_test.go b/disttask/framework/planner/planner_test.go new file mode 100644 index 0000000000000..ff95d506fd6b4 --- /dev/null +++ b/disttask/framework/planner/planner_test.go @@ -0,0 +1,59 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package planner_test + +import ( + "context" + "testing" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/tidb/disttask/framework/mock" + "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" + "go.uber.org/mock/gomock" +) + +func TestPlanner(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + store := testkit.CreateMockStore(t) + gtk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return gtk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool) + storage.SetTaskManager(mgr) + + p := &planner.Planner{} + pCtx := planner.PlanCtx{ + Ctx: ctx, + SessionCtx: gtk.Session(), + TaskKey: "1", + TaskType: "example", + ThreadCnt: 1, + } + mockLogicalPlan := mock.NewMockLogicalPlan(ctrl) + mockLogicalPlan.EXPECT().ToTaskMeta().Return([]byte("mock"), nil) + taskID, err := p.Run(pCtx, mockLogicalPlan) + require.NoError(t, err) + require.Equal(t, int64(1), taskID) +} diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel index 727fc7f5cdcb8..91c591205b28b 100644 --- a/disttask/importinto/BUILD.bazel +++ b/disttask/importinto/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "dispatcher.go", "job.go", + "planner.go", "proto.go", "scheduler.go", "subtask_executor.go", @@ -26,6 +27,7 @@ go_library( "//config", "//disttask/framework/dispatcher", "//disttask/framework/handle", + "//disttask/framework/planner", "//disttask/framework/proto", "//disttask/framework/scheduler", "//disttask/framework/storage", @@ -60,22 +62,26 @@ go_test( timeout = "short", srcs = [ "dispatcher_test.go", + "planner_test.go", "subtask_executor_test.go", "wrapper_test.go", ], embed = [":importinto"], flaky = 
True, race = "on", - shard_count = 3, + shard_count = 5, deps = [ "//br/pkg/lightning/checkpoints", "//br/pkg/lightning/mydump", "//br/pkg/lightning/verification", + "//disttask/framework/planner", "//disttask/framework/proto", "//disttask/framework/storage", "//domain/infosync", "//executor/importer", + "//meta/autoid", "//parser/model", + "//parser/mysql", "//testkit", "//util/logutil", "@com_github_ngaut_pools//:pools", diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go index 91a7aea05876e..50d963adab71c 100644 --- a/disttask/importinto/dispatcher.go +++ b/disttask/importinto/dispatcher.go @@ -25,22 +25,19 @@ import ( dmysql "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" - verify "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/disttask/framework/dispatcher" + "github.com/pingcap/tidb/disttask/framework/planner" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor/importer" - "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/etcd" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" @@ -220,6 +217,7 @@ func (dsp *importDispatcher) OnNextStage(ctx context.Context, handle dispatcher. 
} }() + var nextStep int64 switch gTask.Step { case proto.StepInit: if err := preProcess(ctx, handle, gTask, taskMeta, logger); err != nil { @@ -228,49 +226,50 @@ func (dsp *importDispatcher) OnNextStage(ctx context.Context, handle dispatcher. if err = startJob(ctx, handle, taskMeta); err != nil { return nil, err } - subtaskMetas, err := generateImportStepMetas(ctx, taskMeta) - if err != nil { - return nil, err - } - logger.Info("move to import step", zap.Any("subtask-count", len(subtaskMetas))) - metaBytes := make([][]byte, 0, len(subtaskMetas)) - for _, subtaskMeta := range subtaskMetas { - bs, err := json.Marshal(subtaskMeta) - if err != nil { - return nil, err - } - metaBytes = append(metaBytes, bs) - } - gTask.Step = StepImport - return metaBytes, nil + logger.Info("move to import step") + nextStep = StepImport case StepImport: dsp.switchTiKV2NormalMode(ctx, gTask, logger) failpoint.Inject("clearLastSwitchTime", func() { dsp.lastSwitchTime.Store(time.Time{}) }) - stepMeta, err2 := toPostProcessStep(handle, gTask, taskMeta) - if err2 != nil { - return nil, err2 - } if err = job2Step(ctx, taskMeta, importer.JobStepValidating); err != nil { return nil, err } - logger.Info("move to post-process step ", zap.Any("result", taskMeta.Result), - zap.Any("step-meta", stepMeta)) - bs, err := json.Marshal(stepMeta) - if err != nil { - return nil, err - } failpoint.Inject("failWhenDispatchPostProcessSubtask", func() { failpoint.Return(nil, errors.New("injected error after StepImport")) }) - gTask.Step = StepPostProcess - return [][]byte{bs}, nil + if err := updateResult(handle, gTask, taskMeta); err != nil { + return nil, err + } + logger.Info("move to post-process step ", zap.Any("result", taskMeta.Result)) + nextStep = StepPostProcess case StepPostProcess: return nil, nil default: return nil, errors.Errorf("unknown step %d", gTask.Step) } + + previousSubtaskMetas, err := handle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) + if err != nil { + return nil, err + } + planCtx 
:= planner.PlanCtx{Ctx: ctx, PreviousSubtaskMetas: previousSubtaskMetas} + logicalPlan := &LogicalPlan{} + if err := logicalPlan.FromTaskMeta(gTask.Meta); err != nil { + return nil, err + } + physicalPlan, err := logicalPlan.ToPhysicalPlan(planCtx) + if err != nil { + return nil, err + } + metaBytes, err := physicalPlan.ToSubtaskMetas(planCtx, nextStep) + if err != nil { + return nil, err + } + gTask.Step = nextStep + logger.Info("generate subtasks", zap.Int64("step", nextStep), zap.Int("subtask-count", len(metaBytes))) + return metaBytes, nil } func (dsp *importDispatcher) OnErrStage(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task, receiveErrs []error) ([]byte, error) { @@ -446,24 +445,6 @@ func updateMeta(gTask *proto.Task, taskMeta *TaskMeta) error { return nil } -func buildController(taskMeta *TaskMeta) (*importer.LoadDataController, error) { - idAlloc := kv.NewPanickingAllocators(0) - tbl, err := tables.TableFromMeta(idAlloc, taskMeta.Plan.TableInfo) - if err != nil { - return nil, err - } - - astArgs, err := importer.ASTArgsFromStmt(taskMeta.Stmt) - if err != nil { - return nil, err - } - controller, err := importer.NewLoadDataController(&taskMeta.Plan, tbl, astArgs) - if err != nil { - return nil, err - } - return controller, nil -} - // todo: converting back and forth, we should unify struct and remove this function later. 
func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[int32][]Chunk { chunkMap := make(map[int32][]Chunk, len(engineCheckpoints)) @@ -476,66 +457,23 @@ func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[i return chunkMap } -func generateImportStepMetas(ctx context.Context, taskMeta *TaskMeta) (subtaskMetas []*ImportStepMeta, err error) { - var chunkMap map[int32][]Chunk - if len(taskMeta.ChunkMap) > 0 { - chunkMap = taskMeta.ChunkMap - } else { - controller, err2 := buildController(taskMeta) - if err2 != nil { - return nil, err2 - } - if err2 = controller.InitDataFiles(ctx); err2 != nil { - return nil, err2 - } - - engineCheckpoints, err2 := controller.PopulateChunks(ctx) - if err2 != nil { - return nil, err2 - } - chunkMap = toChunkMap(engineCheckpoints) - } - for id := range chunkMap { - if id == common.IndexEngineID { - continue - } - subtaskMeta := &ImportStepMeta{ - ID: id, - Chunks: chunkMap[id], - } - subtaskMetas = append(subtaskMetas, subtaskMeta) - } - return subtaskMetas, nil -} - // we will update taskMeta in place and make gTask.Meta point to the new taskMeta. 
-func toPostProcessStep(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) (*PostProcessStepMeta, error) { +func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error { metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) if err != nil { - return nil, err + return err } subtaskMetas := make([]*ImportStepMeta, 0, len(metas)) for _, bs := range metas { var subtaskMeta ImportStepMeta if err := json.Unmarshal(bs, &subtaskMeta); err != nil { - return nil, err + return err } subtaskMetas = append(subtaskMetas, &subtaskMeta) } - var localChecksum verify.KVChecksum - maxIDs := make(map[autoid.AllocatorType]int64, 3) columnSizeMap := make(map[int64]int64) for _, subtaskMeta := range subtaskMetas { - checksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) - localChecksum.Add(&checksum) - - for key, val := range subtaskMeta.MaxIDs { - if maxIDs[key] < val { - maxIDs[key] = val - } - } - taskMeta.Result.ReadRowCnt += subtaskMeta.Result.ReadRowCnt taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt for key, val := range subtaskMeta.Result.ColSizeMap { @@ -543,17 +481,7 @@ func toPostProcessStep(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta } } taskMeta.Result.ColSizeMap = columnSizeMap - if err2 := updateMeta(gTask, taskMeta); err2 != nil { - return nil, err2 - } - return &PostProcessStepMeta{ - Checksum: Checksum{ - Size: localChecksum.SumSize(), - KVs: localChecksum.SumKVS(), - Sum: localChecksum.Sum(), - }, - MaxIDs: maxIDs, - }, nil + return updateMeta(gTask, taskMeta) } func startJob(ctx context.Context, handle dispatcher.TaskHandle, taskMeta *TaskMeta) error { diff --git a/disttask/importinto/job.go b/disttask/importinto/job.go index 64b61048d8c88..9da4b41dbe3de 100644 --- a/disttask/importinto/job.go +++ b/disttask/importinto/job.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/errors" 
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/disttask/framework/handle" + "github.com/pingcap/tidb/disttask/framework/planner" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" "github.com/pingcap/tidb/domain/infosync" @@ -175,19 +176,25 @@ func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, err if err2 != nil { return err2 } - task := TaskMeta{ + + // TODO: use planner.Run to run the logical plan + // now creating import job and submitting distributed task should be in the same transaction. + logicalPlan := &LogicalPlan{ JobID: jobID, Plan: *plan, Stmt: ti.stmt, EligibleInstances: instances, ChunkMap: ti.chunkMap, } - taskMeta, err2 := json.Marshal(task) - if err2 != nil { - return err2 + planCtx := planner.PlanCtx{ + Ctx: ctx, + SessionCtx: se, + TaskKey: TaskKey(jobID), + TaskType: proto.ImportInto, + ThreadCnt: int(plan.ThreadCnt), } - taskID, err2 = globalTaskManager.AddGlobalTaskWithSession(se, TaskKey(jobID), proto.ImportInto, - int(plan.ThreadCnt), taskMeta) + p := planner.NewPlanner() + taskID, err2 = p.Run(planCtx, logicalPlan) if err2 != nil { return err2 } diff --git a/disttask/importinto/planner.go b/disttask/importinto/planner.go new file mode 100644 index 0000000000000..5eb2483dbf188 --- /dev/null +++ b/disttask/importinto/planner.go @@ -0,0 +1,230 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importinto + +import ( + "context" + "encoding/json" + + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" + "github.com/pingcap/tidb/br/pkg/lightning/common" + verify "github.com/pingcap/tidb/br/pkg/lightning/verification" + "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/table/tables" +) + +var ( + _ planner.LogicalPlan = &LogicalPlan{} + _ planner.PipelineSpec = &ImportSpec{} + _ planner.PipelineSpec = &PostProcessSpec{} +) + +// LogicalPlan represents a logical plan for import into. +type LogicalPlan struct { + JobID int64 + Plan importer.Plan + Stmt string + EligibleInstances []*infosync.ServerInfo + ChunkMap map[int32][]Chunk +} + +// ToTaskMeta converts the logical plan to task meta. +func (p *LogicalPlan) ToTaskMeta() ([]byte, error) { + taskMeta := TaskMeta{ + JobID: p.JobID, + Plan: p.Plan, + Stmt: p.Stmt, + EligibleInstances: p.EligibleInstances, + ChunkMap: p.ChunkMap, + } + return json.Marshal(taskMeta) +} + +// FromTaskMeta converts the task meta to logical plan. +func (p *LogicalPlan) FromTaskMeta(bs []byte) error { + var taskMeta TaskMeta + if err := json.Unmarshal(bs, &taskMeta); err != nil { + return err + } + p.JobID = taskMeta.JobID + p.Plan = taskMeta.Plan + p.Stmt = taskMeta.Stmt + p.EligibleInstances = taskMeta.EligibleInstances + p.ChunkMap = taskMeta.ChunkMap + return nil +} + +// ToPhysicalPlan converts the logical plan to physical plan. +func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.PhysicalPlan, error) { + physicalPlan := &planner.PhysicalPlan{} + inputLinks := make([]planner.LinkSpec, 0) + // physical plan only needs to be generated once. + // However, our current implementation requires generating it for each step. + // Only the first step needs to generate import specs. 
+ // This is a fast path to bypass generating import spec multiple times (as we need to access the source data). + if len(planCtx.PreviousSubtaskMetas) == 0 { + importSpecs, err := generateImportSpecs(planCtx.Ctx, p) + if err != nil { + return nil, err + } + + for i, importSpec := range importSpecs { + physicalPlan.AddProcessor(planner.ProcessorSpec{ + ID: i, + Pipeline: importSpec, + Output: planner.OutputSpec{ + Links: []planner.LinkSpec{ + { + ProcessorID: len(importSpecs), + }, + }, + }, + Step: StepImport, + }) + inputLinks = append(inputLinks, planner.LinkSpec{ + ProcessorID: i, + }) + } + } + + physicalPlan.AddProcessor(planner.ProcessorSpec{ + ID: len(inputLinks), + Input: planner.InputSpec{ + ColumnTypes: []byte{ + // Checksum_crc64_xor, Total_kvs, Total_bytes, ReadRowCnt, LoadedRowCnt, ColSizeMap + mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeJSON, + }, + Links: inputLinks, + }, + Pipeline: &PostProcessSpec{ + Schema: p.Plan.DBName, + Table: p.Plan.TableInfo.Name.L, + }, + Step: StepPostProcess, + }) + return physicalPlan, nil +} + +// ImportSpec is the specification of an import pipeline. +type ImportSpec struct { + ID int32 + Plan importer.Plan + Chunks []Chunk +} + +// ToSubtaskMeta converts the import spec to subtask meta. +func (s *ImportSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error) { + importStepMeta := ImportStepMeta{ + ID: s.ID, + Chunks: s.Chunks, + } + return json.Marshal(importStepMeta) +} + +// PostProcessSpec is the specification of a post process pipeline. +type PostProcessSpec struct { + // for checksum request + Schema string + Table string +} + +// ToSubtaskMeta converts the post process spec to subtask meta. 
+func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error) { + subtaskMetas := make([]*ImportStepMeta, 0, len(planCtx.PreviousSubtaskMetas)) + for _, bs := range planCtx.PreviousSubtaskMetas { + var subtaskMeta ImportStepMeta + if err := json.Unmarshal(bs, &subtaskMeta); err != nil { + return nil, err + } + subtaskMetas = append(subtaskMetas, &subtaskMeta) + } + var localChecksum verify.KVChecksum + maxIDs := make(map[autoid.AllocatorType]int64, 3) + for _, subtaskMeta := range subtaskMetas { + checksum := verify.MakeKVChecksum(subtaskMeta.Checksum.Size, subtaskMeta.Checksum.KVs, subtaskMeta.Checksum.Sum) + localChecksum.Add(&checksum) + + for key, val := range subtaskMeta.MaxIDs { + if maxIDs[key] < val { + maxIDs[key] = val + } + } + } + postProcessStepMeta := &PostProcessStepMeta{ + Checksum: Checksum{ + Size: localChecksum.SumSize(), + KVs: localChecksum.SumKVS(), + Sum: localChecksum.Sum(), + }, + MaxIDs: maxIDs, + } + return json.Marshal(postProcessStepMeta) +} + +func buildController(p *LogicalPlan) (*importer.LoadDataController, error) { + idAlloc := kv.NewPanickingAllocators(0) + tbl, err := tables.TableFromMeta(idAlloc, p.Plan.TableInfo) + if err != nil { + return nil, err + } + + astArgs, err := importer.ASTArgsFromStmt(p.Stmt) + if err != nil { + return nil, err + } + controller, err := importer.NewLoadDataController(&p.Plan, tbl, astArgs) + if err != nil { + return nil, err + } + return controller, nil +} + +func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]*ImportSpec, error) { + var chunkMap map[int32][]Chunk + if len(p.ChunkMap) > 0 { + chunkMap = p.ChunkMap + } else { + controller, err2 := buildController(p) + if err2 != nil { + return nil, err2 + } + if err2 = controller.InitDataFiles(ctx); err2 != nil { + return nil, err2 + } + + engineCheckpoints, err2 := controller.PopulateChunks(ctx) + if err2 != nil { + return nil, err2 + } + chunkMap = toChunkMap(engineCheckpoints) + } + importSpecs := make([]*ImportSpec, 
0, len(chunkMap)) + for id := range chunkMap { + if id == common.IndexEngineID { + continue + } + importSpec := &ImportSpec{ + ID: id, + Plan: p.Plan, + Chunks: chunkMap[id], + } + importSpecs = append(importSpecs, importSpec) + } + return importSpecs, nil +} diff --git a/disttask/importinto/planner_test.go b/disttask/importinto/planner_test.go new file mode 100644 index 0000000000000..7ce4733b4d2ff --- /dev/null +++ b/disttask/importinto/planner_test.go @@ -0,0 +1,126 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importinto + +import ( + "encoding/json" + "testing" + + "github.com/pingcap/tidb/disttask/framework/planner" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/stretchr/testify/require" +) + +func TestLogicalPlan(t *testing.T) { + logicalPlan := &LogicalPlan{ + JobID: 1, + Plan: importer.Plan{}, + Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, + EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + ChunkMap: map[int32][]Chunk{1: {{Path: "gs://test-load/1.csv"}}}, + } + bs, err := logicalPlan.ToTaskMeta() + require.NoError(t, err) + plan := &LogicalPlan{} + require.NoError(t, plan.FromTaskMeta(bs)) + require.Equal(t, logicalPlan, plan) +} + +func TestToPhysicalPlan(t *testing.T) { + chunkID := int32(1) + logicalPlan := &LogicalPlan{ + JobID: 1, + Plan: importer.Plan{ + DBName: "db", + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("tb"), + }, + }, + Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, + EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + ChunkMap: map[int32][]Chunk{chunkID: {{Path: "gs://test-load/1.csv"}}}, + } + planCtx := planner.PlanCtx{} + physicalPlan, err := logicalPlan.ToPhysicalPlan(planCtx) + require.NoError(t, err) + plan := &planner.PhysicalPlan{ + Processors: []planner.ProcessorSpec{ + { + ID: 0, + Pipeline: &ImportSpec{ + ID: chunkID, + Plan: logicalPlan.Plan, + Chunks: logicalPlan.ChunkMap[chunkID], + }, + Output: planner.OutputSpec{ + Links: []planner.LinkSpec{ + { + ProcessorID: 1, + }, + }, + }, + Step: StepImport, + }, + { + ID: 1, + Input: planner.InputSpec{ + ColumnTypes: []byte{ + mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeLonglong, mysql.TypeJSON, + }, + Links: []planner.LinkSpec{ + { + ProcessorID: 0, + }, + }, + }, + Pipeline: &PostProcessSpec{ + 
Schema: "db", + Table: "tb", + }, + Step: StepPostProcess, + }, + }, + } + require.Equal(t, plan, physicalPlan) + + subtaskMetas1, err := physicalPlan.ToSubtaskMetas(planCtx, StepImport) + require.NoError(t, err) + subtaskMeta1 := ImportStepMeta{ + ID: chunkID, + Chunks: logicalPlan.ChunkMap[chunkID], + } + bs, err := json.Marshal(subtaskMeta1) + require.NoError(t, err) + require.Equal(t, [][]byte{bs}, subtaskMetas1) + + subtaskMeta1.Checksum = Checksum{Size: 1, KVs: 2, Sum: 3} + bs, err = json.Marshal(subtaskMeta1) + require.NoError(t, err) + subtaskMetas2, err := physicalPlan.ToSubtaskMetas(planner.PlanCtx{ + PreviousSubtaskMetas: [][]byte{bs}, + }, StepPostProcess) + require.NoError(t, err) + subtaskMeta2 := PostProcessStepMeta{ + Checksum: Checksum{Size: 1, KVs: 2, Sum: 3}, + MaxIDs: map[autoid.AllocatorType]int64{}, + } + bs, err = json.Marshal(subtaskMeta2) + require.NoError(t, err) + require.Equal(t, [][]byte{bs}, subtaskMetas2) +}