From 48e64f7d10cdd7eaebd5219b494c3b86871204c0 Mon Sep 17 00:00:00 2001
From: WizardXiao <89761062+WizardXiao@users.noreply.github.com>
Date: Thu, 17 Mar 2022 11:48:27 +0800
Subject: [PATCH] DM: adjust loader's dir to add a subdirectory (#4897)

* commit-message: just adjust s3 path
* commit-message: add notes
* commit-message: fix ut
* commit-message: add trim suffix
* commit-message: add trim suffix for webui
* commit-message: fix error handling

Co-authored-by: niubell
---
 dm/dm/config/subtask.go              |  8 +++---
 dm/dm/config/subtask_test.go         |  4 +--
 dm/dm/config/task_converters.go      | 37 +++++++++++++++++++++++++---
 dm/pkg/storage/utils.go              | 33 +++++++++++++++++++++----
 dm/pkg/storage/utils_test.go         |  8 +++++-
 dm/tests/s3_dumpling_lighting/run.sh |  6 ++---
 6 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/dm/dm/config/subtask.go b/dm/dm/config/subtask.go
index 776347afd97..e307d104cde 100644
--- a/dm/dm/config/subtask.go
+++ b/dm/dm/config/subtask.go
@@ -411,11 +411,13 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
 	// add suffix
 	var dirSuffix string
 	if isS3 {
-		dirSuffix = c.Name + "." + c.SourceID
+		// we will dump files into a subdirectory of the s3 dir
+		dirSuffix = "/" + c.Name + "." + c.SourceID
 	} else {
+		// TODO: dump local files into a subdirectory of the dir as well; it carries a compatibility risk, so it will be fixed in another PR
-		dirSuffix = c.Name
+		dirSuffix = "." + c.Name
 	}
-	newDir, err := storage.AdjustPath(c.LoaderConfig.Dir, "."+dirSuffix)
+	newDir, err := storage.AdjustPath(c.LoaderConfig.Dir, dirSuffix)
 	if err != nil {
 		return terror.ErrConfigLoaderDirInvalid.Delegate(err, c.LoaderConfig.Dir)
 	}
diff --git a/dm/dm/config/subtask_test.go b/dm/dm/config/subtask_test.go
index 967c040e0eb..bac451dc475 100644
--- a/dm/dm/config/subtask_test.go
+++ b/dm/dm/config/subtask_test.go
@@ -221,7 +221,7 @@ func (t *testConfig) TestSubTaskAdjustLoaderS3Dir(c *C) {
 	}
 	err = cfg.Adjust(false)
 	c.Assert(err, IsNil)
-	c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket2/prefix"+"."+cfg.Name+"."+cfg.SourceID)
+	c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket2/prefix"+"/"+cfg.Name+"."+cfg.SourceID)

 	cfg.LoaderConfig = LoaderConfig{
 		PoolSize: defaultPoolSize,
@@ -230,7 +230,7 @@ func (t *testConfig) TestSubTaskAdjustLoaderS3Dir(c *C) {
 	}
 	err = cfg.Adjust(false)
 	c.Assert(err, IsNil)
-	c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket3/prefix/path"+"."+cfg.Name+"."+cfg.SourceID+"?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc")
+	c.Assert(cfg.LoaderConfig.Dir, Equals, "s3://bucket3/prefix/path"+"/"+cfg.Name+"."+cfg.SourceID+"?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc")

 	// invalid dir
 	cfg.LoaderConfig = LoaderConfig{
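The subtask.go hunk above changes the suffix scheme: on S3 the dump files move into a true subdirectory ("/<task-name>.<source-id>"), while local dirs keep the legacy dotted suffix ("." + task name). A minimal, self-contained sketch of that rule (the dirSuffix helper and the naive s3:// prefix check are illustrative assumptions for this example; the real code routes through storage.AdjustPath, which also handles URL escaping):

```go
package main

import (
	"fmt"
	"strings"
)

// dirSuffix mirrors the rule in the hunk above: S3 dirs gain a real
// subdirectory, local dirs keep the legacy "."-joined name. The helper
// name and the s3:// check are illustrative, not DM's actual API.
func dirSuffix(dir, taskName, sourceID string) string {
	if strings.HasPrefix(dir, "s3://") {
		return "/" + taskName + "." + sourceID
	}
	return "." + taskName
}

func main() {
	fmt.Println("s3://bucket2/prefix" + dirSuffix("s3://bucket2/prefix", "test", "mysql-replica-01"))
	// s3://bucket2/prefix/test.mysql-replica-01
	fmt.Println("./dumped_data" + dirSuffix("./dumped_data", "test", "mysql-replica-01"))
	// ./dumped_data.test
}
```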
diff --git a/dm/dm/config/task_converters.go b/dm/dm/config/task_converters.go
index 22aab821e62..ef4e90004e5 100644
--- a/dm/dm/config/task_converters.go
+++ b/dm/dm/config/task_converters.go
@@ -21,9 +21,11 @@ import (
 	"github.com/pingcap/tidb-tools/pkg/column-mapping"
 	"github.com/pingcap/tidb-tools/pkg/filter"
 	router "github.com/pingcap/tidb-tools/pkg/table-router"
+	"go.uber.org/zap"

 	"github.com/pingcap/tiflow/dm/openapi"
 	"github.com/pingcap/tiflow/dm/pkg/log"
+	"github.com/pingcap/tiflow/dm/pkg/storage"
 	"github.com/pingcap/tiflow/dm/pkg/terror"
 )

@@ -355,9 +357,23 @@ func SubTaskConfigsToTaskConfig(stCfgs ...*SubTaskConfig) *TaskConfig {
 		loadName, loadIdx = getGenerateName(stCfg.LoaderConfig, loadIdx, "load", loadMap)
 		loaderCfg := stCfg.LoaderConfig
-		dirSuffix := "." + c.Name
+
+		var dirSuffix string
+		var err error
+		if storage.IsS3Path(loaderCfg.Dir) {
+			// we will dump files into a subdirectory of the s3 dir
+			dirSuffix = "/" + c.Name + "." + stCfg.SourceID
+		} else {
+			// TODO: dump local files into a subdirectory of the dir as well; it carries a compatibility risk, so it will be fixed in another PR
+			dirSuffix = "." + c.Name
+		}
 		// if the dir ends with the task name, remove it to recover the user's input dir.
-		loaderCfg.Dir = strings.TrimSuffix(loaderCfg.Dir, dirSuffix)
+		loaderCfg.Dir, err = storage.TrimPath(loaderCfg.Dir, dirSuffix)
+		// because the dir comes from a subtask, there should be no error here.
+		if err != nil {
+			log.L().Warn("parse config comes from subtask error.", zap.Error(err))
+		}
+
 		c.Loaders[loadName] = &loaderCfg

 		syncName, syncIdx = getGenerateName(stCfg.SyncerConfig, syncIdx, "sync", syncMap)
@@ -445,9 +461,22 @@ func SubTaskConfigsToOpenAPITask(subTaskConfigList []*SubTaskConfig) *openapi.Ta
 	}
 	taskSourceConfig.SourceConf = sourceConfList

-	dirSuffix := "." + oneSubtaskConfig.Name
+	var dirSuffix string
+	var err error
+	if storage.IsS3Path(oneSubtaskConfig.LoaderConfig.Dir) {
+		// we will dump files into a subdirectory of the s3 dir
+		dirSuffix = "/" + oneSubtaskConfig.Name + "." + oneSubtaskConfig.SourceID
+	} else {
+		// TODO: dump local files into a subdirectory of the dir as well; it carries a compatibility risk, so it will be fixed in another PR
+		dirSuffix = "." + oneSubtaskConfig.Name
+	}
 	// if the dir ends with the task name, remove it to recover the user's input dir.
-	oneSubtaskConfig.LoaderConfig.Dir = strings.TrimSuffix(oneSubtaskConfig.LoaderConfig.Dir, dirSuffix)
+	oneSubtaskConfig.LoaderConfig.Dir, err = storage.TrimPath(oneSubtaskConfig.LoaderConfig.Dir, dirSuffix)
+	// because the dir comes from a subtask, there should be no error here.
+	if err != nil {
+		log.L().Warn("parse config comes from subtask error.", zap.Error(err))
+	}
+
 	taskSourceConfig.FullMigrateConf = &openapi.TaskFullMigrateConf{
 		ExportThreads: &oneSubtaskConfig.MydumperConfig.Threads,
 		DataDir:       &oneSubtaskConfig.LoaderConfig.Dir,
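The converters above need storage.TrimPath rather than the old strings.TrimSuffix because, on S3, the appended suffix sits before the query string, so the raw URL does not literally end with it. A rough stand-in built only on the standard library (trimPath here is a simplified sketch of the helper added below, assuming no path characters that would need the escaping-aware bstorage.ParseRawURL):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// trimPath removes the uniqueID suffix from a dir. For URL-style dirs
// the URL must be parsed first, because query parameters follow the
// path; a plain suffix trim on the raw string would miss it.
func trimPath(rawURL, uniqueID string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" {
		// plain local path: a direct suffix trim is enough
		return strings.TrimSuffix(rawURL, uniqueID), nil
	}
	u.Path = strings.TrimSuffix(u.Path, uniqueID)
	return u.String(), nil
}

func main() {
	raw := "s3://bucket3/prefix/path/test.mysql-replica-01?endpoint=https://127.0.0.1:9000"
	// naive trim fails: the suffix is not at the end of the raw string
	fmt.Println(strings.TrimSuffix(raw, "/test.mysql-replica-01"))
	// URL-aware trim recovers the user's original dir, query intact
	out, _ := trimPath(raw, "/test.mysql-replica-01")
	fmt.Println(out) // s3://bucket3/prefix/path?endpoint=https://127.0.0.1:9000
}
```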
diff --git a/dm/pkg/storage/utils.go b/dm/pkg/storage/utils.go
index e07cb4e3e22..9551a54daa7 100644
--- a/dm/pkg/storage/utils.go
+++ b/dm/pkg/storage/utils.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"os"
 	"path"
+	"path/filepath"
 	"strings"

 	"github.com/pingcap/errors"
@@ -40,21 +41,43 @@ func AdjustPath(rawURL string, uniqueID string) (string, error) {
 	}
 	// not url format, we don't use url library to avoid being escaped or unescaped
 	if u.Scheme == "" {
-		// avoid duplicate add uniqueID
-		if !strings.HasSuffix(rawURL, uniqueID) {
-			return rawURL + uniqueID, nil
+		// avoid adding uniqueID twice, and trim any trailing '/' as in './dump_data/'
+		trimPath := strings.TrimRight(rawURL, string(filepath.Separator))
+		if !strings.HasSuffix(trimPath, uniqueID) {
+			return trimPath + uniqueID, nil
 		}
 		return rawURL, nil
 	}
 	// u.Path is an unescaped string and can be used as normal
-	if !strings.HasSuffix(u.Path, uniqueID) {
-		u.Path += uniqueID
+	trimPath := strings.TrimRight(u.Path, string(filepath.Separator))
+	if !strings.HasSuffix(trimPath, uniqueID) {
+		u.Path = trimPath + uniqueID
 		// u.String will return an escaped url and can be used safely in other steps
 		return u.String(), err
 	}
 	return rawURL, nil
 }

+// TrimPath trims the uniqueID suffix from rawURL; it supports both local and S3 paths.
+func TrimPath(rawURL string, uniqueID string) (string, error) {
+	if rawURL == "" || uniqueID == "" {
+		return rawURL, nil
+	}
+	u, err := bstorage.ParseRawURL(rawURL)
+	if err != nil {
+		return "", errors.Trace(err)
+	}
+	// not url format, we don't use url library to avoid being escaped or unescaped
+	if u.Scheme == "" {
+		// a plain local path, so trim the uniqueID suffix directly
+		return strings.TrimSuffix(rawURL, uniqueID), nil
+	}
+	// u.Path is an unescaped string and can be used as normal
+	u.Path = strings.TrimSuffix(u.Path, uniqueID)
+	// u.String will return an escaped url and can be used safely in other steps
+	return u.String(), err
+}
+
 // IsS3Path reports whether rawURL is an S3 path.
 func IsS3Path(rawURL string) bool {
 	if rawURL == "" {
diff --git a/dm/pkg/storage/utils_test.go b/dm/pkg/storage/utils_test.go
index e3161ba381f..f15c8e6cf3f 100644
--- a/dm/pkg/storage/utils_test.go
+++ b/dm/pkg/storage/utils_test.go
@@ -57,7 +57,7 @@ func TestIsS3(t *testing.T) {
 	}
 }

-func TestIsS3AndAdjustS3Path(t *testing.T) {
+func TestIsS3AndAdjustAndTrimPath(t *testing.T) {
 	testPaths := []struct {
 		isURLFormat bool
 		rawPath     string
@@ -66,9 +66,11 @@
 		{true, "file:///tmp/storage", "file:///tmp/storage_placeholder"},
 		{false, "/tmp/storage", "/tmp/storage_placeholder"},
 		{false, "./tmp/storage", "./tmp/storage_placeholder"},
+		{false, "./tmp/storage/", "./tmp/storage_placeholder"},
 		{false, "tmp/storage", "tmp/storage_placeholder"},
 		{true, "s3:///bucket/more/prefix", "s3:///bucket/more/prefix_placeholder"},
 		{true, "s3://bucket2/prefix", "s3://bucket2/prefix_placeholder"},
+		{true, "s3://bucket2/prefix/", "s3://bucket2/prefix_placeholder"},
 		{
 			true,
 			"s3://bucket3/prefix/path?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc",
 			"s3://bucket3/prefix/path_placeholder?endpoint=https://127.0.0.1:9000&force_path_style=0&SSE=aws:kms&sse-kms-key-id=TestKey&xyz=abc",
@@ -123,6 +125,10 @@
 		res, err = AdjustPath(expectPath, testSeparator+testUniqueID.test)
 		require.NoError(t, err)
 		require.Equal(t, expectPath, res)
+		// trim
+		res, err = TrimPath(expectPath, testSeparator+testUniqueID.test)
+		require.NoError(t, err)
+		require.Equal(t, strings.ReplaceAll(testPath.expectPath, "_placeholder", ""), res)
 		}
 	}
 }
diff --git a/dm/tests/s3_dumpling_lighting/run.sh b/dm/tests/s3_dumpling_lighting/run.sh
index 0ed6db7d08a..3fb1a87c89c 100755
--- a/dm/tests/s3_dumpling_lighting/run.sh
+++ b/dm/tests/s3_dumpling_lighting/run.sh
@@ -62,8 +62,8 @@ function check_dump_s3_exist() {
 	schema_create="${1}-schema-create.sql"
 	table_schema="${1}.${2}-schema.sql"

-	file_should_exist "${s3_DBPATH}/${dumpPath}.${3}.${4}/${schema_create}"
-	file_should_exist "${s3_DBPATH}/${dumpPath}.${3}.${4}/${table_schema}"
+	file_should_exist "${s3_DBPATH}/${dumpPath}/${3}.${4}/${schema_create}"
+	file_should_exist "${s3_DBPATH}/${dumpPath}/${3}.${4}/${table_schema}"
 }

 function file_should_exist() {
@@ -134,7 +134,7 @@ function run_test() {
 	if $1; then
 		check_dump_s3_exist $db1 $tb1 $2 $SOURCE_ID1
 	else
-		dir_should_not_exist "${s3_DBPATH}/${dumpPath}.${2}.${SOURCE_ID1}"
+		dir_should_not_exist "${s3_DBPATH}/${dumpPath}/${2}.${SOURCE_ID1}"
 	fi

 	cleanup_s3
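The shell assertions above encode the resulting layout on S3: dump files now live in a subdirectory named <task>.<source-id> under the user-configured dir, instead of in a sibling directory whose name glues everything together with dots. A small sketch contrasting the two layouts (the dumpPath, task, and source values are hypothetical stand-ins for the test's variables):

```go
package main

import "fmt"

func main() {
	// Hypothetical values standing in for ${dumpPath}, the task name, and the source ID.
	dumpPath, task, source := "dm_test/dump", "test", "mysql-replica-01"

	// old layout: a sibling directory with a dotted name
	fmt.Println(dumpPath + "." + task + "." + source + "/db1-schema-create.sql")
	// dm_test/dump.test.mysql-replica-01/db1-schema-create.sql

	// new layout: a real subdirectory under the user's dir
	fmt.Println(dumpPath + "/" + task + "." + source + "/db1-schema-create.sql")
	// dm_test/dump/test.mysql-replica-01/db1-schema-create.sql
}
```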