ddl: support index regions and updating the existed table rule when changing partition (#33925) #34025

Merged · 3 commits · Apr 28, 2022
38 changes: 37 additions & 1 deletion ddl/attributes_sql_test.go
@@ -107,6 +107,42 @@ PARTITION BY RANGE (c) (
 	c.Assert(err, IsNil)
 	_, err = tk.Exec(`alter table alter_p partition p1 attributes " merge_option=allow , key=value ";`)
 	c.Assert(err, IsNil)
+
+	// reset all
+	tk.MustExec(`alter table alter_p partition p0 attributes default;`)
+	tk.MustExec(`alter table alter_p partition p1 attributes default;`)
+	tk.MustExec(`alter table alter_p partition p2 attributes default;`)
+	tk.MustExec(`alter table alter_p partition p3 attributes default;`)
+
+	// add table level attribute
+	tk.MustExec(`alter table alter_p attributes="merge_option=deny";`)
+	rows := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
+	c.Assert(len(rows), Equals, 1)
+
+	// add a new partition p4
+	tk.MustExec(`alter table alter_p add partition (PARTITION p4 VALUES LESS THAN (60));`)
+	rows1 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
+	c.Assert(len(rows1), Equals, 1)
+	c.Assert(rows[0][3], Not(Equals), rows1[0][3])
+
+	// drop the new partition p4
+	tk.MustExec(`alter table alter_p drop partition p4;`)
+	rows2 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
+	c.Assert(len(rows2), Equals, 1)
+	c.Assert(rows[0][3], Equals, rows2[0][3])
+
+	// add a new partition p5
+	tk.MustExec(`alter table alter_p add partition (PARTITION p5 VALUES LESS THAN (80));`)
+	rows3 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
+	c.Assert(len(rows3), Equals, 1)
+	c.Assert(rows[0][3], Not(Equals), rows3[0][3])
+
+	// truncate the new partition p5
+	tk.MustExec(`alter table alter_p truncate partition p5;`)
+	rows4 := tk.MustQuery(`select * from information_schema.attributes;`).Sort().Rows()
+	c.Assert(len(rows4), Equals, 1)
+	c.Assert(rows3[0][3], Not(Equals), rows4[0][3])
+	c.Assert(rows[0][3], Not(Equals), rows4[0][3])
 }
 
 func (s *testAttributesDDLSerialSuite) TestTruncateTable(c *C) {
@@ -523,7 +559,7 @@ PARTITION BY RANGE (c) (
 	c.Assert(len(rows1), Equals, 2)
 	c.Assert(rows1[0][0], Equals, "schema/test/part")
 	c.Assert(rows1[0][2], Equals, `"key=value"`)
-	c.Assert(rows1[0][3], Equals, rows[0][3])
+	c.Assert(rows1[0][3], Not(Equals), rows[0][3])
 	c.Assert(rows1[1][0], Equals, "schema/test/part/p1")
 	c.Assert(rows1[1][2], Equals, `"key2=value2"`)
 	c.Assert(rows1[1][3], Equals, rows[2][3])
4 changes: 2 additions & 2 deletions ddl/label/rule.go
@@ -144,8 +144,8 @@ func (r *Rule) Reset(dbName, tableName, partName string, ids ...int64) *Rule {
 	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
 	for i := 0; i < len(ids); i++ {
 		data := map[string]string{
-			"start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(ids[i]))),
-			"end_key":   hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(ids[i]+1))),
+			"start_key": hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]))),
+			"end_key":   hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(ids[i]+1))),
 		}
 		r.Data = append(r.Data, data)
 	}
16 changes: 8 additions & 8 deletions ddl/label/rule_test.go
@@ -75,14 +75,14 @@ func TestReset(t *testing.T) {
 	require.Equal(t, rule.Index, 2)
 
 	r := rule.Data[0].(map[string]string)
-	require.Equal(t, "7480000000000000ff015f720000000000fa", r["start_key"])
-	require.Equal(t, "7480000000000000ff025f720000000000fa", r["end_key"])
+	require.Equal(t, "7480000000000000ff0100000000000000f8", r["start_key"])
+	require.Equal(t, "7480000000000000ff0200000000000000f8", r["end_key"])
 	r = rule.Data[1].(map[string]string)
-	require.Equal(t, "7480000000000000ff025f720000000000fa", r["start_key"])
-	require.Equal(t, "7480000000000000ff035f720000000000fa", r["end_key"])
+	require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"])
+	require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"])
 	r = rule.Data[2].(map[string]string)
-	require.Equal(t, "7480000000000000ff035f720000000000fa", r["start_key"])
-	require.Equal(t, "7480000000000000ff045f720000000000fa", r["end_key"])
+	require.Equal(t, "7480000000000000ff0300000000000000f8", r["start_key"])
+	require.Equal(t, "7480000000000000ff0400000000000000f8", r["end_key"])
 
 	r1 := rule.Clone()
 	require.Equal(t, r1, rule)
@@ -97,8 +97,8 @@ func TestReset(t *testing.T) {
 	require.Equal(t, rule.Index, 3)
 
 	r = r2.Data[0].(map[string]string)
-	require.Equal(t, "7480000000000000ff025f720000000000fa", r["start_key"])
-	require.Equal(t, "7480000000000000ff035f720000000000fa", r["end_key"])
+	require.Equal(t, "7480000000000000ff0200000000000000f8", r["start_key"])
+	require.Equal(t, "7480000000000000ff0300000000000000f8", r["end_key"])
 
 	// default case
 	spec = &ast.AttributesSpec{Default: true}
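The new expected hex keys above follow directly from the switch in rule.go from GenTableRecordPrefix to GenTablePrefix. The standalone sketch below is not part of this PR; it assumes the tablecodec and codec packages are importable from an external module with the signatures used in rule.go, and it reproduces both encodings for table ID 1. GenTablePrefix encodes only "t" plus the table ID, so the rule range [GenTablePrefix(id), GenTablePrefix(id+1)) spans the whole table, record and index regions alike, whereas the old GenTableRecordPrefix range covered only row data.

package main

import (
	"encoding/hex"
	"fmt"

	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/util/codec"
)

func main() {
	id := int64(1)
	// Old range start: "t" + id + "_r", the record (row data) prefix.
	oldStart := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(id)))
	// New range start: "t" + id, the whole-table prefix that also covers index keys.
	newStart := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(id)))
	fmt.Println(oldStart) // 7480000000000000ff015f720000000000fa (old expectation in TestReset)
	fmt.Println(newStart) // 7480000000000000ff0100000000000000f8 (new expectation in TestReset)
}

The matching change in getGCRules (see store/gcworker/gc_worker.go below) keeps GC's range matching consistent with the label rules the DDL layer now writes.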
42 changes: 42 additions & 0 deletions ddl/partition.go
@@ -143,6 +143,15 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 			return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
 		}
+
+		ids := getIDs([]*model.TableInfo{tblInfo})
+		for _, p := range tblInfo.Partition.AddingDefinitions {
+			ids = append(ids, p.ID)
+		}
+		if err := alterTableLabelRule(job.SchemaName, tblInfo, ids); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, err
+		}
 
 		// none -> replica only
 		job.SchemaState = model.StateReplicaOnly
 	case model.StateReplicaOnly:
@@ -184,6 +193,27 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
 	return ver, errors.Trace(err)
 }
 
+func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) error {
+	tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, schemaName, meta.Name.L)
+	oldRule, err := infosync.GetLabelRules(context.TODO(), []string{tableRuleID})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(oldRule) == 0 {
+		return nil
+	}
+
+	r, ok := oldRule[tableRuleID]
+	if ok {
+		rule := r.Reset(schemaName, meta.Name.L, "", ids...)
+		err = infosync.PutLabelRule(context.TODO(), rule)
+		if err != nil {
+			return errors.Wrapf(err, "failed to notify PD label rule")
+		}
+	}
+	return nil
+}
+
 func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
 	var bundles []*placement.Bundle
 
@@ -1059,6 +1089,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the label rules")
 		}
+
+		if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, err
+		}
+
 		ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
 		if err != nil {
 			return ver, errors.Trace(err)
@@ -1090,6 +1126,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 			job.State = model.JobStateCancelled
 			return ver, errors.Wrapf(err, "failed to notify PD the label rules")
 		}
+
+		if err := alterTableLabelRule(job.SchemaName, tblInfo, getIDs([]*model.TableInfo{tblInfo})); err != nil {
+			job.State = model.JobStateCancelled
+			return ver, err
+		}
+
 		job.SchemaState = model.StateDeleteOnly
 		ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != job.SchemaState)
 	case model.StateDeleteOnly:
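The new alterTableLabelRule helper only updates a rule that already exists: it builds the table-level rule ID, fetches it from PD through infosync.GetLabelRules, and, if the rule is present, resets its key ranges from the given IDs and writes it back with infosync.PutLabelRule. A minimal sketch of the rule-ID construction, assuming the ddl/label package is importable from an external module and its constants match the rule IDs asserted in attributes_sql_test.go above:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/ddl/label"
)

func main() {
	// For the `test`.`part` table in attributes_sql_test.go this prints
	// "schema/test/part", the rule ID the test reads back as rows1[0][0].
	tableRuleID := fmt.Sprintf(label.TableIDFormat, label.IDPrefix, "test", "part")
	fmt.Println(tableRuleID)
}

A table that never had a table-level attribute simply skips the update (the len(oldRule) == 0 early return), so add/drop/truncate partition behaves as before for unlabelled tables.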
4 changes: 2 additions & 2 deletions store/gcworker/gc_worker.go
@@ -1973,8 +1973,8 @@ func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {
 func getGCRules(ids []int64, rules map[string]*label.Rule) []string {
 	oldRange := make(map[string]struct{})
 	for _, id := range ids {
-		startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(id)))
-		endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(id+1)))
+		startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(id)))
+		endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTablePrefix(id+1)))
 		oldRange[startKey+endKey] = struct{}{}
 	}
 