Skip to content

Commit

Permalink
ddl: support index regions and updating the existed table rule when c…
Browse files Browse the repository at this point in the history
…hanging partition (#33925) (#34025)

close #33929
  • Loading branch information
ti-srebot authored Apr 28, 2022
1 parent 92be685 commit 0ce2900
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 13 deletions.
38 changes: 37 additions & 1 deletion ddl/attributes_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,42 @@ PARTITION BY RANGE (c) (
c.Assert(err, IsNil)
_, err = tk.Exec(`alter table alter_p partition p1 attributes " merge_option=allow , key=value ";`)
c.Assert(err, IsNil)

// reset all
tk.MustExec(`alter table alter_p partition p0 attributes default;`)
tk.MustExec(`alter table alter_p partition p1 attributes default;`)
tk.MustExec(`alter table alter_p partition p2 attributes default;`)
tk.MustExec(`alter table alter_p partition p3 attributes default;`)

// add table level attribute
tk.MustExec(`alter table alter_p attributes="merge_option=deny";`)
rows := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
c.Assert(len(rows), Equals, 1)

// add a new partition p4
tk.MustExec(`alter table alter_p add partition (PARTITION p4 VALUES LESS THAN (60));`)
rows1 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
c.Assert(len(rows1), Equals, 1)
c.Assert(rows[0][3], Not(Equals), rows1[0][3])

// drop the new partition p4
tk.MustExec(`alter table alter_p drop partition p4;`)
rows2 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
c.Assert(len(rows2), Equals, 1)
c.Assert(rows[0][3], Equals, rows2[0][3])

// add a new partition p5
tk.MustExec(`alter table alter_p add partition (PARTITION p5 VALUES LESS THAN (80));`)
rows3 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
c.Assert(len(rows3), Equals, 1)
c.Assert(rows[0][3], Not(Equals), rows3[0][3])

// truncate the new partition p5
tk.MustExec(`alter table alter_p truncate partition p5;`)
rows4 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
c.Assert(len(rows4), Equals, 1)
c.Assert(rows3[0][3], Not(Equals), rows4[0][3])
c.Assert(rows[0][3], Not(Equals), rows4[0][3])
}

func (s *testAttributesDDLSerialSuite) TestTruncateTable(c *C) {
Expand Down Expand Up @@ -523,7 +559,7 @@ PARTITION BY RANGE (c) (
c.Assert(len(rows1), Equals, 2)
c.Assert(rows1[0][0], Equals, "schema/test/part")
c.Assert(rows1[0][2], Equals, `"key=value"`)
c.Assert(rows1[0][3], Equals, rows[0][3])
c.Assert(rows1[0][3], Not(Equals), rows[0][3])
c.Assert(rows1[1][0], Equals, "schema/test/part/p1")
c.Assert(rows1[1][2], Equals, `"key2=value2"`)
c.Assert(rows1[1][3], Equals, rows[2][3])
Expand Down
4 changes: 2 additions & 2 deletions ddl/label/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule {
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
for i := 0; i < len(ids); i++ {
data := map[string]string{
"start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(ids[i]))),
"end_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(ids[i]+1))),
"start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]))),
"end_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]+1))),
}
r.Data = append(r.Data, data)
}
Expand Down
16 changes: 8 additions & 8 deletions ddl/label/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ func TestReset(t *testing.T) {
require.Equal(t, rule.Index, 2)

r := rule.Data[0].(map[string]string)
require.Equal(t, "7480000000000000ff015f720000000000fa", r["start_key"])
require.Equal(t, "7480000000000000ff025f720000000000fa", r["end_key"])
require.Equal(t, "7480000000000000ff0100000000000000f8", r["start_key"])
require.Equal(t, "7480000000000000ff0200000000000000f8", r["end_key"])
r = rule.Data[1].(map[string]string)
require.Equal(t, "7480000000000000ff025f720000000000fa", r["start_key"])
require.Equal(t, "7480000000000000ff035f720000000000fa", r["end_key"])
require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"])
require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"])
r = rule.Data[2].(map[string]string)
require.Equal(t, "7480000000000000ff035f720000000000fa", r["start_key"])
require.Equal(t, "7480000000000000ff045f720000000000fa", r["end_key"])
require.Equal(t, "7480000000000000ff0300000000000000f8", r["start_key"])
require.Equal(t, "7480000000000000ff0400000000000000f8", r["end_key"])

r1 := rule.Clone()
require.Equal(t, r1, rule)
Expand All @@ -97,8 +97,8 @@ func TestReset(t *testing.T) {
require.Equal(t, rule.Index, 3)

r = r2.Data[0].(map[string]string)
require.Equal(t, "7480000000000000ff025f720000000000fa", r["start_key"])
require.Equal(t, "7480000000000000ff035f720000000000fa", r["end_key"])
require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"])
require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"])

// default case
spec = &ast.AttributesSpec{Default: true}
Expand Down
42 changes: 42 additions & 0 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
}
if err := alterTableLabelRule(job.SchemaName, tblInfo, ids); err != nil {
job.State = model.JobStateCancelled
return ver, err
}

// none -> replica only
job.SchemaState = model.StateReplicaOnly
case model.StateReplicaOnly:
Expand Down Expand Up @@ -184,6 +193,27 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Trace(err)
}

func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) error {
tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, schemaName, meta.Name.L)
oldRule, err := infosync.GetLabelRules(context.TODO(), []string{tableRuleID})
if err != nil {
return errors.Trace(err)
}
if len(oldRule) == 0 {
return nil
}

r, ok := oldRule[tableRuleID]
if ok {
rule := r.Reset(schemaName, meta.Name.L, "", ids...)
err = infosync.PutLabelRule(context.TODO(), rule)
if err != nil {
return errors.Wrapf(err, "failed to notify PD label rule")
}
}
return nil
}

func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle

Expand Down Expand Up @@ -1059,6 +1089,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
job.State = model.JobStateCancelled
return ver, err
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -1090,6 +1126,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the label rules")
}

if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
job.State = model.JobStateCancelled
return ver, err
}

job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != job.SchemaState)
case model.StateDeleteOnly:
Expand Down
4 changes: 2 additions & 2 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,8 +1973,8 @@ func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {
func getGCRules(ids []int64, rules map[string]*label.Rule) []string {
oldRange := make(map[string]struct{})
for _, id := range ids {
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(id)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(id+1)))
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(id)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(id+1)))
oldRange[startKey+endKey] = struct{}{}
}

Expand Down

0 comments on commit 0ce2900

Please sign in to comment.