Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: delete old partition's stats meta after removing partitioning #49553

Merged
merged 11 commits into from
Dec 19, 2023
15 changes: 8 additions & 7 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3000,14 +3000,12 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
tblInfo.Partition.AddingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone

var oldTblID int64
if job.Type != model.ActionReorganizePartition {
// ALTER TABLE ... PARTITION BY
// REMOVE PARTITIONING
// New Table ID, so needs to recreate the table by drop+create.
oldTblID := tblInfo.ID
// Overloading the NewTableID here with the oldTblID instead,
// for keeping the old global statistics
statisticsPartInfo.NewTableID = oldTblID
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
Expand Down Expand Up @@ -3067,7 +3065,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
// Include the old table ID, if changed, which may contain global statistics,
// so it can be reused for the new (non)partitioned table.
event, err := newStatsDDLEventForJob(
job.Type, tblInfo, statisticsPartInfo, droppedPartInfo,
job.Type, oldTblID, tblInfo, statisticsPartInfo, droppedPartInfo,
)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -3087,6 +3085,7 @@ func (w *worker) onReorganizePartition(d *ddlCtx, t *meta.Meta, job *model.Job)
// It is used for reorganize partition, add partitioning and remove partitioning.
func newStatsDDLEventForJob(
jobType model.ActionType,
oldTblID int64,
tblInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
droppedPartInfo *model.PartitionInfo,
Expand All @@ -3101,13 +3100,15 @@ func newStatsDDLEventForJob(
)
case model.ActionAlterTablePartitioning:
event = statsutil.NewAddPartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
)
case model.ActionRemovePartitioning:
event = statsutil.NewRemovePartitioningEvent(
oldTblID,
tblInfo,
addedPartInfo,
droppedPartInfo,
)
default:
return nil, errors.Errorf("unknown job type: %s", jobType.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_test(
timeout = "short",
srcs = ["ddl_test.go"],
flaky = True,
shard_count = 9,
shard_count = 11,
deps = [
"//pkg/parser/model",
"//pkg/planner/cardinality",
Expand Down
30 changes: 18 additions & 12 deletions pkg/statistics/handle/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,26 +134,32 @@ func (h *ddlHandlerImpl) HandleDDLEvent(t *util.DDLEvent) error {
// Do not update global stats, since the data have not changed!
}
case model.ActionAlterTablePartitioning:
globalTableInfo, addedPartInfo := t.GetAddPartitioningInfo()
// Add partitioning
oldSingleTableID, globalTableInfo, addedPartInfo := t.GetAddPartitioningInfo()
// Add new partition stats.
for _, def := range addedPartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
}
// Change id for global stats, since the data has not changed!
// Note that globalTableInfo is the new table info
// and addedPartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
return h.statsWriter.ChangeGlobalStatsID(addedPartInfo.NewTableID, globalTableInfo.ID)
// Note: This operation will update all tables related to statistics with the new ID.
return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID)
case model.ActionRemovePartitioning:
// Change id for global stats, since the data has not changed!
// Note that newSingleTableInfo is the new table info
// and droppedPartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
newSingleTableInfo, droppedPartInfo := t.GetRemovePartitioningInfo()
return h.statsWriter.ChangeGlobalStatsID(droppedPartInfo.NewTableID, newSingleTableInfo.ID)
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID,
newSingleTableInfo,
droppedPartInfo := t.GetRemovePartitioningInfo()
if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil {
return err
}

// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.statsWriter.ResetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.statsWriter.UpdateStatsVersion()
}
Expand Down
137 changes: 137 additions & 0 deletions pkg/statistics/handle/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,143 @@ func TestExchangeAPartition(t *testing.T) {
)
}

