Skip to content

Commit

Permalink
executor: ALTER TABLE COMPACT support partition (#36173)
Browse files Browse the repository at this point in the history
  • Loading branch information
hehechen authored Sep 28, 2022
1 parent 3f85f8e commit 415d03f
Show file tree
Hide file tree
Showing 9 changed files with 6,032 additions and 5,782 deletions.
23 changes: 23 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5196,9 +5196,32 @@ func (b *executorBuilder) buildCompactTable(v *plannercore.CompactTable) Executo
return nil
}

var partitionIDs []int64
if v.PartitionNames != nil {
if v.TableInfo.Partition == nil {
b.err = errors.Errorf("table:%s is not a partition table, but user specify partition name list:%+v", v.TableInfo.Name.O, v.PartitionNames)
return nil
}
// use map to avoid FindPartitionDefinitionByName
partitionMap := map[string]int64{}
for _, partition := range v.TableInfo.Partition.Definitions {
partitionMap[partition.Name.L] = partition.ID
}

for _, partitionName := range v.PartitionNames {
partitionID, ok := partitionMap[partitionName.L]
if !ok {
b.err = table.ErrUnknownPartition.GenWithStackByArgs(partitionName.O, v.TableInfo.Name.O)
return nil
}
partitionIDs = append(partitionIDs, partitionID)
}
}

return &CompactTableTiFlashExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
tableInfo: v.TableInfo,
partitionIDs: partitionIDs,
tikvStore: tikvStore,
}
}
24 changes: 17 additions & 7 deletions executor/compact_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ func getTiFlashStores(ctx sessionctx.Context) ([]infoschema.ServerInfo, error) {
type CompactTableTiFlashExec struct {
baseExecutor

tableInfo *model.TableInfo
done bool
tableInfo *model.TableInfo
partitionIDs []int64
done bool

tikvStore tikv.Storage
}
Expand Down Expand Up @@ -139,26 +140,32 @@ func (task *storeCompactTask) work() error {
log.Info("Begin compacting table in a store",
zap.String("table", task.parentExec.tableInfo.Name.O),
zap.Int64("table-id", task.parentExec.tableInfo.ID),
zap.Int64s("partition-id", task.parentExec.partitionIDs),
zap.String("store-address", task.targetStore.Address),
)

task.startAt = time.Now()
task.lastProgressOutputAt = task.startAt

if task.parentExec.tableInfo.Partition != nil {
// There are partitions, let's do it partition by partition.
// There is no need for partition-level concurrency, as TiFlash will limit table compaction one at a time.
allPartitions := task.parentExec.tableInfo.Partition.Definitions
allPartitions := task.parentExec.partitionIDs
if len(allPartitions) == 0 {
// There are partitions, but user did not specify partitions.
for _, definition := range task.parentExec.tableInfo.Partition.Definitions {
allPartitions = append(allPartitions, definition.ID)
}
}
task.allPhysicalTables = len(allPartitions)
task.compactedPhysicalTables = 0
for _, partition := range allPartitions {
stopAllTasks, err = task.compactOnePhysicalTable(partition.ID)
for _, partitionID := range allPartitions {
stopAllTasks, err = task.compactOnePhysicalTable(partitionID)
task.compactedPhysicalTables++
if err != nil {
// Stop remaining partitions when error happens.
break
}
} // For partition table, there must be no data in task.parentExec.tableInfo.ID. So no need to compact it.
}
} else {
task.allPhysicalTables = 1
task.compactedPhysicalTables = 0
Expand All @@ -171,6 +178,7 @@ func (task *storeCompactTask) work() error {
zap.Duration("elapsed", time.Since(task.startAt)),
zap.String("table", task.parentExec.tableInfo.Name.O),
zap.Int64("table-id", task.parentExec.tableInfo.ID),
zap.Int64s("partition-id", task.parentExec.partitionIDs),
zap.String("store-address", task.targetStore.Address),
)
}
Expand All @@ -187,6 +195,7 @@ func (task *storeCompactTask) logFailure(otherFields ...zap.Field) {
allFields := []zap.Field{
zap.String("table", task.parentExec.tableInfo.Name.O),
zap.Int64("table-id", task.parentExec.tableInfo.ID),
zap.Int64s("partition-id", task.parentExec.partitionIDs),
zap.String("store-address", task.targetStore.Address),
}
log.Warn("Compact table failed", append(allFields, otherFields...)...)
Expand All @@ -201,6 +210,7 @@ func (task *storeCompactTask) logProgressOptionally() {
zap.Duration("elapsed", time.Since(task.startAt)),
zap.String("table", task.parentExec.tableInfo.Name.O),
zap.Int64("table-id", task.parentExec.tableInfo.ID),
zap.Int64s("partition-id", task.parentExec.partitionIDs),
zap.String("store-address", task.targetStore.Address),
zap.Int("all-physical-tables", task.allPhysicalTables),
zap.Int("compacted-physical-tables", task.compactedPhysicalTables),
Expand Down
166 changes: 166 additions & 0 deletions executor/compact_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,42 @@ func TestCompactTableNoTiFlashReplica(t *testing.T) {
))
}

func TestCompactTableNoPartition(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec("create table t(a int)")
_, err := tk.Exec("alter table t compact partition p1,p2 tiflash replica;")
require.NotNil(t, err)
require.Equal(t, "table:t is not a partition table, but user specify partition name list:[p1 p2]", err.Error())
}

func TestCompactTablePartitionInvalid(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE t (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
fname VARCHAR(25) NOT NULL,
lname VARCHAR(25) NOT NULL,
store_id INT NOT NULL,
department_id INT NOT NULL
)
PARTITION BY RANGE(id) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (15),
PARTITION p3 VALUES LESS THAN MAXVALUE
);
`)
_, err := tk.Exec("alter table t compact partition p1,p2,p4 tiflash replica;")
require.NotNil(t, err)
require.Equal(t, "[table:1735]Unknown partition 'p4' in table 't'", err.Error())
}

func TestCompactTableTooBusy(t *testing.T) {
mocker := newCompactRequestMocker(t)
mocker.MockFrom(`tiflash0/#1`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
Expand Down Expand Up @@ -678,6 +714,136 @@ func TestCompactTableWithTiFlashDown(t *testing.T) {
))
}

