Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add indexes #41718

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,8 +856,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx, jc, job.ID, reorgInfo.elements)
if err != nil {
if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
continue
Expand Down Expand Up @@ -914,18 +913,24 @@ func (b *backfillScheduler) initCopReqSenderPool() {
b.copReqSenderPool != nil || len(b.workers) > 0 {
return
}
indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
if indexInfo == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return

indexesInfo := make([]*model.IndexInfo, 0, len(b.reorgInfo.elements))

for _, ele := range b.reorgInfo.elements {
indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, ele.ID)
if indexInfo == nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return
}
indexesInfo = append(indexesInfo, indexInfo)
}
sessCtx, err := b.newSessCtx()
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
copCtx, err := newCopContext(b.tbl.Meta(), indexesInfo, sessCtx)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
Expand Down
59 changes: 35 additions & 24 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,24 +331,27 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
}
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
var indexID int64
var ifExists bool
indexID := make([]int64, 1)
ifExists := make([]bool, 1)
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
if err := job.DecodeArgs(&indexID[0], &ifExists[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
}
}
// Determine the physicalIDs to be added.
physicalIDs := []int64{job.TableID}
if len(partitionIDs) > 0 {
physicalIDs = partitionIDs
}
// Determine the index IDs to be added.
tempIdxID := tablecodec.TempIndexPrefix | indexID
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
for _, idxID := range indexID {
// Determine the index IDs to be added.
tempIdxID := tablecodec.TempIndexPrefix | idxID
if job.State == model.JobStateRollbackDone {
indexIDs = append(indexIDs, idxID)
}
indexIDs = append(indexIDs, tempIdxID)
}
for _, pid := range physicalIDs {
for _, iid := range indexIDs {
Expand All @@ -362,12 +365,14 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName interface{}
var ifExists bool
var indexID int64
var partitionIDs []int64
if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
indexName := make([]interface{}, 1)
ifExists := make([]bool, 1)
indexID := make([]int64, 1)
if err := job.DecodeArgs(&indexName[0], &ifExists[0], &indexID[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil {
return errors.Trace(err)
}
}

// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
Expand All @@ -378,18 +383,24 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
}
})
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
elemID := ea.allocForIndexID(pid, indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(pid, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, idxID+1)
elemID := ea.allocForIndexID(pid, idxID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
elemID := ea.allocForIndexID(tableID, indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID))
for _, idxID := range indexID {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID+1)
elemID := ea.allocForIndexID(tableID, idxID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", idxID)); err != nil {
return errors.Trace(err)
}
}
}
case model.ActionDropColumn:
var colName model.CIStr
Expand Down
Loading