Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
feat: get subtask config and status of v1.0.x from dm-worker (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Aug 3, 2020
1 parent 261d7b5 commit f571300
Show file tree
Hide file tree
Showing 26 changed files with 1,859 additions and 136 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level
ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config."
ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config"
ErrUpgradeVersionEtcdFail,[code=11114:class=functional:scope=internal:level=high], "Message: fail to operate DM cluster version in etcd, Workaround: Please use `list-member --master` to confirm whether the DM-master cluster is healthy"
ErrInvalidV1WorkerMetaPath,[code=11115:class=functional:scope=internal:level=medium], "Message: %s is an invalid v1.0.x DM-worker meta path, Workaround: Please check no `meta-dir` set for v1.0.x DM-worker"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`."
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format."
ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format."
Expand Down
1,190 changes: 1,055 additions & 135 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions dm/pbmock/dmworker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 27 additions & 1 deletion dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ service Worker {
// a `set`/`remove` operation should be an one-time operation (only take effect once),
// so we use a gRPC method rather than a etcd operation now (no persistent operation state).
rpc OperateSchema(OperateWorkerSchemaRequest) returns(CommonWorkerResponse) {}

rpc OperateV1Meta(OperateV1MetaRequest) returns(OperateV1MetaResponse) {}
}

message StartSubTaskRequest {
Expand Down Expand Up @@ -418,4 +420,28 @@ message OperateWorkerSchemaRequest {
string database = 4; // database name
string table = 5; // table name
string schema = 6; // schema content, a `CREATE TABLE` statement
}
}

// copied `TaskMeta` from release-1.0 branch.
message V1SubTaskMeta {
TaskOp op = 1;
Stage stage = 2; // the stage of sub-task after we apply some operations
string name = 3; // sub task's name
bytes task = 4; // (sub) task's configuration
}

enum V1MetaOp {
InvalidV1MetaOp = 0;
GetV1Meta = 1;
RemoveV1Meta = 2;
}

message OperateV1MetaRequest {
V1MetaOp op = 1;
}

message OperateV1MetaResponse {
bool result = 1;
string msg = 2; // error message if failed.
map<string, V1SubTaskMeta> meta = 3; // subtasks' meta for `get` operation.
}
61 changes: 61 additions & 0 deletions dm/worker/v1meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package worker

import (
"context"
"fmt"

"go.uber.org/zap"

"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/v1workermeta"
)

// OperateV1Meta implements WorkerServer.OperateV1Meta.
func (s *Server) OperateV1Meta(ctx context.Context, req *pb.OperateV1MetaRequest) (*pb.OperateV1MetaResponse, error) {
log.L().Info("", zap.String("request", "OperateV1Meta"), zap.Stringer("payload", req))

switch req.Op {
case pb.V1MetaOp_GetV1Meta:
meta, err := v1workermeta.GetSubtasksMeta()
if err != nil {
return &pb.OperateV1MetaResponse{
Result: false,
Msg: err.Error(),
}, nil
}
return &pb.OperateV1MetaResponse{
Result: true,
Meta: meta,
}, nil
case pb.V1MetaOp_RemoveV1Meta:
err := v1workermeta.RemoveSubtasksMeta()
if err != nil {
return &pb.OperateV1MetaResponse{
Result: false,
Msg: err.Error(),
}, nil
}
return &pb.OperateV1MetaResponse{
Result: true,
}, nil
default:
return &pb.OperateV1MetaResponse{
Result: false,
Msg: fmt.Sprintf("invalid op %s", req.Op.String()),
}, nil
}
}
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ description = ""
workaround = "Please use `list-member --master` to confirm whether the DM-master cluster is healthy"
tags = ["internal", "high"]

[error.DM-functional-11115]
message = "%s is an invalid v1.0.x DM-worker meta path"
description = ""
workaround = "Please check no `meta-dir` set for v1.0.x DM-worker"
tags = ["internal", "medium"]

[error.DM-config-20001]
message = "checking item %s is not supported\n%s"
description = ""
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/twitchtv/retool v1.3.8-0.20180918173430-41330f8b4e07
github.com/uber-go/atomic v1.4.0 // indirect
github.com/unrolled/render v1.0.1
Expand Down
6 changes: 6 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ const (

// pkg/upgrade
codeUpgradeVersionEtcdFail

// pkg/v1workermeta
codeInvalidV1WorkerMetaPath
)

// Config related error code list
Expand Down Expand Up @@ -724,6 +727,9 @@ var (
// pkg/upgrade
ErrUpgradeVersionEtcdFail = New(codeUpgradeVersionEtcdFail, ClassFunctional, ScopeInternal, LevelHigh, "fail to operate DM cluster version in etcd", "Please use `list-member --master` to confirm whether the DM-master cluster is healthy")

// pkg/v1workermeta
ErrInvalidV1WorkerMetaPath = New(codeInvalidV1WorkerMetaPath, ClassFunctional, ScopeInternal, LevelMedium, "%s is an invalid v1.0.x DM-worker meta path", "Please check no `meta-dir` set for v1.0.x DM-worker")

// Config related error
ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.")
ErrConfigTomlTransform = New(codeConfigTomlTransform, ClassConfig, ScopeInternal, LevelMedium, "%s", "Please check the configuration file has correct TOML format.")
Expand Down
108 changes: 108 additions & 0 deletions pkg/v1workermeta/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v1workermeta

import (
"os"
"path/filepath"
"strconv"

"github.com/BurntSushi/toml"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// `var` rather than `const` for testing.
var (
// metaPath is the meta data path in v1.0.x.
metaPath = "./dm_worker_meta"
// dbPath is the levelDB path in v1.0.x.
dbPath = "./dm_worker_meta/kv"
)

// v1SubTaskConfig represents the subtask config in v1.0.x.
type v1SubTaskConfig struct {
config.SubTaskConfig // embed the subtask config in v2.0.x.

// NOTE: in v1.0.x, `ChunkFilesize` is `int64`, but in v2.0.x it's `string`.
// (ref: https://github.com/pingcap/dm/pull/713).
// if we decode data with v1.0.x from TOML directly,
// an error of `toml: cannot load TOML value of type int64 into a Go string` will be reported,
// so we overwrite it with another filed which has the same struct tag `chunk-filesize` here.
// but if set `chunk-filesize: 64` in a YAML file, both v1.0.x (int64) and v2.0.x (string) can support it.
ChunkFilesize int64 `yaml:"chunk-filesize" toml:"chunk-filesize" json:"chunk-filesize"` // -F, --chunk-filesize
}

// GetSubtasksMeta gets all subtasks' meta (config and status) from `dm_worker_meta` in v1.0.x.
func GetSubtasksMeta() (map[string]*pb.V1SubTaskMeta, error) {
// check if meta data exist.
if !utils.IsDirExists(metaPath) || !utils.IsDirExists(dbPath) {
return nil, nil
}

// open levelDB to get subtasks meta.
db, err := openDB(dbPath, defaultKVConfig)
if err != nil {
return nil, err
}
defer db.Close()

// load subtasks' meta from levelDB.
meta, err := newMeta(db)
if err != nil {
return nil, err
}

return meta.TasksMeta(), nil
}

// RemoveSubtasksMeta removes subtasks' metadata.
// this is often called after upgraded from v1.0.x to v2.0.x,
// so no need to handle again after re-started the DM-worker process.
func RemoveSubtasksMeta() error {
// check is a valid v1.0.x meta path.
if !utils.IsDirExists(metaPath) || !utils.IsDirExists(dbPath) {
return terror.ErrInvalidV1WorkerMetaPath.Generate(filepath.Abs(metaPath))
}

// try to open levelDB to check.
db, err := openDB(dbPath, defaultKVConfig)
if err != nil {
return terror.ErrInvalidV1WorkerMetaPath.Generate(filepath.Abs(metaPath))
}
defer db.Close()

return os.RemoveAll(metaPath)
}

// SubTaskConfigFromV1TOML gets SubTaskConfig from subtask's TOML data with v1.0.x.
func SubTaskConfigFromV1TOML(data []byte) (config.SubTaskConfig, error) {
var v1Cfg v1SubTaskConfig
_, err := toml.Decode(string(data), &v1Cfg)
if err != nil {
return config.SubTaskConfig{}, terror.ErrConfigTomlTransform.Delegate(err, "decode v1 subtask config from data")
}

cfg := v1Cfg.SubTaskConfig
cfg.MydumperConfig.ChunkFilesize = strconv.FormatInt(v1Cfg.ChunkFilesize, 10)
err = cfg.Adjust(true)
if err != nil {
return config.SubTaskConfig{}, terror.ErrConfigTomlTransform.Delegate(err, "transform `chunk-filesize`")
}

return cfg, nil
}
Loading

0 comments on commit f571300

Please sign in to comment.