// TestCompactTableWithSpecifiedRangePartition: 1 TiFlash, table has 4 partitions.
// only compact Partition 1: 3 Partials
// There will be 3 requests sent in series.
func TestCompactTableWithSpecifiedRangePartition(t *testing.T) {
mocker := newCompactRequestMocker(t)
defer mocker.RequireAllHandlersHit()
store, do := testkit.CreateMockStoreAndDomain(t, withMockTiFlash(1), mocker.AsOpt())
tk := testkit.NewTestKit(t, store)

mocker.MockFrom(`tiflash0/#1`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 1)
require.Empty(t, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
HasRemaining: true,
CompactedStartKey: []byte{},
CompactedEndKey: []byte{0xCC},
}, nil
})
mocker.MockFrom(`tiflash0/#2`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 1)
require.Equal(t, []byte{0xCC}, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
HasRemaining: true,
CompactedStartKey: []byte{},
CompactedEndKey: []byte{0xFF},
}, nil
})
mocker.MockFrom(`tiflash0/#3`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 1)
require.Equal(t, []byte{0xFF}, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
HasRemaining: false,
CompactedStartKey: []byte{},
CompactedEndKey: []byte{0xFF, 0xAA},
}, nil
})

tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE employees (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
fname VARCHAR(25) NOT NULL,
lname VARCHAR(25) NOT NULL,
store_id INT NOT NULL,
department_id INT NOT NULL
)
PARTITION BY RANGE(id) (
PARTITION p0 VALUES LESS THAN (5),
PARTITION p1 VALUES LESS THAN (10),
PARTITION p2 VALUES LESS THAN (15),
PARTITION p3 VALUES LESS THAN MAXVALUE
);
`)
tk.MustExec(`alter table employees set tiflash replica 1;`)
tk.MustExec(`alter table employees compact partition p1 tiflash replica;`)
tk.MustQuery(`show warnings;`).Check(testkit.Rows())
}

// TestCompactTableWithSpecifiedHashPartition: 1 TiFlash, table has 3 partitions (hash partition).
// only compact p1, p2
// During compacting the partition, one partition will return failure PhysicalTableNotExist. The remaining partitions should be still compacted.
func TestCompactTableWithSpecifiedHashPartitionAndOnePartitionFailed(t *testing.T) {
mocker := newCompactRequestMocker(t)
defer mocker.RequireAllHandlersHit()
store, do := testkit.CreateMockStoreAndDomain(t, withMockTiFlash(1), mocker.AsOpt())
tk := testkit.NewTestKit(t, store)

mocker.MockFrom(`tiflash0/#1`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 1)
require.Empty(t, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
HasRemaining: true,
CompactedStartKey: []byte{},
CompactedEndKey: []byte{0xA0},
}, nil
})
mocker.MockFrom(`tiflash0/#2`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 1)
require.Equal(t, []byte{0xA0}, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
Error: &kvrpcpb.CompactError{Error: &kvrpcpb.CompactError_ErrPhysicalTableNotExist{}}, // For example, may be this partition got dropped
}, nil
})
mocker.MockFrom(`tiflash0/#3`, func(req *kvrpcpb.CompactRequest) (*kvrpcpb.CompactResponse, error) {
tableID := do.MustGetTableID(t, "test", "employees")
pid := do.MustGetPartitionAt(t, "test", "employees", 2)
require.Empty(t, req.StartKey)
require.EqualValues(t, req.PhysicalTableId, pid)
require.EqualValues(t, req.LogicalTableId, tableID)
return &kvrpcpb.CompactResponse{
HasRemaining: false,
CompactedStartKey: []byte{},
CompactedEndKey: []byte{0xCD},
}, nil
})

