Skip to content

Commit

Permalink
ddl/ingest: create new local backend if necessary (#44140)
Browse files Browse the repository at this point in the history
close #44044, close #44137
  • Loading branch information
tangenta authored May 24, 2023
1 parent 1b0e019 commit adbcb4e
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 3 deletions.
15 changes: 13 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,13 @@ func cleanupSortPath(currentJobID int64) error {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
continue
}
// For now, there is only one task using ingest at the same time,
// so we can remove all the temp data of the previous jobs.
if _, ok := ingest.LitBackCtxMgr.Load(jobID); ok {
// The job is still running, skip it.
logutil.BgLogger().Warn("[ddl-ingest] the job is still running, skip removing it",
zap.Int64("running job ID", jobID))
continue
}
// Remove all the temp data of the previous done jobs.
if jobID < currentJobID {
logutil.BgLogger().Info("[ddl-ingest] remove stale temp index data",
zap.Int64("jobID", jobID), zap.Int64("currentJobID", currentJobID))
Expand Down Expand Up @@ -916,6 +921,8 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
if err != nil {
if !errorIsRetryable(err, job) {
logutil.BgLogger().Warn("[ddl] run reorg job failed, convert job to rollback",
zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err)
ingest.LitBackCtxMgr.Unregister(job.ID)
}
Expand Down Expand Up @@ -1844,6 +1851,10 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
if err != nil {
return errors.Trace(err)
}
// Every time we finish a partition, we update the progress of the job.
if rc := w.getReorgCtx(reorgInfo.Job.ID); rc != nil {
reorgInfo.Job.SetRowCount(rc.getRowCount())
}
}
} else {
//nolint:forcetypeassert
Expand Down
1 change: 1 addition & 0 deletions ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//util/size",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
"@org_uber_go_atomic//:atomic",
Expand Down
5 changes: 5 additions & 0 deletions ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
Expand Down Expand Up @@ -122,6 +123,10 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl
return err
}

failpoint.Inject("mockFinishImportErr", func() {
failpoint.Return(fmt.Errorf("mock finish import error"))
})

// Check remote duplicate value for the index.
if unique {
errorMgr := errormanager.New(nil, bc.cfg, log.Logger{Logger: logutil.BgLogger()})
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName st

var info string
en, exist := bc.Load(indexID)
if !exist {
if !exist || en.openedEngine == nil {
engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize)
ok := bc.MemRoot.CheckConsume(StructSizeEngineInfo + engineCacheSize)
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,25 @@ func TestAddIndexSplitTableRanges(t *testing.T) {
tk.MustExec("admin check table t;")
ddl.SetBackfillTaskChanSizeForTest(1024)
}

func TestAddIndexFinishImportError(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t (a int primary key, b int);")
for i := 0; i < 4; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d);", i*10000, i*10000))
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/ingest/mockFinishImportErr", "1*return"))
tk.MustExec("alter table t add index idx(a);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/ingest/mockFinishImportErr"))
tk.MustExec("admin check table t;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

0 comments on commit adbcb4e

Please sign in to comment.