Skip to content

Commit

Permalink
distribute framework: add planner (#46395)
Browse files Browse the repository at this point in the history
ref #46258
  • Loading branch information
GMHDBJD authored Sep 4, 2023
1 parent 7396f4e commit 6707afa
Show file tree
Hide file tree
Showing 14 changed files with 824 additions and 115 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen

gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
2 changes: 1 addition & 1 deletion disttask/framework/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ go_test(
"framework_test.go",
],
flaky = True,
race = "on",
race = "off",
shard_count = 22,
deps = [
"//disttask/framework/dispatcher",
Expand Down
6 changes: 5 additions & 1 deletion disttask/framework/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "mock",
srcs = ["scheduler_mock.go"],
srcs = [
"plan_mock.go",
"scheduler_mock.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/mock",
visibility = ["//visibility:public"],
deps = [
"//disttask/framework/planner",
"//disttask/framework/proto",
"@org_uber_go_mock//gomock",
],
Expand Down
117 changes: 117 additions & 0 deletions disttask/framework/mock/plan_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions disttask/framework/planner/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "planner",
srcs = [
"plan.go",
"planner.go",
],
importpath = "github.com/pingcap/tidb/disttask/framework/planner",
visibility = ["//visibility:public"],
deps = [
"//disttask/framework/storage",
"//sessionctx",
],
)

go_test(
name = "planner_test",
timeout = "short",
srcs = [
"plan_test.go",
"planner_test.go",
],
flaky = True,
deps = [
":planner",
"//disttask/framework/mock",
"//disttask/framework/storage",
"//testkit",
"@com_github_ngaut_pools//:pools",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_mock//gomock",
],
)
112 changes: 112 additions & 0 deletions disttask/framework/planner/plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import (
"context"

"github.com/pingcap/tidb/sessionctx"
)

// PlanCtx is the context for planning.
type PlanCtx struct {
Ctx context.Context

// integrate with current distribute framework
SessionCtx sessionctx.Context
TaskKey string
TaskType string
ThreadCnt int

// PreviousSubtaskMetas is a list of subtask metas from previous step.
// We can remove this field if we find a better way to pass the result between steps.
PreviousSubtaskMetas [][]byte
}

// LogicalPlan represents a logical plan in distribute framework.
// A normal flow of distribute framework is: logical plan -> physical plan -> pipelines.
// To integrate with current distribute framework, the flow becomes:
// logical plan -> task meta -> physical plan -> subtaskmetas -> pipelines.
type LogicalPlan interface {
ToTaskMeta() ([]byte, error)
FromTaskMeta([]byte) error
ToPhysicalPlan(PlanCtx) (*PhysicalPlan, error)
}

// PhysicalPlan is a DAG of processors in distribute framework.
// Each processor is a node process the task with a pipeline,
// and receive/pass the result to other processors via input and output links.
type PhysicalPlan struct {
Processors []ProcessorSpec
}

// AddProcessor adds a node to the DAG.
func (p *PhysicalPlan) AddProcessor(processor ProcessorSpec) {
p.Processors = append(p.Processors, processor)
}

// ToSubtaskMetas converts the physical plan to a list of subtask metas.
func (p *PhysicalPlan) ToSubtaskMetas(ctx PlanCtx, step int64) ([][]byte, error) {
subtaskMetas := make([][]byte, 0, len(p.Processors))
for _, processor := range p.Processors {
if processor.Step != step {
continue
}
subtaskMeta, err := processor.Pipeline.ToSubtaskMeta(ctx)
if err != nil {
return nil, err
}
subtaskMetas = append(subtaskMetas, subtaskMeta)
}
return subtaskMetas, nil
}

// ProcessorSpec is the specification of a processor.
// A processor is a node in the DAG.
// It contains input links from other processors, as well as output links to other processors.
// It also contains an pipeline which is the actual logic of the processor.
type ProcessorSpec struct {
ID int
Input InputSpec
Pipeline PipelineSpec
Output OutputSpec
// We can remove this field if we find a better way to pass the result between steps.
Step int64
}

// InputSpec is the specification of an input.
type InputSpec struct {
ColumnTypes []byte
Links []LinkSpec
}

// OutputSpec is the specification of an output.
type OutputSpec struct {
Links []LinkSpec
}

// LinkSpec is the specification of a link.
// Link connects pipelines between different nodes.
type LinkSpec struct {
ProcessorID int
// Support endpoint for communication between processors.
// Endpoint string
}

// PipelineSpec is the specification of an pipeline.
type PipelineSpec interface {
// ToSubtaskMeta converts the pipeline to a subtask meta
ToSubtaskMeta(PlanCtx) ([]byte, error)
}
38 changes: 38 additions & 0 deletions disttask/framework/planner/plan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner_test

import (
"testing"

"github.com/pingcap/tidb/disttask/framework/mock"
"github.com/pingcap/tidb/disttask/framework/planner"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestPhysicalPlan(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockPipelineSpec := mock.NewMockPipelineSpec(ctrl)

plan := &planner.PhysicalPlan{}
planCtx := planner.PlanCtx{}
plan.AddProcessor(planner.ProcessorSpec{Pipeline: mockPipelineSpec, Step: 1})
mockPipelineSpec.EXPECT().ToSubtaskMeta(gomock.Any()).Return([]byte("mock"), nil)
subtaskMetas, err := plan.ToSubtaskMetas(planCtx, 1)
require.NoError(t, err)
require.Equal(t, [][]byte{[]byte("mock")}, subtaskMetas)
}
46 changes: 46 additions & 0 deletions disttask/framework/planner/planner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import "github.com/pingcap/tidb/disttask/framework/storage"

// Planner represents a distribute plan planner.
type Planner struct{}

// NewPlanner creates a new planer instance.
func NewPlanner() *Planner {
return &Planner{}
}

// Run runs the distribute plan.
func (*Planner) Run(planCtx PlanCtx, plan LogicalPlan) (int64, error) {
globalTaskManager, err := storage.GetTaskManager()
if err != nil {
return 0, err
}

taskMeta, err := plan.ToTaskMeta()
if err != nil {
return 0, err
}

return globalTaskManager.AddGlobalTaskWithSession(
planCtx.SessionCtx,
planCtx.TaskKey,
planCtx.TaskType,
planCtx.ThreadCnt,
taskMeta,
)
}
Loading

0 comments on commit 6707afa

Please sign in to comment.