tk.MustExec("use test")
tk.MustExec(`
CREATE TABLE employees (
id INT NOT NULL,
fname VARCHAR(30),
lname VARCHAR(30),
hired DATE NOT NULL DEFAULT '1970-01-01',
separated DATE DEFAULT '9999-12-31',
job_code INT,
store_id INT
)
PARTITION BY HASH(store_id)
PARTITIONS 3;
`)
tk.MustExec(`alter table employees set tiflash replica 1;`)
tk.MustExec(`alter table employees compact PARTITION p1,p2 tiflash replica;`)
tk.MustQuery(`show warnings;`).Check(testkit.Rows())
}

// TestCompactTableWithTiFlashDownAndRestore: 2 TiFlash stores.
// Store0 - #1 (remaining=true, takes 3s), #2 (remaining=false)
// Store1 - #1 (remaining=true), #2 (down), #3 (restored, remaining=false)
Expand Down
26 changes: 16 additions & 10 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,26 +385,32 @@ const (
type CompactTableStmt struct {
stmtNode

Table *TableName
ReplicaKind CompactReplicaKind
Table *TableName
PartitionNames []model.CIStr
ReplicaKind CompactReplicaKind
}

// Restore implements Node interface.
func (n *CompactTableStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("ALTER TABLE ")
if err := n.Table.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while add table")
}
n.Table.restoreName(ctx)

if n.ReplicaKind == CompactReplicaKindAll {
ctx.WriteKeyWord(" COMPACT")
} else {
ctx.WriteKeyWord(" COMPACT")
if len(n.PartitionNames) != 0 {
ctx.WriteKeyWord(" PARTITION ")
for i, partition := range n.PartitionNames {
if i != 0 {
ctx.WritePlain(",")
}
ctx.WriteName(partition.O)
}
}
if n.ReplicaKind != CompactReplicaKindAll {
ctx.WriteKeyWord(" ")
// Note: There is only TiFlash replica available now. TiKV will be added later.
ctx.WriteKeyWord(" COMPACT ")
ctx.WriteKeyWord(string(n.ReplicaKind))
ctx.WriteKeyWord(" REPLICA")
}

return nil
}

Expand Down
Loading

0 comments on commit 415d03f

Please sign in to comment.