Commit 389c472

Merge branch 'fixAddDroppedCol' of https://github.com/lichunzhu/ticdc into fixAddDroppedCol

lichunzhu committed Mar 17, 2022
2 parents 3345f53 + f1c3712
Showing 6 changed files with 78 additions and 18 deletions.
dm/dm/config/subtask.go: 5 additions & 3 deletions
@@ -411,11 +411,13 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
// add suffix
var dirSuffix string
if isS3 {
dirSuffix = c.Name + "." + c.SourceID
// we will dump files to s3 dir's subdirectory
dirSuffix = "/" + c.Name + "." + c.SourceID
} else {
dirSuffix = c.Name
// TODO we will dump local file to dir's subdirectory, but it may have risk of compatibility, we will fix in other pr
dirSuffix = "." + c.Name
}
newDir, err := storage.AdjustPath(c.LoaderConfig.Dir, "."+dirSuffix)
newDir, err := storage.AdjustPath(c.LoaderConfig.Dir, dirSuffix)
if err != nil {
return terror.ErrConfigLoaderDirInvalid.Delegate(err, c.LoaderConfig.Dir)
}
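A minimal sketch of the suffix rule introduced in the Adjust branch above, using a hypothetical dirSuffix helper and made-up task/source names; only the S3 case now gets a "/"-separated per-task, per-source subdirectory, while local dirs keep the "." + task-name suffix:

package main

import "fmt"

// dirSuffix is a hypothetical stand-in for the branch inside SubTaskConfig.Adjust.
func dirSuffix(isS3 bool, taskName, sourceID string) string {
	if isS3 {
		// S3: dump files go into a subdirectory of the user-supplied dir.
		return "/" + taskName + "." + sourceID
	}
	// Local: keep the old "." + task-name suffix for now (see the TODO above).
	return "." + taskName
}

func main() {
	fmt.Println("s3://bucket/prefix" + dirSuffix(true, "task1", "mysql-01"))
	// s3://bucket/prefix/task1.mysql-01
	fmt.Println("./dump_data" + dirSuffix(false, "task1", "mysql-01"))
	// ./dump_data.task1
}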
dm/dm/config/subtask_test.go: 2 additions & 2 deletions
@@ -221,7 +221,7 @@ func (t *testConfig) TestSubTaskAdjustLoaderS3Dir(c *C) {
}
err = cfg.Adjust(false)
c.Assert(err, IsNil)
c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket2/prefix"+"."+cfg.Name+"."+cfg.SourceID)
c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket2/prefix"+"/"+cfg.Name+"."+cfg.SourceID)

cfg.LoaderConfig = LoaderConfig{
PoolSize: defaultPoolSize,
@@ -230,7 +230,7 @@ func (t *testConfig) TestSubTaskAdjustLoaderS3Dir(c *C) {
}
err = cfg.Adjust(false)
c.Assert(err, IsNil)
c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket3/prefix/path"+"."+cfg.Name+"."+cfg.SourceID+"?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc")
c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket3/prefix/path"+"/"+cfg.Name+"."+cfg.SourceID+"?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc")

// invalid dir
cfg.LoaderConfig = LoaderConfig{
dm/dm/config/task_converters.go: 33 additions & 4 deletions
@@ -21,9 +21,11 @@ import (
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/storage"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

@@ -355,9 +357,23 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {

loadName, loadIdx = getGenerateName(stCfg.LoaderConfig, loadIdx, "load", loadMap)
loaderCfg := stCfg.LoaderConfig
dirSuffix := "." + c.Name

var dirSuffix string
var err error
if storage.IsS3Path(loaderCfg.Dir) {
// we will dump files to s3 dir's subdirectory
dirSuffix = "/" + c.Name + "." + stCfg.SourceID
} else {
// TODO we will dump local file to dir's subdirectory, but it may have risk of compatibility, we will fix in other pr
dirSuffix = "." + c.Name
}
// if ends with the task name, we remove to get user input dir.
loaderCfg.Dir = strings.TrimSuffix(loaderCfg.Dir, dirSuffix)
loaderCfg.Dir, err = storage.TrimPath(loaderCfg.Dir, dirSuffix)
// because dir comes from the subtask, there should be no error.
if err != nil {
log.L().Warn("parse config comes from subtask error.", zap.Error(err))
}

c.Loaders[loadName] = &loaderCfg

syncName, syncIdx = getGenerateName(stCfg.SyncerConfig, syncIdx, "sync", syncMap)
@@ -445,9 +461,22 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Task {
}
taskSourceConfig.SourceConf = sourceConfList

dirSuffix := "." + oneSubtaskConfig.Name
var dirSuffix string
var err error
if storage.IsS3Path(oneSubtaskConfig.LoaderConfig.Dir) {
// we will dump files to s3 dir's subdirectory
dirSuffix = "/" + oneSubtaskConfig.Name + "." + oneSubtaskConfig.SourceID
} else {
// TODO we will dump local file to dir's subdirectory, but it may have risk of compatibility, we will fix in other pr
dirSuffix = "." + oneSubtaskConfig.Name
}
// if ends with the task name, we remove to get user input dir.
oneSubtaskConfig.LoaderConfig.Dir = strings.TrimSuffix(oneSubtaskConfig.LoaderConfig.Dir, dirSuffix)
oneSubtaskConfig.LoaderConfig.Dir, err = storage.TrimPath(oneSubtaskConfig.LoaderConfig.Dir, dirSuffix)
// because dir comes from the subtask, there should be no error.
if err != nil {
log.L().Warn("parse config comes from subtask error.", zap.Error(err))
}

taskSourceConfig.FullMigrateConf = &openapi.TaskFullMigrateConf{
ExportThreads: &oneSubtaskConfig.MydumperConfig.Threads,
DataDir: &oneSubtaskConfig.LoaderConfig.Dir,
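The two converters above rely on the new storage.TrimPath being the inverse of storage.AdjustPath, so the user-supplied dir can be recovered from an adjusted subtask dir. A rough, self-contained sketch of that round trip; the bucket, task, and source values are made up, and error handling is simplified to panics:

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/storage"
)

func main() {
	// Made-up values; the real converters take them from the subtask config.
	userDir := "s3://bucket2/prefix"
	suffix := "/" + "task1" + "." + "mysql-01" // S3 suffix rule from this commit

	adjusted, err := storage.AdjustPath(userDir, suffix)
	if err != nil {
		panic(err)
	}
	fmt.Println(adjusted) // s3://bucket2/prefix/task1.mysql-01

	// TrimPath should hand back the original user input.
	restored, err := storage.TrimPath(adjusted, suffix)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored) // s3://bucket2/prefix
}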
dm/pkg/storage/utils.go: 28 additions & 5 deletions
@@ -17,6 +17,7 @@ import (
"context"
"os"
"path"
"path/filepath"
"strings"

"github.com/pingcap/errors"
@@ -40,21 +41,43 @@ func AdjustPath(rawURL string, uniqueID string) (string, error) {
}
// not url format, we don't use url library to avoid being escaped or unescaped
if u.Scheme == "" {
// avoid duplicate add uniqueID
if !strings.HasSuffix(rawURL, uniqueID) {
return rawURL + uniqueID, nil
// avoid duplicate add uniqueID, and trim suffix '/' like './dump_data/'
trimPath := strings.TrimRight(rawURL, string(filepath.Separator))
if !strings.HasSuffix(trimPath, uniqueID) {
return trimPath + uniqueID, nil
}
return rawURL, nil
}
// u.Path is an unescaped string and can be used as normal
if !strings.HasSuffix(u.Path, uniqueID) {
u.Path += uniqueID
trimPath := strings.TrimRight(u.Path, string(filepath.Separator))
if !strings.HasSuffix(trimPath, uniqueID) {
u.Path = trimPath + uniqueID
// u.String will return escaped url and can be used safely in other steps
return u.String(), err
}
return rawURL, nil
}

// TrimPath trims rawURL suffix which is uniqueID, supports local and s3.
func TrimPath(rawURL string, uniqueID string) (string, error) {
if rawURL == "" || uniqueID == "" {
return rawURL, nil
}
u, err := bstorage.ParseRawURL(rawURL)
if err != nil {
return "", errors.Trace(err)
}
// not url format, we don't use url library to avoid being escaped or unescaped
if u.Scheme == "" {
// avoid duplicate add uniqueID, and trim suffix '/' like './dump_data/'
return strings.TrimSuffix(rawURL, uniqueID), nil
}
// u.Path is an unescaped string and can be used as normal
u.Path = strings.TrimSuffix(u.Path, uniqueID)
// u.String will return escaped url and can be used safely in other steps
return u.String(), err
}

// isS3Path judges if rawURL is s3 path.
func IsS3Path(rawURL string) bool {
if rawURL == "" {
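As the updated tests below suggest, AdjustPath now also tolerates a trailing path separator and stays idempotent when the unique ID is already appended, with TrimPath undoing the suffix. A small sketch under those assumptions; the paths and IDs are illustrative:

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/dm/pkg/storage"
)

func main() {
	// A trailing separator is trimmed before the unique ID is appended,
	// so "./dump_data" and "./dump_data/" end up identical.
	a, _ := storage.AdjustPath("./dump_data", ".task1")
	b, _ := storage.AdjustPath("./dump_data/", ".task1")
	fmt.Println(a, b) // ./dump_data.task1 ./dump_data.task1

	// Appending is idempotent: an already-suffixed path is returned unchanged.
	c, _ := storage.AdjustPath(a, ".task1")
	fmt.Println(c == a) // true

	// TrimPath strips the same suffix again.
	d, _ := storage.TrimPath(c, ".task1")
	fmt.Println(d) // ./dump_data
}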
dm/pkg/storage/utils_test.go: 7 additions & 1 deletion
@@ -57,7 +57,7 @@ func TestIsS3(t *testing.T) {
}
}

func TestIsS3AndAdjustS3Path(t *testing.T) {
func TestIsS3AndAdjustAndTrimPath(t *testing.T) {
testPaths := []struct {
isURLFormat bool
rawPath string
@@ -66,9 +66,11 @@ func TestIsS3AndAdjustS3Path(t *testing.T) {
{true, "file:///tmp/storage", "file:///tmp/storage_placeholder"},
{false, "/tmp/storage", "/tmp/storage_placeholder"},
{false, "./tmp/storage", "./tmp/storage_placeholder"},
{false, "./tmp/storage/", "./tmp/storage_placeholder"},
{false, "tmp/storage", "tmp/storage_placeholder"},
{true, "s3:///bucket/more/prefix", "s3:///bucket/more/prefix_placeholder"},
{true, "s3://bucket2/prefix", "s3://bucket2/prefix_placeholder"},
{true, "s3://bucket2/prefix/", "s3://bucket2/prefix_placeholder"},
{
true, "s3://bucket3/prefix/path?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc",
"s3://bucket3/prefix/path_placeholder?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc",
@@ -123,6 +125,10 @@ func TestIsS3AndAdjustS3Path(t *testing.T) {
res, err = AdjustPath(expectPath, testSeparator+testUniqueID.test)
require.NoError(t, err)
require.Equal(t, expectPath, res)
// trim
res, err = TrimPath(expectPath, testSeparator+testUniqueID.test)
require.NoError(t, err)
require.Equal(t, strings.ReplaceAll(testPath.expectPath, "_placeholder", ""), res)
}
}
}
dm/tests/s3_dumpling_lighting/run.sh: 3 additions & 3 deletions
@@ -62,8 +62,8 @@ function check_dump_s3_exist() {
schema_create="${1}-schema-create.sql"
table_schema="${1}.${2}-schema.sql"

file_should_exist "${s3_DBPATH}/${dumpPath}.${3}.${4}/${schema_create}"
file_should_exist "${s3_DBPATH}/${dumpPath}.${3}.${4}/${table_schema}"
file_should_exist "${s3_DBPATH}/${dumpPath}/${3}.${4}/${schema_create}"
file_should_exist "${s3_DBPATH}/${dumpPath}/${3}.${4}/${table_schema}"
}

function file_should_exist() {
@@ -134,7 +134,7 @@ function run_test() {
if $1; then
check_dump_s3_exist $db1 $tb1 $2 $SOURCE_ID1
else
dir_should_not_exist "${s3_DBPATH}/${dumpPath}.${2}.${SOURCE_ID1}"
dir_should_not_exist "${s3_DBPATH}/${dumpPath}/${2}.${SOURCE_ID1}"
fi

cleanup_s3