// TestRemovePartitioning checks the statistics bookkeeping after
// `alter table t remove partitioning` has been handled by the stats handle:
// the table-level count and modify_count must be available under the new
// table ID, and the mysql.stats_meta version of every former partition must
// have changed.
func TestRemovePartitioning(t *testing.T) {
	const partitionVersionSQL = "select version from mysql.stats_meta where table_id in (?, ?, ?, ?) order by table_id"

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	handle := dom.StatsHandle()

	// Start from a range-partitioned table with 4 partitions.
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
partition by range (a) (
partition p0 values less than (6),
partition p1 values less than (11),
partition p2 values less than (16),
partition p3 values less than (21)
)
`)
	tk.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
	handle.DumpStatsDeltaToKV(true)
	tk.MustExec("analyze table t")

	// After analyze, every partition must have real (non-pseudo) stats.
	partitionedTbl, err := dom.InfoSchema().TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	partitionedInfo := partitionedTbl.Meta()
	partInfo := partitionedInfo.GetPartitionInfo()
	for _, def := range partInfo.Definitions {
		partStats := handle.GetPartitionStats(partitionedInfo, def.ID)
		require.False(t, partStats.Pseudo)
	}

	// Remember each partition's stats_meta version before the DDL.
	p0ID := partInfo.Definitions[0].ID
	p1ID := partInfo.Definitions[1].ID
	p2ID := partInfo.Definitions[2].ID
	p3ID := partInfo.Definitions[3].ID
	before := tk.MustQuery(partitionVersionSQL, p0ID, p1ID, p2ID, p3ID).Rows()
	require.Len(t, before, 4)
	oldVersions := make([]string, 0, len(before))
	for _, row := range before {
		oldVersions = append(oldVersions, row[0].(string))
	}

	// Remove partitioning and feed the resulting DDL event to the stats handle.
	tk.MustExec("alter table t remove partitioning")
	event := findEvent(handle.DDLEventCh(), model.ActionRemovePartitioning)
	require.NoError(t, handle.HandleDDLEvent(event))

	// The table has a new ID now; its count and modify_count must be unchanged.
	singleTbl, err := dom.InfoSchema().TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	singleInfo := singleTbl.Meta()
	metaQuery := fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", singleInfo.ID)
	tk.MustQuery(metaQuery).Check(testkit.Rows("5 0"))

	// The version of every former partition's stats_meta row must have changed.
	after := tk.MustQuery(partitionVersionSQL, p0ID, p1ID, p2ID, p3ID).Rows()
	require.Len(t, after, 4)
	for i, row := range after {
		require.NotEqual(t, oldVersions[i], row[0].(string))
	}
}

// TestAddPartitioning checks that, once the stats handle has processed the
// event produced by `alter table t partition by ...`, the table-level count
// and modify_count in mysql.stats_meta are found unchanged under the table's
// new ID.
func TestAddPartitioning(t *testing.T) {
	const metaSQLFormat = "select count, modify_count from mysql.stats_meta where table_id = %d"

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	handle := dom.StatsHandle()

	// Start from a plain, non-partitioned table.
	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec(`
create table t (
a int,
b int,
primary key(a),
index idx(b)
)
`)
	tk.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)")
	handle.DumpStatsDeltaToKV(true)
	tk.MustExec("analyze table t")

	// Record the baseline: stats meta of the single table before the DDL.
	singleTbl, err := dom.InfoSchema().TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	singleInfo := singleTbl.Meta()
	tk.MustQuery(fmt.Sprintf(metaSQLFormat, singleInfo.ID)).Check(testkit.Rows("5 0"))

	// Add partitioning and feed the resulting DDL event to the stats handle.
	tk.MustExec("alter table t partition by hash(a) partitions 3")
	event := findEvent(handle.DDLEventCh(), model.ActionAlterTablePartitioning)
	require.NoError(t, handle.HandleDDLEvent(event))

	// Look the table up again to get its ID after adding partitioning, then
	// make sure the count and modify_count stored for that ID are unchanged.
	partitionedTbl, err := dom.InfoSchema().TableByName(
		model.NewCIStr("test"), model.NewCIStr("t"),
	)
	require.NoError(t, err)
	partitionedInfo := partitionedTbl.Meta()
	tk.MustQuery(fmt.Sprintf(metaSQLFormat, partitionedInfo.ID)).Check(testkit.Rows("5 0"))
}

func findEvent(eventCh <-chan *util.DDLEvent, eventType model.ActionType) *util.DDLEvent {
// Find the target event.
for {
Expand Down
26 changes: 19 additions & 7 deletions pkg/statistics/handle/util/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ type DDLEvent struct {
oldTableInfo *model.TableInfo
oldPartInfo *model.PartitionInfo
columnInfos []*model.ColumnInfo
tp model.ActionType
// oldTableID is the ID the table had before the DDL was applied.
// It is set when a table is converted from partitioned to non-partitioned, or vice versa.
oldTableID int64
tp model.ActionType
}

// NewCreateTableEvent creates a new ddl event that creates a table.
Expand Down Expand Up @@ -244,40 +247,49 @@ func (e *DDLEvent) GetTruncatePartitionInfo() (
// NewAddPartitioningEvent creates a new ddl event that converts a single table to a partitioned table.
// For example, `alter table t partition by range (c1) (partition p1 values less than (10))`.
func NewAddPartitioningEvent(
oldSingleTableID int64,
newGlobalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) *DDLEvent {
return &DDLEvent{
tp: model.ActionAlterTablePartitioning,
tableInfo: newGlobalTableInfo,
partInfo: addedPartInfo,
tp: model.ActionAlterTablePartitioning,
oldTableID: oldSingleTableID,
tableInfo: newGlobalTableInfo,
partInfo: addedPartInfo,
}
}

// GetAddPartitioningInfo gets the old table ID, the new table info, and the added partition info of the table that is converted to a partitioned table.
func (e *DDLEvent) GetAddPartitioningInfo() (newGlobalTableInfo *model.TableInfo, addedPartInfo *model.PartitionInfo) {
return e.tableInfo, e.partInfo
func (e *DDLEvent) GetAddPartitioningInfo() (
oldSingleTableID int64,
newGlobalTableInfo *model.TableInfo,
addedPartInfo *model.PartitionInfo,
) {
return e.oldTableID, e.tableInfo, e.partInfo
}

// NewRemovePartitioningEvent builds the DDL event for converting a partitioned
// table into a single table, e.g. `alter table t remove partitioning`.
// The event records the ID of the old partitioned table, the info of the new
// single table, and the info of the partitions that were dropped.
func NewRemovePartitioningEvent(
	oldPartitionedTableID int64,
	newSingleTableInfo *model.TableInfo,
	droppedPartInfo *model.PartitionInfo,
) *DDLEvent {
	event := &DDLEvent{tp: model.ActionRemovePartitioning}
	event.oldTableID = oldPartitionedTableID
	event.tableInfo = newSingleTableInfo
	event.oldPartInfo = droppedPartInfo
	return event
}

// GetRemovePartitioningInfo gets the old table ID, the new table info, and the dropped partition info of the table that is converted to a single table.
func (e *DDLEvent) GetRemovePartitioningInfo() (
oldPartitionedTableID int64,
newSingleTableInfo *model.TableInfo,
droppedPartInfo *model.PartitionInfo,
) {
return e.tableInfo, e.oldPartInfo
return e.oldTableID, e.tableInfo, e.oldPartInfo
}

// NewFlashbackClusterEvent creates a new ddl event that flashes back the cluster.
Expand Down