Skip to content

Commit

Permalink
importinto: limit global sort minimum thread required (#51776)
Browse files Browse the repository at this point in the history
ref #49008
  • Loading branch information
D3Hunter authored Mar 14, 2024
1 parent 2a0050c commit c039855
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 2 deletions.
5 changes: 5 additions & 0 deletions pkg/executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec
if err := e.checkTotalFileSize(); err != nil {
return err
}
// run global sort with < 16 thread might OOM on merge step
// TODO: remove this limit after control memory usage.
if e.IsGlobalSort() && e.ThreadCnt < 16 {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs("global sort requires at least 16 threads")
}
}
if err := e.checkTableEmpty(ctx, conn); err != nil {
return err
Expand Down
14 changes: 13 additions & 1 deletion pkg/executor/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestCheckRequirements(t *testing.T) {
tableObj, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)

// source data file size = 0
c := &importer.LoadDataController{
Plan: &importer.Plan{
DBName: "test",
Expand All @@ -88,8 +89,18 @@ func TestCheckRequirements(t *testing.T) {
}
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataPreCheckFailed)

// now checkTotalFileSize pass, and try next pre-check item
// make checkTotalFileSize pass
c.TotalFileSize = 1
// global sort with thread count < 16
c.ThreadCnt = 15
c.CloudStorageURI = "s3://test"
err = c.CheckRequirements(ctx, conn)
require.ErrorIs(t, err, exeerrors.ErrLoadDataPreCheckFailed)
require.ErrorContains(t, err, "global sort requires at least 16 threads")

// reset fields, make global sort thread check pass
c.ThreadCnt = 1
c.CloudStorageURI = ""
// non-empty table
_, err = conn.Execute(ctx, "insert into test.t values(1)")
require.NoError(t, err)
Expand Down Expand Up @@ -155,6 +166,7 @@ func TestCheckRequirements(t *testing.T) {
require.NoError(t, c.CheckRequirements(ctx, conn))

// with global sort
c.Plan.ThreadCnt = 16
c.Plan.CloudStorageURI = ":"
require.ErrorIs(t, c.CheckRequirements(ctx, conn), exeerrors.ErrLoadDataInvalidURI)
c.Plan.CloudStorageURI = "sdsdsdsd://sdsdsdsd"
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/importintotest4/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *mockGCSSuite) TestGlobalSortMultiFiles() {
// 1 subtask, encoding 10 files using 4 threads.
sortStorageURI := fmt.Sprintf("gs://sorted/gs_multi_files?endpoint=%s", gcsEndpoint)
importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-multi-files/t.*.csv?endpoint=%s'
with thread=4, cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI)
with cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI)
s.tk.MustQuery(importSQL)
s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows(allData...))
}
1 change: 1 addition & 0 deletions tests/realtikvtest/importintotest4/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestImportInto(t *testing.T) {
}

func (s *mockGCSSuite) SetupSuite() {
testkit.EnableFailPoint(s.T(), "github.com/pingcap/tidb/pkg/util/cpu/mockNumCpu", `return(32)`)
s.Require().True(*realtikvtest.WithRealTiKV)
testutil.ReduceCheckInterval(s.T())
var err error
Expand Down

0 comments on commit c039855

Please sign in to comment.