Skip to content

Commit

Permalink
statistics: delete old partition's stats meta after removing partitio…
Browse files Browse the repository at this point in the history
…ning (#49553)

close #49547
  • Loading branch information
Rustin170506 authored Dec 19, 2023
1 parent ad1efe4 commit 8bebe44
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 27 deletions.
15 changes: 8 additions & 7 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3000,14 +3000,12 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
tblInfo.Partition.AddingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone

var oldTblID int64
if job.Type != model.ActionReorganizePartition {
// ALTER TABLE ... PARTITION BY
// REMOVE PARTITIONING
// New Table ID, so needs to recreate the table by drop+create.
oldTblID := tblInfo.ID
// Overloading the NewTableID here with the oldTblID instead,
// for keeping the old global statistics
statisticsPartInfo.NewTableID = oldTblID
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
Expand Down Expand Up @@ -3067,7 +3065,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
// Include the old table ID, if changed, which may contain global statistics,
// so it can be reused for the new (non)partitioned table.
event, err := newStatsDDLEventForJob(
job.Type, tblInfo, statisticsPartInfo, droppedPartInfo,
job.Type, oldTblID, tblInfo, statisticsPartInfo, droppedPartInfo,
)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -3087,6 +3085,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
// It is used for reorganize partition, add partitioning and remove partitioning.
func newStatsDDLEventForJob(
jobType model.ActionType,
oldTblID int64,
tblInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
droppedPartInfo *model.PartitionInfo,
Expand All @@ -3101,13 +3100,15 @@ func newStatsDDLEventForJob(
)
case model.ActionAlterTablePartitioning:
event = statsutil.NewAddPartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
)
case model.ActionRemovePartitioning:
event = statsutil.NewRemovePartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
droppedPartInfo,
)
default:
return nil, errors.Errorf("unknown job type: %s", jobType.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ go_test(
timeout = "short",
srcs = ["ddl_test.go"],
flaky = True,
shard_count = 13,
shard_count = 15,
deps = [
"//pkg/parser/model",
"//pkg/planner/cardinality",
Expand Down
30 changes: 18 additions & 12 deletions pkg/statistics/handle/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,32 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
return err
}
case model.ActionAlterTablePartitioning:
globalTableInfo, addedPartInfo := t.GetAddPartitioningInfo()
// Add partitioning
oldSingleTableID, globalTableInfo, addedPartInfo := t.GetAddPartitioningInfo()
// Add new partition stats.
for _, def := range addedPartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
}
// Change id for global stats, since the data has not changed!
// Note that globalTableInfo is the new table info
// and addedPartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
return h.statsWriter.ChangeGlobalStatsID(addedPartInfo.NewTableID, globalTableInfo.ID)
// Note: This operation will update all tables related to statistics with the new ID.
return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID)
case model.ActionRemovePartitioning:
// Change id for global stats, since the data has not changed!
// Note that newSingleTableInfo is the new table info
// and droppedPartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
newSingleTableInfo, droppedPartInfo := t.GetRemovePartitioningInfo()
return h.statsWriter.ChangeGlobalStatsID(droppedPartInfo.NewTableID, newSingleTableInfo.ID)
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID,
newSingleTableInfo,
droppedPartInfo := t.GetRemovePartitioningInfo()
if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil {
return err
}

// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.statsWriter.ResetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.statsWriter.UpdateStatsVersion()
}
Expand Down
137 changes: 137 additions & 0 deletions pkg/statistics/handle/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,143 @@ func TestExchangeAPartition(t *testing.T) {
)
}

