Skip to content

Commit

Permalink
config, changefeed (ticdc): add sql mode config for changefeed (#9941) (
Browse files Browse the repository at this point in the history
#9994)

close #9876
  • Loading branch information
ti-chi-bot authored Nov 24, 2023
1 parent 9ad90e6 commit 3a5901e
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ clean_integration_test_containers: ## Clean MySQL and Kafka integration test con
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-mysql-integration.yml down -v
docker-compose -f $(TICDC_DOCKER_DEPLOYMENTS_DIR)/docker-compose-kafka-integration.yml down -v

fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock go-generate
fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci #generate_mock go-generate
@echo "run gci (format imports)"
tools/bin/gci write $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
@echo "run gofumports"
Expand Down
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ type ReplicaConfig struct {
Consistent *ConsistentConfig `json:"consistent"`

ChangefeedErrorStuckDuration *JSONDuration `json:"changefeed_error_stuck_duration,omitempty" swaggertype:"string"`
SQLMode string `json:"sql_mode,omitempty"`
}

// ToInternalReplicaConfig coverts *v2.ReplicaConfig into *config.ReplicaConfig
Expand All @@ -208,6 +209,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
res.SQLMode = c.SQLMode
if c.SyncPointInterval != nil {
res.SyncPointInterval = c.SyncPointInterval.duration
}
Expand Down Expand Up @@ -358,6 +360,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SyncPointRetention: &JSONDuration{cloned.SyncPointRetention},
BDRMode: cloned.BDRMode,
ChangefeedErrorStuckDuration: &JSONDuration{cloned.ChangefeedErrorStuckDuration},
SQLMode: cloned.SQLMode,
}

if cloned.Filter != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var defaultAPIConfig = &ReplicaConfig{
},
ChangefeedErrorStuckDuration: &JSONDuration{config.
GetDefaultReplicaConfig().ChangefeedErrorStuckDuration},
SQLMode: config.GetDefaultReplicaConfig().SQLMode,
}

