Skip to content

Commit

Permalink
ddl_puller(ticdc): discard DDLs that are irrelevant to the changefeed (#6446)
Browse files Browse the repository at this point in the history
ref #3607, ref #6447
  • Loading branch information
asddongmen authored Sep 2, 2022
1 parent e5c8874 commit e2d1f6e
Show file tree
Hide file tree
Showing 22 changed files with 947 additions and 163 deletions.
4 changes: 2 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {

jobs, err := getAllHistoryDDLJob(store)
require.Nil(t, err)
scheamStorage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
scheamStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := scheamStorage.HandleDDLJob(job)
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, filter, false, cfID)
ver.Ver, false, cfID)
require.Nil(t, err)
// apply ddl to schemaStorage
for _, ddl := range ddls {
Expand Down
3 changes: 1 addition & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
}
newSchemaName := newSchema.Name.L
newSchemaName := newSchema.Name.O
tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
err = s.createTable(tbInfo, currentTs)
if err != nil {
Expand All @@ -936,7 +936,6 @@ func (s *snapshot) iterTables(includeIneligible bool, f func(i *model.TableInfo)
}
return true
})
return
}

func (s *snapshot) iterPartitions(includeIneligible bool, f func(id int64, i *model.TableInfo)) {
Expand Down
12 changes: 1 addition & 11 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -58,15 +57,14 @@ type schemaStorageImpl struct {
resolvedTs uint64
schemaVersion int64

filter filter.Filter
forceReplicate bool

id model.ChangeFeedID
}

// NewSchemaStorage creates a new schema storage
func NewSchemaStorage(
meta *timeta.Meta, startTs uint64, filter filter.Filter,
meta *timeta.Meta, startTs uint64,
forceReplicate bool, id model.ChangeFeedID,
) (SchemaStorage, error) {
var (
Expand All @@ -93,7 +91,6 @@ func NewSchemaStorage(
schema := &schemaStorageImpl{
snaps: []*schema.Snapshot{snap},
resolvedTs: startTs,
filter: filter,
forceReplicate: forceReplicate,
id: id,
schemaVersion: version,
Expand Down Expand Up @@ -273,12 +270,5 @@ func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
zap.String("DDL", job.Query), zap.Stringer("job", job),
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID))
if s.filter != nil && s.filter.ShouldDiscardDDL(job.Type) {
log.Info("discard DDL",
zap.Int64("jobID", job.ID), zap.String("DDL", job.Query),
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID))
return true
}
return !job.IsSynced() && !job.IsDone()
}
4 changes: 2 additions & 2 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestMultiVersionStorage(t *testing.T) {
}

jobs = append(jobs, job)
storage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
storage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := storage.HandleDDLJob(job)
Expand Down Expand Up @@ -856,7 +856,7 @@ func TestSchemaStorage(t *testing.T) {

jobs, err := getAllHistoryDDLJob(store)
require.Nil(t, err)
schemaStorage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
schemaStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := schemaStorage.HandleDDLJob(job)
Expand Down
40 changes: 39 additions & 1 deletion cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package entry

import (
"encoding/json"
"strings"
"testing"

ticonfig "github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -64,7 +66,43 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
return jobs[0]
res := jobs[0]
if res.Type != timodel.ActionRenameTables {
return res
}

// the RawArgs field in a job fetched from the tidb snapshot meta is incorrect,
// so we manually construct `job.RawArgs` as a workaround.
// we assume the old schema name is the same as the new schema name here.
// for example, "ALTER TABLE RENAME test.t1 TO test.t1, test.t2 to test.t22", the schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
}
newTableNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = timodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
}
rawArgs, err := json.Marshal(args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
return res
}

// DDL2Jobs executes the DDL statement and return the corresponding DDL jobs.
Expand Down
17 changes: 2 additions & 15 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,6 @@ func (s *schemaWrap4Owner) AllTableNames() []model.TableName {
}

func (s *schemaWrap4Owner) HandleDDL(job *timodel.Job) error {
// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
// Unexecuted DDL jobs should have largest schemaVersions
if job.BinlogInfo.FinishedTS <= s.ddlHandledTs || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
log.Warn("job finishTs is less than schema handleTs, discard invalid job",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Stringer("job", job),
zap.Any("ddlHandledTs", s.ddlHandledTs),
zap.Int64("schemaVersion", s.schemaVersion),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
)
return nil
}
s.allPhysicalTablesCache = nil
err := s.schemaSnapshot.HandleDDL(job)
if err != nil {
Expand Down Expand Up @@ -189,8 +176,8 @@ func (s *schemaWrap4Owner) parseRenameTables(
return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
newSchemaIDs[i])
}
newSchemaName := newSchema.Name.L
oldSchemaName := oldSchemaNames[i].L
newSchemaName := newSchema.Name.O
oldSchemaName := oldSchemaNames[i].O
event := new(model.DDLEvent)
preTableInfo, ok := s.schemaSnapshot.PhysicalTableByID(tableInfo.ID)
if !ok {
Expand Down
1 change: 0 additions & 1 deletion cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,5 +537,4 @@ func TestBuildIgnoredDDLJob(t *testing.T) {
events, err = schema.BuildDDLEvents(job)
require.Nil(t, err)
require.Len(t, events, 0)
require.Nil(t, schema.HandleDDL(job))
}
3 changes: 2 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
p.upstream.PDClock,
checkpointTs,
kvCfg,
p.changefeed.Info.Config,
p.changefeedID,
)
if err != nil {
Expand All @@ -786,7 +787,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
if err != nil {
return nil, errors.Trace(err)
}
schemaStorage, err := entry.NewSchemaStorage(meta, checkpointTs, p.filter,
schemaStorage, err := entry.NewSchemaStorage(meta, checkpointTs,
p.changefeed.Info.Config.ForceReplicate, p.changefeedID)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit e2d1f6e

Please sign in to comment.