Skip to content

Commit

Permalink
schema, sink(ticdc): fix exchange partition (#8955)
Browse files Browse the repository at this point in the history
close #8914
  • Loading branch information
CharlesCheung96 authored May 15, 2023
1 parent 1335f98 commit 03285b8
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 42 deletions.
41 changes: 26 additions & 15 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (s *Snapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, error) {
case timodel.ActionRenameTables:
// DDL on multiple tables, ignore pre table info
return nil, nil
case timodel.ActionExchangeTablePartition:
// get the table that will be exchanged
table, _, err := s.inner.getSourceTable(job.BinlogInfo.TableInfo)
return table, err
default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down Expand Up @@ -885,26 +889,21 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er
return nil
}

// exchangePartition finds the partition's ID in the old table info of targetTable,
// and finds the sourceTable's ID in the new table info of targetTable.
// It then sets sourceTable's ID to the partition's ID, which makes the exchange happen in the snapshot.
// Finally, it updates both the targetTable's info and the sourceTable's info in the snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
func (s *snapshot) getSourceTable(targetTable *timodel.TableInfo) (*model.TableInfo, int64, error) {
oldTable, ok := s.physicalTableByID(targetTable.ID)
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(targetTable.ID)
}

oldPartitions := oldTable.GetPartitionInfo()
if oldPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", oldTable.ID)
}

newPartitions := targetTable.GetPartitionInfo()
if newPartitions == nil {
return cerror.ErrSnapshotTableNotFound.
return nil, 0, cerror.ErrSnapshotTableNotFound.
GenWithStack("table %d is not a partitioned table", targetTable.ID)
}

Expand All @@ -926,14 +925,13 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}
sourceTable, ok = s.physicalTableByID(diff[0])
sourceTable, ok := s.physicalTableByID(diff[0])
if !ok {
return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
return nil, 0, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(diff[0])
}

// 3.find the exchanged partition info
diff = diff[:0]
for id := range oldIDs {
Expand All @@ -942,13 +940,26 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin
}
}
if len(diff) != 1 {
return cerror.ErrExchangePartition.
return nil, 0, cerror.ErrExchangePartition.
GenWithStackByArgs(fmt.Sprintf("The exchanged source table number must be 1, but found %v", diff))
}

exchangedPartitionID := diff[0]
return sourceTable, exchangedPartitionID, nil
}

// exchangePartition finds the partition's ID in the old table info of targetTable,
// and finds the sourceTable's ID in the new table info of targetTable.
// It then sets sourceTable's ID to the partition's ID, which makes the exchange happen in the snapshot.
// Finally, it updates both the targetTable's info and the sourceTable's info in the snapshot.
func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uint64) error {
var sourceTable *model.TableInfo
sourceTable, exchangedPartitionID, err := s.getSourceTable(targetTable.TableInfo)
if err != nil {
return errors.Trace(err)
}
// 4.update the targetTable
err := s.updatePartition(targetTable, currentTS)
err = s.updatePartition(targetTable, currentTS)
if err != nil {
return errors.Trace(err)
}
Expand Down
47 changes: 30 additions & 17 deletions cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
Expand Down Expand Up @@ -62,29 +63,41 @@ func NewDDLSink(ctx context.Context, sinkURI *url.URL) (*DDLSink, error) {

// WriteDDLEvent writes the ddl event to the cloud storage.
func (d *DDLSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
writeFile := func(def cloudstorage.TableDefinition) error {
encodedDef, err := def.MarshalWithQuery()
if err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
return d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

return nil
})
}

var def cloudstorage.TableDefinition
def.FromDDLEvent(ddl)
encodedDef, err := def.MarshalWithQuery()
if err != nil {
if err := writeFile(def); err != nil {
return errors.Trace(err)
}

path, err := def.GenerateSchemaFilePath()
if err != nil {
return errors.Trace(err)
if ddl.Type == timodel.ActionExchangeTablePartition {
// For exchange partition, we need to write the schema of the source table.
var sourceTableDef cloudstorage.TableDefinition
sourceTableDef.FromTableInfo(ddl.PreTableInfo, ddl.TableInfo.Version)
return writeFile(sourceTableDef)
}
log.Debug("write ddl event to external storage",
zap.String("path", path), zap.Any("ddl", ddl))
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

return nil
})

return errors.Trace(err)
return nil
}

// WriteCheckpointTs writes the checkpoint ts to the cloud storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
update t1 set a=a+10 where a=9;

/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */
-- create table t2 (a int primary key);
-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/
/* TODO: add more tests for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /* these values will be replicated to downstream t2 */
insert into t1 values (25),(29); /* these values will be replicated to downstream t1.p3 */

ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (-1),(6),(13);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ alter table t1 drop partition p1;
insert into t1 values (7),(8),(9);
-- update t1 set a=a+10 where a=9;

/* TODO(CharlesCheung): EXCHANGE PARTITION will be supported in the future */
-- create table t2 (a int primary key);
-- ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
-- insert into t2 values (100),(101),(102),(103),(104),(105); /*these values will be replicated to in downstream t2*/
-- insert into t1 values (25),(29); /*these values will be replicated to in downstream t1.p3*/
/* TODO: add more tests for EXCHANGE PARTITION, ref: https://github.com/pingcap/tiflow/issues/8956 */
create table t2 (a int primary key);
ALTER TABLE t1 EXCHANGE PARTITION p3 WITH TABLE t2;
insert into t2 values (100),(101),(102),(103),(104),(105); /* these values will be replicated to downstream t2 */
insert into t1 values (25),(29); /* these values will be replicated to downstream t1.p3 */

ALTER TABLE t1 REORGANIZE PARTITION p0,p2 INTO (PARTITION p0 VALUES LESS THAN (5), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN (21));
insert into t1 values (-1),(6),(13);
Expand Down

0 comments on commit 03285b8

Please sign in to comment.