func TestRemovePartitioning(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
h := do.StatsHandle()
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
// Create a table with 4 partitions.
testKit.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
partition by range (a) (
partition p0 values less than (6),
partition p1 values less than (11),
partition p2 values less than (16),
partition p3 values less than (21)
)
`)
testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
h.DumpStatsDeltaToKV(true)

testKit.MustExec("analyze table t")
is := do.InfoSchema()
tbl, err := is.TableByName(
model.NewCIStr("test"), model.NewCIStr("t"),
)
require.NoError(t, err)
tableInfo := tbl.Meta()
pi := tableInfo.GetPartitionInfo()
for _, def := range pi.Definitions {
statsTbl := h.GetPartitionStats(tableInfo, def.ID)
require.False(t, statsTbl.Pseudo)
}

// Get all partitions' stats update version.
partitionP0ID := pi.Definitions[0].ID
partitionP1ID := pi.Definitions[1].ID
partitionP2ID := pi.Definitions[2].ID
partitionP3ID := pi.Definitions[3].ID
// Get it from stats_meta first.
rows := testKit.MustQuery(
"select version from mysql.stats_meta where table_id in (?, ?, ?, ?) order by table_id",
partitionP0ID, partitionP1ID, partitionP2ID, partitionP3ID,
).Rows()
require.Len(t, rows, 4)
versionP0 := rows[0][0].(string)
versionP1 := rows[1][0].(string)
versionP2 := rows[2][0].(string)
versionP3 := rows[3][0].(string)

// Remove partitioning.
testKit.MustExec("alter table t remove partitioning")
// Find the remove partitioning event.
removePartitioningEvent := findEvent(h.DDLEventCh(), model.ActionRemovePartitioning)
err = h.HandleDDLEvent(removePartitioningEvent)
require.NoError(t, err)
// Check the global stats meta make sure the count and modify count are not changed.
// Get new table id after remove partitioning.
is = do.InfoSchema()
tbl, err = is.TableByName(
model.NewCIStr("test"), model.NewCIStr("t"),
)
require.NoError(t, err)
tableInfo = tbl.Meta()
testKit.MustQuery(
fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tableInfo.ID),
).Check(
testkit.Rows("5 0"),
)

// Check the update versions are changed.
rows = testKit.MustQuery(
"select version from mysql.stats_meta where table_id in (?, ?, ?, ?) order by table_id",
partitionP0ID, partitionP1ID, partitionP2ID, partitionP3ID,
).Rows()
require.Len(t, rows, 4)
require.NotEqual(t, versionP0, rows[0][0].(string))
require.NotEqual(t, versionP1, rows[1][0].(string))
require.NotEqual(t, versionP2, rows[2][0].(string))
require.NotEqual(t, versionP3, rows[3][0].(string))
}

func TestAddPartitioning(t *testing.T) {
store, do := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
h := do.StatsHandle()
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t")
// Create a table without partitioning.
testKit.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
`)
testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
h.DumpStatsDeltaToKV(true)
testKit.MustExec("analyze table t")
is := do.InfoSchema()
tbl, err := is.TableByName(
model.NewCIStr("test"), model.NewCIStr("t"),
)
require.NoError(t, err)
tableInfo := tbl.Meta()
// Check the global stats meta before add partitioning.
testKit.MustQuery(
fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tableInfo.ID),
).Check(
testkit.Rows("5 0"),
)

// Add partitioning.
testKit.MustExec("alter table t partition by hash(a) partitions 3")
// Find the add partitioning event.
addPartitioningEvent := findEvent(h.DDLEventCh(), model.ActionAlterTablePartitioning)
err = h.HandleDDLEvent(addPartitioningEvent)
require.NoError(t, err)
// Check the global stats meta make sure the count and modify count are not changed.
// Get new table id after remove partitioning.
is = do.InfoSchema()
tbl, err = is.TableByName(
model.NewCIStr("test"), model.NewCIStr("t"),
)
require.NoError(t, err)
tableInfo = tbl.Meta()
testKit.MustQuery(
fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tableInfo.ID),
).Check(
testkit.Rows("5 0"),
)
}

func findEvent(eventCh <-chan *util.DDLEvent, eventType model.ActionType) *util.DDLEvent {
// Find the target event.
for {
Expand Down
26 changes: 19 additions & 7 deletions pkg/statistics/handle/util/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type DDLEvent struct {
oldTableInfo *model.TableInfo
oldPartInfo *model.PartitionInfo
columnInfos []*model.ColumnInfo
tp model.ActionType
// This value is used to store the table ID during a transition.
// It applies when a table structure is being changed from partitioned to non-partitioned, or vice versa.
oldTableID int64
tp model.ActionType
}

// NewCreateTableEvent creates a new ddl event that creates a table.
Expand Down Expand Up @@ -244,40 +247,49 @@ func (e *DDLEvent) GetTruncatePartitionInfo() (
// NewAddPartitioningEvent creates a new ddl event that converts a single table to a partitioned table.
// For example, `alter table t partition by range (c1) (partition p1 values less than (10))`.
func NewAddPartitioningEvent(
oldSingleTableID int64,
newGlobalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) *DDLEvent {
return &DDLEvent{
tp: model.ActionAlterTablePartitioning,
tableInfo: newGlobalTableInfo,
partInfo: addedPartInfo,
tp: model.ActionAlterTablePartitioning,
oldTableID: oldSingleTableID,
tableInfo: newGlobalTableInfo,
partInfo: addedPartInfo,
}
}

// GetAddPartitioningInfo gets the table info of the table that is converted to a partitioned table.
func (e *DDLEvent) GetAddPartitioningInfo() (newGlobalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo) {
return e.tableInfo, e.partInfo
func (e *DDLEvent) GetAddPartitioningInfo() (
oldSingleTableID int64,
newGlobalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) {
return e.oldTableID, e.tableInfo, e.partInfo
}

// NewRemovePartitioningEvent creates a new ddl event that converts a partitioned table to a single table.
// For example, `alter table t remove partitioning`.
func NewRemovePartitioningEvent(
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) *DDLEvent {
return &DDLEvent{
tp: model.ActionRemovePartitioning,
oldTableID: oldPartitionedTableID,
tableInfo: newSingleTableInfo,
oldPartInfo: droppedPartInfo,
}
}

// GetRemovePartitioningInfo gets the table info of the table that is converted to a single table.
func (e *DDLEvent) GetRemovePartitioningInfo() (
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
return e.tableInfo, e.oldPartInfo
return e.oldTableID, e.tableInfo, e.oldPartInfo
}

// NewFlashbackClusterEvent creates a new ddl event that flashes back the cluster.
Expand Down

0 comments on commit 8bebe44

Please sign in to comment.