Skip to content

Commit

Permalink
Merge branch 'master' into clean-up-only-point-range
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyifangreeneyes authored Dec 13, 2021
2 parents 2de7594 + 1f26870 commit 33779a5
Show file tree
Hide file tree
Showing 35 changed files with 892 additions and 94 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
case model.ActionCreatePlacementPolicy,
model.ActionAlterPlacementPolicy,
model.ActionDropPlacementPolicy,
model.ActionAlterTablePartitionPolicy,
model.ActionAlterTablePartitionPlacement,
model.ActionModifySchemaDefaultPlacement,
model.ActionAlterTablePlacement,
model.ActionAlterTableAttributes,
Expand Down
7 changes: 4 additions & 3 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ package ddl

import (
"context"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -117,9 +118,9 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
tc.BaseCallback.OnWatched(ctx)
}

func (s *testDDLSuite) TestCallback(c *C) {
func TestCallback(t *testing.T) {
cb := &BaseCallback{}
c.Assert(cb.OnChanged(nil), IsNil)
require.Nil(t, cb.OnChanged(nil))
cb.OnJobRunBefore(nil)
cb.OnJobUpdated(nil)
cb.OnWatched(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ func (w *updateColumnWorker) cleanRowMap() {
}
}

// BackfillDataInTxn will backfill the table record in a transaction, lock corresponding rowKey, if the value of rowKey is changed.
// BackfillDataInTxn will backfill the table record in a transaction.
func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
errInTxn = kv.RunInNewTxn(context.Background(), w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
Expand Down
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Suite(&testDBSuite2{&testDBSuite{}})
var _ = Suite(&testDBSuite3{&testDBSuite{}})
var _ = Suite(&testDBSuite4{&testDBSuite{}})
var _ = Suite(&testDBSuite5{&testDBSuite{}})
var _ = Suite(&testDBSuite6{&testDBSuite{}})
var _ = SerialSuites(&testDBSuite6{&testDBSuite{}})
var _ = Suite(&testDBSuite7{&testDBSuite{}})
var _ = Suite(&testDBSuite8{&testDBSuite{}})
var _ = SerialSuites(&testSerialDBSuite{&testDBSuite{}})
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func buildTableInfo(
tbInfo.CommonHandleVersion = 1
}
}
if tbInfo.PKIsHandle || tbInfo.IsCommonHandle {
if tbInfo.HasClusteredIndex() {
// Primary key cannot be invisible.
if constr.Option != nil && constr.Option.Visibility == ast.IndexVisibilityInvisible {
return nil, ErrPKIndexCantBeInvisible
Expand Down Expand Up @@ -2438,7 +2438,7 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
case ast.TableOptionCompression:
tbInfo.Compression = op.StrValue
case ast.TableOptionShardRowID:
if op.UintValue > 0 && (tbInfo.PKIsHandle || tbInfo.IsCommonHandle) {
if op.UintValue > 0 && tbInfo.HasClusteredIndex() {
return errUnsupportedShardRowIDBits
}
tbInfo.ShardRowIDBits = op.UintValue
Expand Down Expand Up @@ -2946,7 +2946,7 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
// Nothing need to do.
return nil
}
if uVal > 0 && (t.Meta().PKIsHandle || t.Meta().IsCommonHandle) {
if uVal > 0 && t.Meta().HasClusteredIndex() {
return errUnsupportedShardRowIDBits
}
err = verifyNoOverflowShardBits(d.sessPool, t, uVal)
Expand Down Expand Up @@ -6440,7 +6440,7 @@ func (d *ddl) AlterTablePartitionPlacement(ctx sessionctx.Context, tableIdent as
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
Type: model.ActionAlterTablePartitionPolicy,
Type: model.ActionAlterTablePartitionPlacement,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partitionID, policyRefInfo, placementSettings},
}
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,8 +838,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onDropPlacementPolicy(d, t, job)
case model.ActionAlterPlacementPolicy:
ver, err = onAlterPlacementPolicy(d, t, job)
case model.ActionAlterTablePartitionPolicy:
ver, err = onAlterTablePartitionOptions(d, t, job)
case model.ActionAlterTablePartitionPlacement:
ver, err = onAlterTablePartitionPlacement(t, job)
case model.ActionAlterTablePlacement:
ver, err = onAlterTablePlacement(d, t, job)
case model.ActionAlterCacheTable:
Expand Down
11 changes: 2 additions & 9 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,8 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
return nil
}

// BackfillDataInTxn will backfill table index in a transaction, lock corresponding rowKey, if the value of rowKey is changed,
// indicate that index columns values may changed, index is not allowed to be added, so the txn will rollback and retry.
// BackfillDataInTxn will backfill table index in a transaction. If the value of rowKey is changed, there must be some other transactions
// update the row, result in write conflict, so the txn will rollback and retry.
// BackfillDataInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128.
func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
failpoint.Inject("errorMockPanic", func(val failpoint.Value) {
Expand Down Expand Up @@ -1329,13 +1329,6 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
continue
}

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.key)
if err != nil {
return errors.Trace(err)
}

// Create the index.
handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
if err != nil {
Expand Down
18 changes: 10 additions & 8 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, nil
}

// notice: addingDefinitions is empty when job is in state model.StateNone
tblInfo, partInfo, addingDefinitions, err := checkAddPartition(t, job)
if err != nil {
return ver, err
Expand Down Expand Up @@ -117,14 +118,21 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Trace(err)
}

// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}

// modify placement settings
for _, def := range addingDefinitions {
for _, def := range tblInfo.Partition.AddingDefinitions {
if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(t, job, def.PlacementPolicyRef); err != nil {
return ver, errors.Trace(err)
}
}

bundles, err := alterTablePartitionBundles(t, tblInfo, addingDefinitions)
bundles, err := alterTablePartitionBundles(t, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -135,12 +143,6 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// none -> replica only
job.SchemaState = model.StateReplicaOnly
case model.StateReplicaOnly:
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func getPlacementPolicyDependedObjectsIDs(t *meta.Meta, policy *model.PolicyInfo
}
if tblInfo.Partition != nil {
for _, part := range tblInfo.Partition.Definitions {
if part.PlacementPolicyRef != nil && part.PlacementPolicyRef.ID == part.ID {
if part.PlacementPolicyRef != nil && part.PlacementPolicyRef.ID == policy.ID {
partIDs = append(partIDs, part.ID)
}
}
Expand Down
Loading

0 comments on commit 33779a5

Please sign in to comment.