func TestDefaultReplicaConfig(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ func (info *ChangeFeedInfo) VerifyAndComplete() {
if info.Config.ChangefeedErrorStuckDuration == 0 {
info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration
}
if info.Config.SQLMode == "" {
info.Config.SQLMode = defaultConfig.SQLMode
}
}

// FixIncompatible fixes incompatible changefeed meta info.
Expand Down
14 changes: 13 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
sinkv1 "github.com/pingcap/tiflow/cdc/sink"
Expand Down Expand Up @@ -454,7 +455,18 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) {

// addSpecialComment translate tidb feature to comment
func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate)
p := parser.New()
// We need to use the correct SQL mode to parse the DDL query.
// Otherwise, the parser may fail to parse the DDL query.
// For example, it is needed to parse the following DDL query:
// `alter table "t" add column "c" int default 1;`
// by adding `ANSI_QUOTES` to the SQL mode.
mode, err := timysql.GetSQLMode(s.info.Config.SQLMode)
if err != nil {
return "", errors.Trace(err)
}
p.SetSQLMode(mode)
stms, _, err := p.Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
}
Expand Down
9 changes: 8 additions & 1 deletion cdc/owner/ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -65,7 +66,10 @@ func (m *mockSink) GetDDL() *model.DDLEvent {

func newDDLSink4Test(reportErr func(err error), reportWarn func(err error)) (DDLSink, *mockSink) {
mockSink := &mockSink{}
ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"), &model.ChangeFeedInfo{}, reportErr, reportWarn)
ddlSink := newDDLSink(model.DefaultChangeFeedID("changefeed-test"),
&model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}, reportErr, reportWarn)
ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx context.Context, s *ddlSinkImpl) error {
s.sinkV1 = mockSink
return nil
Expand Down Expand Up @@ -487,6 +491,9 @@ func TestAddSpecialComment(t *testing.T) {
}

s := &ddlSinkImpl{}
s.info = &model.ChangeFeedInfo{
Config: config.GetDefaultReplicaConfig(),
}
for _, ca := range testCase {
re, err := s.addSpecialComment(ca.event)
require.Nil(t, err)
Expand Down
9 changes: 6 additions & 3 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ const (
"storage": "",
"use-file-backend": false
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`

testCfgTestServerConfigMarshal = `{
Expand Down Expand Up @@ -222,7 +223,8 @@ const (
"storage": "",
"use-file-backend": false
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`

testCfgTestReplicaConfigMarshal2 = `{
Expand Down Expand Up @@ -286,6 +288,7 @@ const (
"storage": "",
"use-file-backend": false
},
"changefeed-error-stuck-duration": 1800000000000
"changefeed-error-stuck-duration": 1800000000000,
"sql-mode":"ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
}`
)
9 changes: 9 additions & 0 deletions pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/pkg/config/outdated"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
Expand All @@ -35,6 +36,12 @@ const (
// minSyncPointRetention is the minimum of SyncPointRetention can be set.
minSyncPointRetention = time.Hour * 1
minChangeFeedErrorStuckDuration = time.Minute * 30
// The default SQL Mode of TiDB: "ONLY_FULL_GROUP_BY,
// STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,
// NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"
// Note: The SQL Mode of TiDB is not the same as ORACLE.
// If you want to use the same SQL Mode as ORACLE, you need to add "ORACLE" to the SQL Mode.
defaultSQLMode = mysql.DefaultSQLMode
)

var defaultReplicaConfig = &ReplicaConfig{
Expand Down Expand Up @@ -76,6 +83,7 @@ var defaultReplicaConfig = &ReplicaConfig{
UseFileBackend: false,
},
ChangefeedErrorStuckDuration: time.Minute * 30,
SQLMode: defaultSQLMode,
}

// GetDefaultReplicaConfig returns the default replica config.
Expand Down Expand Up @@ -116,6 +124,7 @@ type replicaConfig struct {
Sink *SinkConfig `toml:"sink" json:"sink"`
Consistent *ConsistentConfig `toml:"consistent" json:"consistent"`
ChangefeedErrorStuckDuration time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"`
SQLMode string `toml:"sql-mode" json:"sql-mode,omitempty"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down
22 changes: 17 additions & 5 deletions pkg/filter/expr_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package filter

import (
"fmt"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -77,10 +79,16 @@ func newExprFilterRule(

// verifyAndInitRule will verify and init the rule.
// It should only be called in dmlExprFilter's verify method.
func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo) error {
func (r *dmlExprFilterRule) verify(tableInfos []*model.TableInfo, sqlMode string) error {
// verify expression filter rule syntax.
p := parser.New()
_, _, err := p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr))
mode, err := mysql.GetSQLMode(sqlMode)
if err != nil {
log.Error("failed to get sql mode", zap.Error(err))
return cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode))
}
p.SetSQLMode(mode)
_, _, err = p.ParseSQL(completeExpression(r.config.IgnoreInsertValueExpr))
if err != nil {
log.Error("failed to parse expression", zap.Error(err))
return cerror.ErrExpressionParseFailed.
Expand Down Expand Up @@ -347,14 +355,18 @@ func getColumnFromError(err error) string {

// dmlExprFilter is a filter that filters DML events by SQL expression.
type dmlExprFilter struct {
rules []*dmlExprFilterRule
rules []*dmlExprFilterRule
sqlMode string
}

func newExprFilter(
timezone string,
cfg *config.FilterConfig,
sqlMode string,
) (*dmlExprFilter, error) {
res := &dmlExprFilter{}
res := &dmlExprFilter{
sqlMode: sqlMode,
}
sessCtx := utils.NewSessionCtx(map[string]string{
"time_zone": timezone,
})
Expand Down Expand Up @@ -382,7 +394,7 @@ func (f *dmlExprFilter) addRule(
// verify checks if all rules in this filter is valid.
func (f *dmlExprFilter) verify(tableInfos []*model.TableInfo) error {
for _, rule := range f.rules {
err := rule.verify(tableInfos)
err := rule.verify(tableInfos, f.sqlMode)
if err != nil {
log.Error("failed to verify expression filter rule", zap.Error(err))
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/expr_filter_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func BenchmarkSkipDML(b *testing.B) {
sessCtx := utils.NewSessionCtx(map[string]string{
"time_zone": "",
})
f, err := newExprFilter("", cfg)
f, err := newExprFilter("", cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(b, err)

type innerCase struct {
Expand Down
8 changes: 4 additions & 4 deletions pkg/filter/expr_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestShouldSkipDMLBasic(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestShouldSkipDMLError(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
rowDatums, err := utils.AdjustBinaryProtocolForDatum(sessCtx, c.row, tableInfo.Columns)
Expand Down Expand Up @@ -634,7 +634,7 @@ func TestShouldSkipDMLTableUpdated(t *testing.T) {

for _, tc := range testCases {
tableInfo := helper.execDDL(tc.ddl)
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
for _, c := range tc.cases {
if c.updateDDl != "" {
Expand Down Expand Up @@ -754,7 +754,7 @@ func TestVerify(t *testing.T) {
ti := helper.execDDL(ddl)
tableInfos = append(tableInfos, ti)
}
f, err := newExprFilter("", tc.cfg)
f, err := newExprFilter("", tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.Nil(t, err)
err = f.verify(tableInfos)
require.True(t, errors.ErrorEqual(tc.err, err), "case: %+v", tc, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func NewFilter(cfg *config.ReplicaConfig, tz string) (Filter, error) {
f = tfilter.CaseInsensitive(f)
}

dmlExprFilter, err := newExprFilter(tz, cfg.Filter)
dmlExprFilter, err := newExprFilter(tz, cfg.Filter, cfg.SQLMode)
if err != nil {
return nil, err
}
sqlEventFilter, err := newSQLEventFilter(cfg.Filter)
sqlEventFilter, err := newSQLEventFilter(cfg.Filter, cfg.SQLMode)
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/filter/sql_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package filter

import (
"fmt"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb/parser"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
tfilter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
Expand Down Expand Up @@ -107,9 +109,17 @@ type sqlEventFilter struct {
rules []*sqlEventRule
}

func newSQLEventFilter(cfg *config.FilterConfig) (*sqlEventFilter, error) {
func newSQLEventFilter(cfg *config.FilterConfig, sqlMode string) (*sqlEventFilter, error) {
p := parser.New()
mode, err := mysql.GetSQLMode(sqlMode)
if err != nil {
log.Error("failed to get sql mode", zap.Error(err))
return nil, cerror.ErrInvalidReplicaConfig.FastGenByArgs(fmt.Sprintf("invalid sqlMode %s", sqlMode))
}
p.SetSQLMode(mode)

res := &sqlEventFilter{
ddlParser: parser.New(),
ddlParser: p,
}
for _, rule := range cfg.EventFilters {
if err := res.addRule(rule); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/sql_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestShouldSkipDDL(t *testing.T) {
}

for _, tc := range testCases {
f, err := newSQLEventFilter(tc.cfg)
f, err := newSQLEventFilter(tc.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.True(t, errors.ErrorEqual(err, tc.err), "case: %+s", err)
for _, c := range tc.cases {
ddl := &model.DDLEvent{
Expand Down Expand Up @@ -298,7 +298,7 @@ func TestShouldSkipDML(t *testing.T) {
tCase := tc
t.Run(tCase.name, func(t *testing.T) {
t.Parallel()
f, err := newSQLEventFilter(tCase.cfg)
f, err := newSQLEventFilter(tCase.cfg, config.GetDefaultReplicaConfig().SQLMode)
require.NoError(t, err)
for _, c := range tCase.cases {
event := &model.RowChangedEvent{
Expand Down
Loading

0 comments on commit 3a5901e

Please sign in to comment.