Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl_puller(ticdc): discard DDLs that are irrelevant to the changefeed #6446

Merged
merged 37 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a8ee472
schema(ticdc): ignore ddl by filter
asddongmen Jul 24, 2022
778f477
Merge branch 'master' into filter_ddl
asddongmen Jul 24, 2022
ed385cf
fix failed unit test
asddongmen Jul 24, 2022
8e1bf76
Merge branch 'filter_ddl' of github.com:asddongmen/ticdc into filter_ddl
asddongmen Jul 24, 2022
3581a68
ddl_puller (ticdc): filter out unrelated ddl
asddongmen Jul 26, 2022
d487375
fix unit test
asddongmen Jul 26, 2022
1abee67
Merge branch 'master' into filter_ddl
asddongmen Jul 26, 2022
b289461
fix `create schema` cause panic
asddongmen Jul 26, 2022
0cd59a1
fix ddl reentrant error
asddongmen Jul 26, 2022
6700099
add unit test and fix failed test case
asddongmen Jul 27, 2022
fd10652
Merge branch 'master' into filter_ddl
asddongmen Jul 27, 2022
02eaaeb
fix data race
asddongmen Jul 28, 2022
279e2ac
fix unit test error
asddongmen Jul 28, 2022
2771296
Merge branch 'master' into filter_ddl
asddongmen Aug 11, 2022
2ccfa75
fix error
asddongmen Aug 11, 2022
7505a02
ddl_puller: handle rename table error
asddongmen Aug 15, 2022
c738928
Merge remote-tracking branch 'upstream/master' into filter_ddl
asddongmen Aug 15, 2022
efb5045
fix ut error
asddongmen Aug 15, 2022
e03465e
fix test error
asddongmen Aug 16, 2022
22e530b
address comments
asddongmen Aug 17, 2022
fc90dac
fix it error
asddongmen Aug 17, 2022
e765637
fix it error
asddongmen Aug 17, 2022
a27cfea
fix it error
asddongmen Aug 17, 2022
5bb1a4f
address comments
asddongmen Aug 19, 2022
75ea70d
fix it error
asddongmen Aug 19, 2022
4f39228
fix lint error
asddongmen Aug 19, 2022
9436145
Merge branch 'master' into filter_ddl
asddongmen Aug 19, 2022
06390b0
fix kafka it error
asddongmen Aug 22, 2022
806e469
address comments
asddongmen Aug 22, 2022
b42ca32
Merge branch 'master' into filter_ddl
asddongmen Aug 22, 2022
bf32e6e
Merge remote-tracking branch 'upstream/master' into filter_ddl
asddongmen Sep 1, 2022
98cf7ee
resolved conflict
asddongmen Sep 1, 2022
ec6cb9e
ddl_puller: address comments
asddongmen Sep 2, 2022
ba1cdfa
Merge branch 'master' into filter_ddl
asddongmen Sep 2, 2022
7d482e5
update err doc
asddongmen Sep 2, 2022
0d5272f
Merge branch 'filter_ddl' of github.com:asddongmen/ticdc into filter_ddl
asddongmen Sep 2, 2022
9de291c
Merge branch 'master' into filter_ddl
ti-chi-bot Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func testMounterDisableOldValue(t *testing.T, tc struct {

jobs, err := getAllHistoryDDLJob(store)
require.Nil(t, err)
scheamStorage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
scheamStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := scheamStorage.HandleDDLJob(job)
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestDecodeEventIgnoreRow(t *testing.T) {
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
require.Nil(t, err)
schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, filter, false, cfID)
ver.Ver, false, cfID)
require.Nil(t, err)
// apply ddl to schemaStorage
for _, ddl := range ddls {
Expand Down
3 changes: 1 addition & 2 deletions cdc/entry/schema/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ func (s *snapshot) renameTables(job *timodel.Job, currentTs uint64) error {
if !ok {
return cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(newSchemaIDs[i])
}
newSchemaName := newSchema.Name.L
newSchemaName := newSchema.Name.O
tbInfo := model.WrapTableInfo(newSchemaIDs[i], newSchemaName, job.BinlogInfo.FinishedTS, tableInfo)
err = s.createTable(tbInfo, currentTs)
if err != nil {
Expand All @@ -937,7 +937,6 @@ func (s *snapshot) iterTables(includeIneligible bool, f func(i *model.TableInfo)
}
return true
})
return
}

func (s *snapshot) iterPartitions(includeIneligible bool, f func(id int64, i *model.TableInfo)) {
Expand Down
12 changes: 1 addition & 11 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/retry"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -58,15 +57,14 @@ type schemaStorageImpl struct {
resolvedTs uint64
schemaVersion int64

filter filter.Filter
forceReplicate bool

id model.ChangeFeedID
}

// NewSchemaStorage creates a new schema storage
func NewSchemaStorage(
meta *timeta.Meta, startTs uint64, filter filter.Filter,
meta *timeta.Meta, startTs uint64,
forceReplicate bool, id model.ChangeFeedID,
) (SchemaStorage, error) {
var (
Expand All @@ -93,7 +91,6 @@ func NewSchemaStorage(
schema := &schemaStorageImpl{
snaps: []*schema.Snapshot{snap},
resolvedTs: startTs,
filter: filter,
forceReplicate: forceReplicate,
id: id,
schemaVersion: version,
Expand Down Expand Up @@ -269,12 +266,5 @@ func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
zap.String("DDL", job.Query), zap.Stringer("job", job),
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID))
if s.filter != nil && s.filter.ShouldDiscardDDL(job.Type) {
log.Info("discard DDL",
zap.Int64("jobID", job.ID), zap.String("DDL", job.Query),
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID))
return true
}
return !job.IsSynced() && !job.IsDone()
}
4 changes: 2 additions & 2 deletions cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestMultiVersionStorage(t *testing.T) {
}

jobs = append(jobs, job)
storage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
storage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := storage.HandleDDLJob(job)
Expand Down Expand Up @@ -856,7 +856,7 @@ func TestSchemaStorage(t *testing.T) {

jobs, err := getAllHistoryDDLJob(store)
require.Nil(t, err)
schemaStorage, err := NewSchemaStorage(nil, 0, nil, false, dummyChangeFeedID)
schemaStorage, err := NewSchemaStorage(nil, 0, false, dummyChangeFeedID)
require.Nil(t, err)
for _, job := range jobs {
err := schemaStorage.HandleDDLJob(job)
Expand Down
40 changes: 39 additions & 1 deletion cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package entry

import (
"encoding/json"
"strings"
"testing"

ticonfig "github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -64,7 +66,43 @@ func (s *SchemaTestHelper) DDL2Job(ddl string) *timodel.Job {
jobs, err := tiddl.GetLastNHistoryDDLJobs(s.GetCurrentMeta(), 1)
require.Nil(s.t, err)
require.Len(s.t, jobs, 1)
return jobs[0]
res := jobs[0]
if res.Type != timodel.ActionRenameTables {
return res
}

// the RawArgs field in a job fetched from the tidb snapshot meta is incorrect,
// so we manually construct `job.RawArgs` as a workaround.
// we assume the old schema name is the same as the new schema name here.
// for example, "RENAME TABLE test.t1 TO test.t11, test.t2 TO test.t22", the schema name is "test"
schema := strings.Split(strings.Split(strings.Split(res.Query, ",")[1], " ")[1], ".")[0]
tableNum := len(res.BinlogInfo.MultipleTableInfos)
oldSchemaIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaIDs[i] = res.SchemaID
}
oldTableIDs := make([]int64, tableNum)
for i := 0; i < tableNum; i++ {
oldTableIDs[i] = res.BinlogInfo.MultipleTableInfos[i].ID
}
newTableNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
newTableNames[i] = res.BinlogInfo.MultipleTableInfos[i].Name
}
oldSchemaNames := make([]timodel.CIStr, tableNum)
for i := 0; i < tableNum; i++ {
oldSchemaNames[i] = timodel.NewCIStr(schema)
}
newSchemaIDs := oldSchemaIDs

args := []interface{}{
oldSchemaIDs, newSchemaIDs,
newTableNames, oldTableIDs, oldSchemaNames,
}
rawArgs, err := json.Marshal(args)
require.NoError(s.t, err)
res.RawArgs = rawArgs
return res
}

// DDL2Jobs executes the DDL statement and return the corresponding DDL jobs.
Expand Down
17 changes: 2 additions & 15 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,6 @@ func (s *schemaWrap4Owner) AllTableNames() []model.TableName {
}

func (s *schemaWrap4Owner) HandleDDL(job *timodel.Job) error {
// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
// Unexecuted DDL jobs should have largest schemaVersions
if job.BinlogInfo.FinishedTS <= s.ddlHandledTs || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
log.Warn("job finishTs is less than schema handleTs, discard invalid job",
zap.String("namespace", s.id.Namespace),
zap.String("changefeed", s.id.ID),
zap.Stringer("job", job),
zap.Any("ddlHandledTs", s.ddlHandledTs),
zap.Int64("schemaVersion", s.schemaVersion),
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
)
return nil
}
s.allPhysicalTablesCache = nil
err := s.schemaSnapshot.HandleDDL(job)
if err != nil {
Expand Down Expand Up @@ -187,8 +174,8 @@ func (s *schemaWrap4Owner) parseRenameTables(
return nil, cerror.ErrSnapshotSchemaNotFound.GenWithStackByArgs(
newSchemaIDs[i])
}
newSchemaName := newSchema.Name.L
oldSchemaName := oldSchemaNames[i].L
newSchemaName := newSchema.Name.O
oldSchemaName := oldSchemaNames[i].O
event := new(model.DDLEvent)
preTableInfo, ok := s.schemaSnapshot.PhysicalTableByID(tableInfo.ID)
if !ok {
Expand Down
1 change: 0 additions & 1 deletion cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,5 +537,4 @@ func TestBuildIgnoredDDLJob(t *testing.T) {
events, err = schema.BuildDDLEvents(job)
require.Nil(t, err)
require.Len(t, events, 0)
require.Nil(t, schema.HandleDDL(job))
}
3 changes: 2 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
p.upstream.PDClock,
checkpointTs,
kvCfg,
p.changefeed.Info.Config,
p.changefeedID,
)
if err != nil {
Expand All @@ -785,7 +786,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
if err != nil {
return nil, errors.Trace(err)
}
schemaStorage, err := entry.NewSchemaStorage(meta, checkpointTs, p.filter,
schemaStorage, err := entry.NewSchemaStorage(meta, checkpointTs,
p.changefeed.Info.Config.ForceReplicate, p.changefeedID)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading