distribute framework: add planner #46395

Merged
merged 16 commits into from
Sep 4, 2023
1 change: 1 addition & 0 deletions Makefile
@@ -385,6 +385,7 @@ mock_lightning: tools/bin/mockgen
 
 gen_mock: tools/bin/mockgen
 	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
+	tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go
 
 # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
 # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
2 changes: 1 addition & 1 deletion disttask/framework/BUILD.bazel
@@ -10,7 +10,7 @@ go_test(
         "framework_test.go",
     ],
     flaky = True,
-    race = "on",
+    race = "off",
     shard_count = 22,
     deps = [
         "//disttask/framework/dispatcher",
6 changes: 5 additions & 1 deletion disttask/framework/mock/BUILD.bazel
@@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "mock",
-    srcs = ["scheduler_mock.go"],
+    srcs = [
+        "plan_mock.go",
+        "scheduler_mock.go",
+    ],
     importpath = "github.com/pingcap/tidb/disttask/framework/mock",
     visibility = ["//visibility:public"],
     deps = [
+        "//disttask/framework/planner",
         "//disttask/framework/proto",
         "@org_uber_go_mock//gomock",
     ],
117 changes: 117 additions & 0 deletions disttask/framework/mock/plan_mock.go

Some generated files are not rendered by default.

35 changes: 35 additions & 0 deletions disttask/framework/planner/BUILD.bazel
@@ -0,0 +1,35 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "planner",
    srcs = [
        "plan.go",
        "planner.go",
    ],
    importpath = "github.com/pingcap/tidb/disttask/framework/planner",
    visibility = ["//visibility:public"],
    deps = [
        "//disttask/framework/storage",
        "//sessionctx",
    ],
)

go_test(
    name = "planner_test",
    timeout = "short",
    srcs = [
        "plan_test.go",
        "planner_test.go",
    ],
    flaky = True,
    deps = [
        ":planner",
        "//disttask/framework/mock",
        "//disttask/framework/storage",
        "//testkit",
        "@com_github_ngaut_pools//:pools",
        "@com_github_stretchr_testify//require",
        "@com_github_tikv_client_go_v2//util",
        "@org_uber_go_mock//gomock",
    ],
)
112 changes: 112 additions & 0 deletions disttask/framework/planner/plan.go
@@ -0,0 +1,112 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import (
	"context"

	"github.com/pingcap/tidb/sessionctx"
)

// PlanCtx is the context for planning.
type PlanCtx struct {
	Ctx context.Context

	// integrate with current distribute framework
	SessionCtx sessionctx.Context
	TaskKey    string
	TaskType   string
	ThreadCnt  int

	// PreviousSubtaskMetas is a list of subtask metas from previous step.
	// We can remove this field if we find a better way to pass the result between steps.
	PreviousSubtaskMetas [][]byte

Contributor:

hold task_table as member?

Contributor:

> hold task_table as member?

The planner in the TiDB runtime also fetches metadata from TiKV, so I think it's OK.

Contributor Author (@GMHDBJD, Aug 29, 2023):

The PreviousSubtaskMetas field makes it explicit that these metas are the result of the previous step. If we used task_table, our planner would be coupled to the system table again, although adding a new interface would be more convenient. 🤔

}

// LogicalPlan represents a logical plan in distribute framework.
// A normal flow of distribute framework is: logical plan -> physical plan -> pipelines.
// To integrate with current distribute framework, the flow becomes:
// logical plan -> task meta -> physical plan -> subtaskmetas -> pipelines.
type LogicalPlan interface {
	ToTaskMeta() ([]byte, error)
	FromTaskMeta([]byte) error
	ToPhysicalPlan(PlanCtx) (*PhysicalPlan, error)
}

// PhysicalPlan is a DAG of processors in distribute framework.
// Each processor is a node that processes the task with a pipeline,
// and receives/passes results from/to other processors via input and output links.
type PhysicalPlan struct {

Contributor:

Can we add more detailed comments to show the arch of the planner?

	Processors []ProcessorSpec
}

// AddProcessor adds a node to the DAG.
func (p *PhysicalPlan) AddProcessor(processor ProcessorSpec) {
	p.Processors = append(p.Processors, processor)
}

// ToSubtaskMetas converts the physical plan to a list of subtask metas.
func (p *PhysicalPlan) ToSubtaskMetas(ctx PlanCtx, step int64) ([][]byte, error) {
	subtaskMetas := make([][]byte, 0, len(p.Processors))
	for _, processor := range p.Processors {
		if processor.Step != step {
			continue
		}
		subtaskMeta, err := processor.Pipeline.ToSubtaskMeta(ctx)
		if err != nil {
			return nil, err
		}

Codecov / codecov/patch warning: added lines #L69-L70 in disttask/framework/planner/plan.go were not covered by tests.

		subtaskMetas = append(subtaskMetas, subtaskMeta)
	}
	return subtaskMetas, nil
}

// ProcessorSpec is the specification of a processor.
// A processor is a node in the DAG.
// It contains input links from other processors, as well as output links to other processors.
// It also contains a pipeline which is the actual logic of the processor.
type ProcessorSpec struct {
	ID       int
	Input    InputSpec
	Pipeline PipelineSpec
	Output   OutputSpec
	// We can remove this field if we find a better way to pass the result between steps.
	Step int64
}

// InputSpec is the specification of an input.
type InputSpec struct {
	ColumnTypes []byte
	Links       []LinkSpec
}

// OutputSpec is the specification of an output.
type OutputSpec struct {
	Links []LinkSpec
}

// LinkSpec is the specification of a link.
// Link connects pipelines between different nodes.
type LinkSpec struct {
	ProcessorID int
	// Support endpoint for communication between processors.
	// Endpoint string
}

// PipelineSpec is the specification of a pipeline.
type PipelineSpec interface {
	// ToSubtaskMeta converts the pipeline to a subtask meta.
	ToSubtaskMeta(PlanCtx) ([]byte, error)
}
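
As a rough illustration of how the types in plan.go fit together, the sketch below copies trimmed versions of them into a standalone program and wires two step-1 processors into one step-2 processor. `jsonPipeline`, `buildPlan`, and the processor names are hypothetical and not part of this PR; the local `PlanCtx` keeps only one field, and `ColumnTypes` is omitted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PlanCtx is a trimmed stand-in for planner.PlanCtx; the real struct also
// carries a context, a session context, and task settings.
type PlanCtx struct {
	TaskKey string
}

// PipelineSpec mirrors the interface in plan.go.
type PipelineSpec interface {
	ToSubtaskMeta(PlanCtx) ([]byte, error)
}

// LinkSpec, InputSpec and OutputSpec mirror plan.go (ColumnTypes omitted).
type LinkSpec struct{ ProcessorID int }
type InputSpec struct{ Links []LinkSpec }
type OutputSpec struct{ Links []LinkSpec }

// ProcessorSpec mirrors plan.go.
type ProcessorSpec struct {
	ID       int
	Input    InputSpec
	Pipeline PipelineSpec
	Output   OutputSpec
	Step     int64
}

// PhysicalPlan mirrors plan.go.
type PhysicalPlan struct {
	Processors []ProcessorSpec
}

// AddProcessor adds a node to the DAG.
func (p *PhysicalPlan) AddProcessor(processor ProcessorSpec) {
	p.Processors = append(p.Processors, processor)
}

// ToSubtaskMetas serializes only the processors that belong to the given step.
func (p *PhysicalPlan) ToSubtaskMetas(ctx PlanCtx, step int64) ([][]byte, error) {
	subtaskMetas := make([][]byte, 0, len(p.Processors))
	for _, processor := range p.Processors {
		if processor.Step != step {
			continue
		}
		subtaskMeta, err := processor.Pipeline.ToSubtaskMeta(ctx)
		if err != nil {
			return nil, err
		}
		subtaskMetas = append(subtaskMetas, subtaskMeta)
	}
	return subtaskMetas, nil
}

// jsonPipeline is a hypothetical PipelineSpec that encodes itself as JSON.
type jsonPipeline struct {
	Name string `json:"name"`
}

func (j jsonPipeline) ToSubtaskMeta(PlanCtx) ([]byte, error) {
	return json.Marshal(j)
}

// buildPlan wires two step-1 readers into one step-2 merger.
func buildPlan() *PhysicalPlan {
	toMerge := OutputSpec{Links: []LinkSpec{{ProcessorID: 3}}}
	plan := &PhysicalPlan{}
	plan.AddProcessor(ProcessorSpec{ID: 1, Pipeline: jsonPipeline{Name: "read-a"}, Output: toMerge, Step: 1})
	plan.AddProcessor(ProcessorSpec{ID: 2, Pipeline: jsonPipeline{Name: "read-b"}, Output: toMerge, Step: 1})
	plan.AddProcessor(ProcessorSpec{
		ID:       3,
		Input:    InputSpec{Links: []LinkSpec{{ProcessorID: 1}, {ProcessorID: 2}}},
		Pipeline: jsonPipeline{Name: "merge"},
		Step:     2,
	})
	return plan
}

func main() {
	plan := buildPlan()
	for step := int64(1); step <= 2; step++ {
		metas, err := plan.ToSubtaskMetas(PlanCtx{TaskKey: "demo"}, step)
		if err != nil {
			panic(err)
		}
		for _, meta := range metas {
			fmt.Printf("step %d: %s\n", step, meta)
		}
	}
}
```

Note that the links are only recorded in the specs here; in the code shown in this diff, ToSubtaskMetas selects processors by Step alone and does not consult Input or Output.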
38 changes: 38 additions & 0 deletions disttask/framework/planner/plan_test.go
@@ -0,0 +1,38 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner_test

import (
	"testing"

	"github.com/pingcap/tidb/disttask/framework/mock"
	"github.com/pingcap/tidb/disttask/framework/planner"
	"github.com/stretchr/testify/require"
	"go.uber.org/mock/gomock"
)

func TestPhysicalPlan(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()
	mockPipelineSpec := mock.NewMockPipelineSpec(ctrl)

	plan := &planner.PhysicalPlan{}
	planCtx := planner.PlanCtx{}
	plan.AddProcessor(planner.ProcessorSpec{Pipeline: mockPipelineSpec, Step: 1})
	mockPipelineSpec.EXPECT().ToSubtaskMeta(gomock.Any()).Return([]byte("mock"), nil)
	subtaskMetas, err := plan.ToSubtaskMetas(planCtx, 1)
	require.NoError(t, err)
	require.Equal(t, [][]byte{[]byte("mock")}, subtaskMetas)
}
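
The test above exercises only the success path, while Codecov flags the error return in ToSubtaskMetas as uncovered. A minimal sketch of how that branch behaves, assuming a hand-written stub in place of the gomock-generated MockPipelineSpec so the example runs on its own; `stubPipeline`, `errBoom`, and `newStubPlan` are hypothetical names, and the types are trimmed local copies of those in plan.go.

```go
package main

import (
	"errors"
	"fmt"
)

// PlanCtx is an empty stand-in for planner.PlanCtx.
type PlanCtx struct{}

// PipelineSpec mirrors the interface in plan.go.
type PipelineSpec interface {
	ToSubtaskMeta(PlanCtx) ([]byte, error)
}

// ProcessorSpec is trimmed to the fields ToSubtaskMetas reads.
type ProcessorSpec struct {
	Pipeline PipelineSpec
	Step     int64
}

// PhysicalPlan mirrors plan.go.
type PhysicalPlan struct {
	Processors []ProcessorSpec
}

// ToSubtaskMetas follows the logic shown in plan.go.
func (p *PhysicalPlan) ToSubtaskMetas(ctx PlanCtx, step int64) ([][]byte, error) {
	subtaskMetas := make([][]byte, 0, len(p.Processors))
	for _, processor := range p.Processors {
		if processor.Step != step {
			continue
		}
		subtaskMeta, err := processor.Pipeline.ToSubtaskMeta(ctx)
		if err != nil {
			return nil, err
		}
		subtaskMetas = append(subtaskMetas, subtaskMeta)
	}
	return subtaskMetas, nil
}

// stubPipeline is a hypothetical test double returning fixed values.
type stubPipeline struct {
	meta []byte
	err  error
}

func (s stubPipeline) ToSubtaskMeta(PlanCtx) ([]byte, error) {
	return s.meta, s.err
}

var errBoom = errors.New("boom")

// newStubPlan has a healthy pipeline at step 1 and a failing one at step 2.
func newStubPlan() *PhysicalPlan {
	return &PhysicalPlan{Processors: []ProcessorSpec{
		{Pipeline: stubPipeline{meta: []byte("ok")}, Step: 1},
		{Pipeline: stubPipeline{err: errBoom}, Step: 2},
	}}
}

func main() {
	plan := newStubPlan()

	// Step 1 skips the failing pipeline, so it succeeds.
	metas, err := plan.ToSubtaskMetas(PlanCtx{}, 1)
	fmt.Println(len(metas), err)

	// Step 2 reaches the failing pipeline; the error is returned with a nil slice.
	metas, err = plan.ToSubtaskMetas(PlanCtx{}, 2)
	fmt.Println(metas == nil, errors.Is(err, errBoom))
}
```

In the actual test package, the same case could presumably be written with the generated mock by having `ToSubtaskMeta` return an error and asserting on it with `require.Error`.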
46 changes: 46 additions & 0 deletions disttask/framework/planner/planner.go
@@ -0,0 +1,46 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package planner

import "github.com/pingcap/tidb/disttask/framework/storage"

// Planner represents a distribute plan planner.
type Planner struct{}

// NewPlanner creates a new planner instance.
func NewPlanner() *Planner {
	return &Planner{}

Codecov / codecov/patch warning: added lines #L23-L24 in disttask/framework/planner/planner.go were not covered by tests.
}

// Run runs the distribute plan.
func (*Planner) Run(planCtx PlanCtx, plan LogicalPlan) (int64, error) {

Contributor:

No usage?

	globalTaskManager, err := storage.GetTaskManager()
	if err != nil {
		return 0, err
	}

Codecov / codecov/patch warning: added lines #L31-L32 in disttask/framework/planner/planner.go were not covered by tests.

	taskMeta, err := plan.ToTaskMeta()
	if err != nil {
		return 0, err
	}

Codecov / codecov/patch warning: added lines #L36-L37 in disttask/framework/planner/planner.go were not covered by tests.

	return globalTaskManager.AddGlobalTaskWithSession(
		planCtx.SessionCtx,
		planCtx.TaskKey,
		planCtx.TaskType,
		planCtx.ThreadCnt,
		taskMeta,
	)
}
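
Run follows the first leg of the flow described in plan.go (logical plan -> task meta): it serializes the logical plan and registers it as a global task. The sketch below imitates that shape without TiDB's storage package. `fakeTaskManager`, its `AddGlobalTask` method, `importPlan`, and `run` are all hypothetical stand-ins for illustration (the real code calls `storage.GetTaskManager` and `AddGlobalTaskWithSession`), and the local `LogicalPlan` omits `ToPhysicalPlan`.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// LogicalPlan is trimmed to the two serialization methods from plan.go.
type LogicalPlan interface {
	ToTaskMeta() ([]byte, error)
	FromTaskMeta([]byte) error
}

// importPlan is a hypothetical logical plan; its fields are illustrative only.
type importPlan struct {
	Table string `json:"table"`
	Files int    `json:"files"`
}

func (p *importPlan) ToTaskMeta() ([]byte, error) { return json.Marshal(p) }

func (p *importPlan) FromTaskMeta(b []byte) error { return json.Unmarshal(b, p) }

// fakeTaskManager is an in-memory stand-in for the task table.
type fakeTaskManager struct {
	nextID int64
	metas  map[int64][]byte
}

// AddGlobalTask stores the task meta and returns a new task ID.
func (m *fakeTaskManager) AddGlobalTask(key string, meta []byte) (int64, error) {
	if key == "" {
		return 0, errors.New("empty task key")
	}
	if m.metas == nil {
		m.metas = make(map[int64][]byte)
	}
	m.nextID++
	m.metas[m.nextID] = meta
	return m.nextID, nil
}

// run mirrors the shape of Planner.Run: serialize the plan, then register it.
func run(m *fakeTaskManager, key string, plan LogicalPlan) (int64, error) {
	taskMeta, err := plan.ToTaskMeta()
	if err != nil {
		return 0, err
	}
	return m.AddGlobalTask(key, taskMeta)
}

func main() {
	m := &fakeTaskManager{}
	id, err := run(m, "import/t1", &importPlan{Table: "t1", Files: 3})
	if err != nil {
		panic(err)
	}

	// A later stage can rebuild the logical plan from the stored task meta.
	var restored importPlan
	if err := restored.FromTaskMeta(m.metas[id]); err != nil {
		panic(err)
	}
	fmt.Printf("task %d restored: %+v\n", id, restored)
}
```

Keeping the task manager behind a small interface like this would also make Run's error branches easier to cover in unit tests, though whether that fits the framework's design is a call for the maintainers.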