From 055badfbd72fbdd0a5f5ade9d3f08885476be420 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 8 Nov 2023 02:09:42 -0600 Subject: [PATCH] kafka(ticdc): make column selector can work normally to filter out columns (#9920) close pingcap/tiflow#9967 --- cdc/api/v2/api_helpers.go | 11 +- cdc/model/schema_storage.go | 4 +- cdc/model/schema_storage_test.go | 6 +- .../dmlsink/mq/dispatcher/event_router.go | 15 +- .../mq/dispatcher/partition/columns.go | 8 +- .../mq/dispatcher/partition/index_value.go | 6 +- cdc/sink/dmlsink/mq/kafka_dml_sink.go | 8 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 12 + cdc/sink/dmlsink/mq/pulsar_dml_sink.go | 8 +- .../columnselector/column_selector.go | 186 +++++++++++ .../columnselector/column_selector_test.go | 296 ++++++++++++++++++ .../dmlsink/mq/transformer/transformer.go | 21 ++ errors.toml | 5 + pkg/config/replica_config.go | 7 + pkg/config/replica_config_test.go | 17 + pkg/config/sink.go | 2 +- pkg/errors/cdc_errors.go | 5 + pkg/errors/helper.go | 1 + pkg/sink/codec/avro/avro.go | 3 + pkg/sink/codec/avro/decoder.go | 4 +- .../conf/changefeed.toml | 7 + .../kafka_column_selector/data/data.sql | 49 +++ .../kafka_column_selector/run.sh | 52 +++ .../conf/changefeed.toml | 4 + .../kafka_column_selector_avro/data/data.sql | 15 + .../kafka_column_selector_avro/run.sh | 66 ++++ tests/integration_tests/run_group.sh | 2 +- tests/utils/checksum_checker/main.go | 260 +++++++++++++++ 28 files changed, 1058 insertions(+), 22 deletions(-) create mode 100644 cdc/sink/dmlsink/mq/transformer/columnselector/column_selector.go create mode 100644 cdc/sink/dmlsink/mq/transformer/columnselector/column_selector_test.go create mode 100644 cdc/sink/dmlsink/mq/transformer/transformer.go create mode 100644 tests/integration_tests/kafka_column_selector/conf/changefeed.toml create mode 100644 tests/integration_tests/kafka_column_selector/data/data.sql create mode 100644 tests/integration_tests/kafka_column_selector/run.sh create mode 100644 tests/integration_tests/kafka_column_selector_avro/conf/changefeed.toml create mode 100644 tests/integration_tests/kafka_column_selector_avro/data/data.sql create mode 100644 tests/integration_tests/kafka_column_selector_avro/run.sh create mode 100644 tests/utils/checksum_checker/main.go diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index f84037747f6..aad7294f409 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/owner" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector" "github.com/pingcap/tiflow/cdc/sink/validator" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -518,11 +519,19 @@ func (h APIV2HelpersImpl) getVerifiedTables( if err != nil { return nil, nil, err } - err = eventRouter.VerifyTables(tableInfos) if err != nil { return nil, nil, err } + selectors, err := columnselector.New(replicaConfig) + if err != nil { + return nil, nil, err + } + err = selectors.VerifyTables(tableInfos, eventRouter) + if err != nil { + return nil, nil, err + } + return ineligibleTables, eligibleTables, nil } diff --git a/cdc/model/schema_storage.go b/cdc/model/schema_storage.go index 26d904bba0c..385c7b71c9f 100644 --- a/cdc/model/schema_storage.go +++ b/cdc/model/schema_storage.go @@ -341,9 +341,9 @@ func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) { return names, offset, true } -// ColumnsByNames returns the column offsets of the corresponding columns by names +// OffsetsByNames returns the column offsets of the corresponding columns by names // If any column does not exist, return false -func (ti *TableInfo) ColumnsByNames(names []string) ([]int, bool) { +func (ti *TableInfo) OffsetsByNames(names []string) ([]int, bool) { // todo: optimize it columnOffsets := make(map[string]int, len(ti.Columns)) for _, col := range ti.Columns { diff --git a/cdc/model/schema_storage_test.go b/cdc/model/schema_storage_test.go index 0da4e756153..2cbc940bfb1 100644 --- a/cdc/model/schema_storage_test.go +++ b/cdc/model/schema_storage_test.go @@ -339,17 +339,17 @@ func TestColumnsByNames(t *testing.T) { } names := []string{"col1", "col2", "col3"} - offsets, ok := tableInfo.ColumnsByNames(names) + offsets, ok := tableInfo.OffsetsByNames(names) require.True(t, ok) require.Equal(t, []int{0, 1, 2}, offsets) names = []string{"col2"} - offsets, ok = tableInfo.ColumnsByNames(names) + offsets, ok = tableInfo.OffsetsByNames(names) require.True(t, ok) require.Equal(t, []int{1}, offsets) names = []string{"col1", "col-not-found"} - offsets, ok = tableInfo.ColumnsByNames(names) + offsets, ok = tableInfo.OffsetsByNames(names) require.False(t, ok) require.Nil(t, offsets) } diff --git a/cdc/sink/dmlsink/mq/dispatcher/event_router.go b/cdc/sink/dmlsink/mq/dispatcher/event_router.go index 53fc0c7fd57..4cbaaa3c1b8 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/event_router.go +++ b/cdc/sink/dmlsink/mq/dispatcher/event_router.go @@ -119,13 +119,14 @@ func (s *EventRouter) GetPartitionForRowChange( row *model.RowChangedEvent, partitionNum int32, ) (int32, string, error) { - _, partitionDispatcher := s.matchDispatcher( - row.Table.Schema, row.Table.Table, - ) + return s.GetPartitionDispatcher(row.Table.Schema, row.Table.Table). + DispatchRowChangedEvent(row, partitionNum) +} - return partitionDispatcher.DispatchRowChangedEvent( - row, partitionNum, - ) +// GetPartitionDispatcher returns the partition dispatcher for a specific table. +func (s *EventRouter) GetPartitionDispatcher(schema, table string) partition.Dispatcher { + _, partitionDispatcher := s.matchDispatcher(schema, table) + return partitionDispatcher } // VerifyTables return error if any one table route rule is invalid. @@ -147,7 +148,7 @@ func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error { "index is not unique when verify the table, table: %v, index: %s", table.TableName, v.IndexName) } case *partition.ColumnsDispatcher: - _, ok := table.ColumnsByNames(v.Columns) + _, ok := table.OffsetsByNames(v.Columns) if !ok { return cerror.ErrDispatcherFailed.GenWithStack( "columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns) diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go index e4a14cfcf2f..e63c1b17e03 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/columns.go @@ -55,7 +55,7 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, dispatchCols = row.PreColumns } - offsets, ok := row.TableInfo.ColumnsByNames(r.Columns) + offsets, ok := row.TableInfo.OffsetsByNames(r.Columns) if !ok { log.Error("columns not found when dispatch event", zap.Any("tableName", row.Table), @@ -65,7 +65,11 @@ func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, } for idx := 0; idx < len(r.Columns); idx++ { - r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) + col := dispatchCols[offsets[idx]] + if col == nil { + continue + } + r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(col.Value))) } sum32 := r.hasher.Sum32() diff --git a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go index 0a41369d589..d6e91f65470 100644 --- a/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go +++ b/cdc/sink/dmlsink/mq/dispatcher/partition/index_value.go @@ -73,7 +73,11 @@ func (r *IndexValueDispatcher) DispatchRowChangedEvent(row *model.RowChangedEven "index not found when dispatch event, table: %v, index: %s", row.Table, r.IndexName) } for idx := 0; idx < len(names); idx++ { - r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value))) + col := dispatchCols[offsets[idx]] + if col == nil { + continue + } + r.hasher.Write([]byte(names[idx]), []byte(model.ColumnValueString(col.Value))) } } diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index c4c6bfc0f39..86821ac25ce 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -97,6 +98,11 @@ func NewKafkaDMLSink( return nil, errors.Trace(err) } + trans, err := columnselector.New(replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, options.MaxMessageBytes) if err != nil { return nil, errors.Trace(err) @@ -118,7 +124,7 @@ func NewKafkaDMLSink( concurrency := tiflowutil.GetOrZero(replicaConfig.Sink.EncoderConcurrency) encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, - eventRouter, encoderGroup, protocol, scheme, errCh) + eventRouter, trans, encoderGroup, protocol, scheme, errCh) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), zap.String("changefeedID", changefeedID.ID)) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 94f363000b5..14ed6226255 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer" "github.com/pingcap/tiflow/cdc/sink/metrics" "github.com/pingcap/tiflow/cdc/sink/tablesink/state" "github.com/pingcap/tiflow/pkg/config" @@ -48,6 +49,8 @@ type dmlSink struct { alive struct { sync.RWMutex + + transformer transformer.Transformer // eventRouter used to route events to the right topic and partition. eventRouter *dispatcher.EventRouter // topicManager used to manage topics. @@ -77,6 +80,7 @@ func newDMLSink( adminClient kafka.ClusterAdminClient, topicManager manager.TopicManager, eventRouter *dispatcher.EventRouter, + transformer transformer.Transformer, encoderGroup codec.EncoderGroup, protocol config.Protocol, scheme string, @@ -95,6 +99,7 @@ func newDMLSink( dead: make(chan struct{}), scheme: scheme, } + s.alive.transformer = transformer s.alive.eventRouter = eventRouter s.alive.topicManager = topicManager s.alive.worker = worker @@ -170,6 +175,13 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa s.cancel(err) return errors.Trace(err) } + + err = s.alive.transformer.Apply(row) + if err != nil { + s.cancel(err) + return errors.Trace(err) + } + index, key, err := s.alive.eventRouter.GetPartitionForRowChange(row, partitionNum) if err != nil { s.cancel(err) diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 93b40f9ac8b..ea77163106d 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector" "github.com/pingcap/tiflow/cdc/sink/util" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" @@ -104,6 +105,11 @@ func NewPulsarDMLSink( return nil, errors.Trace(err) } + trans, err := columnselector.New(replicaConfig) + if err != nil { + return nil, errors.Trace(err) + } + encoderConfig, err := util.GetEncoderConfig(changefeedID, sinkURI, protocol, replicaConfig, config.DefaultMaxMessageBytes) if err != nil { @@ -119,7 +125,7 @@ func NewPulsarDMLSink( encoderGroup := codec.NewEncoderGroup(encoderBuilder, concurrency, changefeedID) s := newDMLSink(ctx, changefeedID, p, nil, topicManager, - eventRouter, encoderGroup, protocol, scheme, errCh) + eventRouter, trans, encoderGroup, protocol, scheme, errCh) return s, nil } diff --git a/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector.go b/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector.go new file mode 100644 index 00000000000..54690fb134b --- /dev/null +++ b/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector.go @@ -0,0 +1,186 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package columnselector + +import ( + filter "github.com/pingcap/tidb/util/table-filter" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/partition" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" +) + +type selector struct { + tableF filter.Filter + columnM filter.ColumnFilter +} + +func newSelector( + rule *config.ColumnSelector, caseSensitive bool, +) (*selector, error) { + tableM, err := filter.Parse(rule.Matcher) + if err != nil { + return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Matcher) + } + if !caseSensitive { + tableM = filter.CaseInsensitive(tableM) + } + columnM, err := filter.ParseColumnFilter(rule.Columns) + if err != nil { + return nil, errors.WrapError(errors.ErrFilterRuleInvalid, err, rule.Columns) + } + + return &selector{ + tableF: tableM, + columnM: columnM, + }, nil +} + +// Match implements Transformer interface +func (s *selector) Match(schema, table string) bool { + return s.tableF.MatchTable(schema, table) +} + +// Apply implements Transformer interface +// return error if the given event cannot match the selector, or the column cannot be filtered out. +// the caller's should make sure the given event match the selector first before apply it. +func (s *selector) Apply(event *model.RowChangedEvent) error { + // defensive check, this should not happen. + if !s.Match(event.Table.Schema, event.Table.Table) { + return errors.ErrColumnSelectorFailed.GenWithStack( + "the given event does not match the column selector, table: %v", event.Table) + } + + for idx, column := range event.Columns { + if s.columnM.MatchColumn(column.Name) { + continue + } + if column.Flag.IsHandleKey() || column.Flag.IsUniqueKey() { + return errors.ErrColumnSelectorFailed.GenWithStack( + "primary key or unique key cannot be filtered out by the column selector, "+ + "table: %v, column: %s", event.Table, column.Name) + } + event.Columns[idx] = nil + } + + for idx, column := range event.PreColumns { + if s.columnM.MatchColumn(column.Name) { + continue + } + if column.Flag.IsHandleKey() || column.Flag.IsUniqueKey() { + return errors.ErrColumnSelectorFailed.GenWithStack( + "primary key or unique key cannot be filtered out by the column selector, "+ + "table: %v, column: %s", event.Table, column.Name) + } + event.PreColumns[idx] = nil + } + + return nil +} + +// ColumnSelector manages an array of selectors, the first selector match the given +// event is used to select out columns. +type ColumnSelector struct { + selectors []*selector +} + +// New return a column selector +func New(cfg *config.ReplicaConfig) (*ColumnSelector, error) { + selectors := make([]*selector, 0, len(cfg.Sink.ColumnSelectors)) + for _, r := range cfg.Sink.ColumnSelectors { + selector, err := newSelector(r, cfg.CaseSensitive) + if err != nil { + return nil, err + } + selectors = append(selectors, selector) + } + + return &ColumnSelector{ + selectors: selectors, + }, nil +} + +// Apply the column selector to the given event. +func (c *ColumnSelector) Apply(event *model.RowChangedEvent) error { + for _, s := range c.selectors { + if s.Match(event.Table.Schema, event.Table.Table) { + return s.Apply(event) + } + } + return nil +} + +// VerifyTables return the error if any given table cannot satisfy the column selector constraints. +// 1. if the column is filter out, it must not be a part of handle key or the unique key. +// 2. if the filtered out column is used in the column dispatcher, return error. +func (c *ColumnSelector) VerifyTables( + infos []*model.TableInfo, eventRouter *dispatcher.EventRouter, +) error { + if len(c.selectors) == 0 { + return nil + } + + for _, table := range infos { + for _, s := range c.selectors { + if !s.Match(table.TableName.Schema, table.TableName.Table) { + continue + } + for columnID, flag := range table.ColumnsFlag { + columnInfo, ok := table.GetColumnInfo(columnID) + if !ok { + return errors.ErrColumnSelectorFailed.GenWithStack( + "column not found when verify the table for the column selector, table: %v, column: %s", + table.TableName, columnInfo.Name) + } + + if s.columnM.MatchColumn(columnInfo.Name.O) { + continue + } + // the column is filter out. + if flag.IsHandleKey() || flag.IsUniqueKey() { + return errors.ErrColumnSelectorFailed.GenWithStack( + "primary key or unique key cannot be filtered out by the column selector, "+ + "table: %v, column: %s", table.TableName, columnInfo.Name) + } + + partitionDispatcher := eventRouter.GetPartitionDispatcher(table.TableName.Schema, table.TableName.Table) + switch v := partitionDispatcher.(type) { + case *partition.ColumnsDispatcher: + for _, col := range v.Columns { + if col == columnInfo.Name.O { + return errors.ErrColumnSelectorFailed.GenWithStack( + "the filtered out column is used in the column dispatcher, "+ + "table: %v, column: %s", table.TableName, columnInfo.Name) + } + } + default: + } + } + } + } + + return nil +} + +// VerifyColumn return true if the given `schema.table` column is matched. +func (c *ColumnSelector) VerifyColumn(schema, table, column string) bool { + for _, s := range c.selectors { + if !s.Match(schema, table) { + continue + } + return s.columnM.MatchColumn(column) + } + return true +} diff --git a/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector_test.go b/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector_test.go new file mode 100644 index 00000000000..e1ae025ccbd --- /dev/null +++ b/cdc/sink/dmlsink/mq/transformer/columnselector/column_selector_test.go @@ -0,0 +1,296 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package columnselector + +import ( + "testing" + + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher" + "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/stretchr/testify/require" +) + +var event = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), + }, + { + Name: "col2", + Value: []byte("val2"), + }, + { + Name: "col3", + Value: []byte("val3"), + }, + }, + PreColumns: []*model.Column{ + { + Name: "col1", + Value: []byte("val1"), + }, + { + Name: "col2", + Value: []byte("val2"), + }, + { + Name: "col3", + Value: []byte("val3"), + }, + }, +} + +func TestNewColumnSelectorNoRules(t *testing.T) { + // the column selector is not set + replicaConfig := config.GetDefaultReplicaConfig() + selectors, err := New(replicaConfig) + require.NoError(t, err) + require.NotNil(t, selectors) + require.Len(t, selectors.selectors, 0) + + err = selectors.Apply(event) + require.NoError(t, err) + for _, column := range event.Columns { + require.NotNil(t, column.Value) + } + for _, column := range event.PreColumns { + require.NotNil(t, column.Value) + } +} + +func TestNewColumnSelector(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: []string{"col1", "col2"}, + }, + { + Matcher: []string{"test1.*"}, + Columns: []string{"*", "!a"}, + }, + { + Matcher: []string{"test2.*"}, + Columns: []string{"co*", "!col2"}, + }, + { + Matcher: []string{"test3.*"}, + Columns: []string{"co?1"}, + }, + } + selectors, err := New(replicaConfig) + require.NoError(t, err) + require.Len(t, selectors.selectors, 4) + + // column3 is filter out, set to nil. + err = selectors.Apply(event) + require.NoError(t, err) + require.Equal(t, []byte("val1"), event.Columns[0].Value) + require.Equal(t, []byte("val2"), event.Columns[1].Value) + require.Nil(t, event.Columns[2]) + + require.Equal(t, []byte("val1"), event.PreColumns[0].Value) + require.Equal(t, []byte("val2"), event.PreColumns[1].Value) + require.Nil(t, event.PreColumns[2]) + + event = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test1", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "a", + Value: []byte("a"), + }, + { + Name: "b", + Value: []byte("b"), + }, + { + Name: "c", + Value: []byte("c"), + }, + }, + } + // the first column `a` is filter out, set to nil. + err = selectors.Apply(event) + require.NoError(t, err) + require.Nil(t, event.Columns[0]) + require.Equal(t, []byte("b"), event.Columns[1].Value) + require.Equal(t, []byte("c"), event.Columns[2].Value) + + event = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test2", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "col", + Value: []byte("col"), + }, + { + Name: "col1", + Value: []byte("col1"), + }, + { + Name: "col2", + Value: []byte("col2"), + }, + { + Name: "col3", + Value: []byte("col3"), + }, + }, + } + err = selectors.Apply(event) + require.NoError(t, err) + require.Equal(t, []byte("col"), event.Columns[0].Value) + require.Equal(t, []byte("col1"), event.Columns[1].Value) + require.Nil(t, event.Columns[2]) + require.Equal(t, []byte("col3"), event.Columns[3].Value) + + event = &model.RowChangedEvent{ + Table: &model.TableName{ + Schema: "test3", + Table: "table1", + }, + Columns: []*model.Column{ + { + Name: "col", + Value: []byte("col"), + }, + { + Name: "col1", + Value: []byte("col1"), + }, + { + Name: "col2", + Value: []byte("col2"), + }, + { + Name: "coA1", + Value: []byte("coA1"), + }, + }, + } + err = selectors.Apply(event) + require.NoError(t, err) + require.Nil(t, event.Columns[0]) + require.Equal(t, []byte("col1"), event.Columns[1].Value) + require.Nil(t, event.Columns[2]) + require.Equal(t, []byte("coA1"), event.Columns[3].Value) +} + +func TestVerifyTableColumnNotAllowFiltered(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: []string{"b"}, + }, + } + selector, err := New(replicaConfig) + require.NoError(t, err) + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, config.ProtocolDefault, "default", "default") + require.NoError(t, err) + + info := &timodel.TableInfo{ + Name: timodel.CIStr{O: "t1", L: "t1"}, + Columns: []*timodel.ColumnInfo{ + { + ID: 0, + Name: timodel.CIStr{O: "a", L: "a"}, + Offset: 0, + }, + { + ID: 1, + Name: timodel.CIStr{O: "b", L: "b"}, + Offset: 1, + }, + { + ID: 2, + Name: timodel.CIStr{O: "c", L: "c"}, + Offset: 2, + }, + }, + } + table := model.WrapTableInfo(0, "test", 0, info) + table.ColumnsFlag[0] = model.HandleKeyFlag + infos := []*model.TableInfo{table} + + // column `a` is handle key, but it is filter out, return error. + err = selector.VerifyTables(infos, eventRouter) + require.ErrorIs(t, err, cerror.ErrColumnSelectorFailed) +} + +func TestVerifyTablesColumnFilteredInDispatcher(t *testing.T) { + replicaConfig := config.GetDefaultReplicaConfig() + replicaConfig.Sink.ColumnSelectors = []*config.ColumnSelector{ + { + Matcher: []string{"test.*"}, + Columns: []string{"a", "b"}, + }, + } + replicaConfig.Sink.DispatchRules = []*config.DispatchRule{ + { + Matcher: []string{"test.*"}, + PartitionRule: "columns", + Columns: []string{"c"}, + }, + } + + selectors, err := New(replicaConfig) + require.NoError(t, err) + + eventRouter, err := dispatcher.NewEventRouter(replicaConfig, config.ProtocolDefault, "default", "default") + require.NoError(t, err) + + info := &timodel.TableInfo{ + Name: timodel.CIStr{O: "t1", L: "t1"}, + Columns: []*timodel.ColumnInfo{ + { + ID: 0, + Name: timodel.CIStr{O: "a", L: "a"}, + Offset: 0, + }, + { + ID: 1, + Name: timodel.CIStr{O: "b", L: "b"}, + Offset: 1, + }, + { + ID: 2, + Name: timodel.CIStr{O: "c", L: "c"}, + Offset: 2, + }, + }, + } + + table := model.WrapTableInfo(0, "test", 0, info) + // column `c` is filter out, but it is used in the column dispatcher, return error. + infos := []*model.TableInfo{table} + err = selectors.VerifyTables(infos, eventRouter) + require.ErrorIs(t, err, cerror.ErrColumnSelectorFailed) +} diff --git a/cdc/sink/dmlsink/mq/transformer/transformer.go b/cdc/sink/dmlsink/mq/transformer/transformer.go new file mode 100644 index 00000000000..2cdb8c1780a --- /dev/null +++ b/cdc/sink/dmlsink/mq/transformer/transformer.go @@ -0,0 +1,21 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import "github.com/pingcap/tiflow/cdc/model" + +// Transformer is the interface for transform the event. +type Transformer interface { + Apply(event *model.RowChangedEvent) error +} diff --git a/errors.toml b/errors.toml index 1563e43c05e..9d5021d5de4 100755 --- a/errors.toml +++ b/errors.toml @@ -181,6 +181,11 @@ error = ''' Codec invalid config ''' +["CDC:ErrColumnSelectorFailed"] +error = ''' +column selector failed +''' + ["CDC:ErrCompressionFailed"] error = ''' Compression failed diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 301d3854717..82bb51a587b 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -291,6 +291,13 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin if err := c.Integrity.Validate(); err != nil { return err } + + if c.Integrity.Enabled() && len(c.Sink.ColumnSelectors) != 0 { + log.Error("it's not allowed to enable the integrity check and column selector at the same time") + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "integrity check enabled and column selector set, not allowed") + + } } if c.ChangefeedErrorStuckDuration != nil && diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 12e1f0422ed..09da7e25d4c 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -22,6 +22,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/pingcap/tiflow/pkg/compression" + cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" @@ -235,6 +236,22 @@ func TestReplicaConfigValidate(t *testing.T) { require.Error(t, err) } +func TestValidateIntegrity(t *testing.T) { + sinkURL, err := url.Parse("kafka://topic?protocol=avro") + require.NoError(t, err) + + cfg := GetDefaultReplicaConfig() + cfg.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness + cfg.Sink.ColumnSelectors = []*ColumnSelector{ + { + Matcher: []string{"a.b"}, Columns: []string{"c"}, + }, + } + + err = cfg.ValidateAndAdjust(sinkURL) + require.ErrorIs(t, err, cerror.ErrInvalidReplicaConfig) +} + func TestValidateAndAdjust(t *testing.T) { cfg := GetDefaultReplicaConfig() diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 44d59590db5..da1174fe1e1 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -122,7 +122,7 @@ type SinkConfig struct { DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` // CSVConfig is only available when the downstream is Storage. CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` - // ColumnSelectors is Deprecated. + ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` // SchemaRegistry is only available when the downstream is MQ using avro protocol. SchemaRegistry *string `toml:"schema-registry" json:"schema-registry,omitempty"` diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index 22b60205b79..ecbd5be843b 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -130,6 +130,11 @@ var ( errors.RFCCodeText("CDC:ErrDispatcherFailed"), ) + ErrColumnSelectorFailed = errors.Normalize( + "column selector failed", + errors.RFCCodeText("CDC:ErrColumnSelectorFailed"), + ) + // internal errors ErrAdminStopProcessor = errors.Normalize( "stop processor by admin command", diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 9f5a3055bac..7b36ecdee8c 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -78,6 +78,7 @@ var changefeedUnRetryableErrors = []*errors.Error{ ErrChangefeedUnretryable, ErrCorruptedDataMutation, ErrDispatcherFailed, + ErrColumnSelectorFailed, ErrSinkURIInvalid, ErrKafkaInvalidConfig, diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 80a863ba745..d3760c8f056 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -569,6 +569,9 @@ func (a *BatchEncoder) columns2AvroSchema( Fields: nil, } for i, col := range input.columns { + if col == nil { + continue + } avroType, err := a.columnToAvroSchema(col, input.colInfos[i].Ft) if err != nil { return nil, err diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go index 736bdc4a89c..fb14e8af627 100644 --- a/pkg/sink/codec/avro/decoder.go +++ b/pkg/sink/codec/avro/decoder.go @@ -80,7 +80,7 @@ func (d *decoder) HasNext() (model.MessageType, bool, error) { return model.MessageTypeRow, true, nil } if len(d.value) < 1 { - return model.MessageTypeUnknown, false, errors.ErrAvroInvalidMessage.FastGenByArgs() + return model.MessageTypeUnknown, false, errors.ErrAvroInvalidMessage.FastGenByArgs(d.value) } switch d.value[0] { case magicByte: @@ -90,7 +90,7 @@ func (d *decoder) HasNext() (model.MessageType, bool, error) { case checkpointByte: return model.MessageTypeResolved, true, nil } - return model.MessageTypeUnknown, false, errors.ErrAvroInvalidMessage.FastGenByArgs() + return model.MessageTypeUnknown, false, errors.ErrAvroInvalidMessage.FastGenByArgs(d.value) } // NextRowChangedEvent returns the next row changed event if exists diff --git a/tests/integration_tests/kafka_column_selector/conf/changefeed.toml b/tests/integration_tests/kafka_column_selector/conf/changefeed.toml new file mode 100644 index 00000000000..addd3dd41a5 --- /dev/null +++ b/tests/integration_tests/kafka_column_selector/conf/changefeed.toml @@ -0,0 +1,7 @@ +[sink] +column-selectors = [ + {matcher = ['test.t1'], columns = ['a', 'b']}, + {matcher = ['test.*'], columns = ["*", "!b"]}, + + {matcher = ['test1.t1'], columns = ['column*', '!column1']}, +] diff --git a/tests/integration_tests/kafka_column_selector/data/data.sql b/tests/integration_tests/kafka_column_selector/data/data.sql new file mode 100644 index 00000000000..79b4dcee0da --- /dev/null +++ b/tests/integration_tests/kafka_column_selector/data/data.sql @@ -0,0 +1,49 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +create table t1 ( + a int primary key, + b int, + c int +); + +insert into t1 values (1, 2, 3); +insert into t1 values (2, 3, 4); +insert into t1 values (3, 4, 5); + +create table t2 ( + a int primary key, + b int, + c int +); + +insert into t2 values (1, 2, 3); +insert into t2 values (2, 3, 4); +insert into t2 values (3, 4, 5); + +create table t3 ( + a int primary key, + b int, + c int +); + +insert into t3 values (1, 2, 3); +insert into t3 values (2, 3, 4); +insert into t3 values (3, 4, 5); + +drop database if exists `test1`; +create database `test1`; +use `test1`; + +create table t1 ( + column0 int primary key, + column1 int, + column2 int +); + +insert into t1 values (1, 2, 3); +insert into t1 values (2, 3, 4); +insert into t1 values (3, 4, 5); + +create table finishmark(id int primary key); diff --git a/tests/integration_tests/kafka_column_selector/run.sh b/tests/integration_tests/kafka_column_selector/run.sh new file mode 100644 index 00000000000..e8bd3a83f89 --- /dev/null +++ b/tests/integration_tests/kafka_column_selector/run.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + changefeed_id="test" + TOPIC_NAME="column-selector-test" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=canal-json&partition-num=1&enable-tidb-extension=true" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c ${changefeed_id} --config="$CUR/conf/changefeed.toml" + + cdc_kafka_consumer --upstream-uri $SINK_URI --downstream-uri="mysql://root@127.0.0.1:3306/?safe-mode=true&batch-dml-enable=false" --upstream-tidb-dsn="root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/?" --config="$CUR/conf/changefeed.toml" 2>&1 & + + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + echo "Starting build checksum checker..." + cd $CUR/../../utils/checksum_checker + if [ ! -f ./checksum_checker ]; then + GO111MODULE=on go build + fi + + check_table_exists "test1.finishmark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + ./checksum_checker --upstream-uri "root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/" --downstream-uri "root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/" --databases "test,test1" --config="$CUR/conf/changefeed.toml" + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/kafka_column_selector_avro/conf/changefeed.toml b/tests/integration_tests/kafka_column_selector_avro/conf/changefeed.toml new file mode 100644 index 00000000000..85c3512e396 --- /dev/null +++ b/tests/integration_tests/kafka_column_selector_avro/conf/changefeed.toml @@ -0,0 +1,4 @@ +[sink] +column-selectors = [ + {matcher = ['test.*'], columns = ["*", "!b"]}, +] diff --git a/tests/integration_tests/kafka_column_selector_avro/data/data.sql b/tests/integration_tests/kafka_column_selector_avro/data/data.sql new file mode 100644 index 00000000000..15513e5dc2c --- /dev/null +++ b/tests/integration_tests/kafka_column_selector_avro/data/data.sql @@ -0,0 +1,15 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +create table t1 ( + a int primary key, + b int, + c int +); + +insert into t1 values (1, 2, 3); +insert into t1 values (2, 3, 4); +insert into t1 values (3, 4, 5); + +create table finishmark(id int primary key); diff --git a/tests/integration_tests/kafka_column_selector_avro/run.sh b/tests/integration_tests/kafka_column_selector_avro/run.sh new file mode 100644 index 00000000000..e98fc71fc17 --- /dev/null +++ b/tests/integration_tests/kafka_column_selector_avro/run.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + # test kafka sink only in this case + if [ "$SINK_TYPE" != "kafka" ]; then + return + fi + + echo 'Starting schema registry...' + ./bin/bin/schema-registry-start -daemon ./bin/etc/schema-registry/schema-registry.properties + i=0 + while ! curl -o /dev/null -v -s "http://127.0.0.1:8088"; do + i=$(($i + 1)) + if [ $i -gt 30 ]; then + echo 'Failed to start schema registry' + exit 1 + fi + sleep 2 + done + + curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "NONE"}' http://127.0.0.1:8088/config + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # record tso before we create tables to skip the system table DDLs + start_ts=$(cdc cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + changefeed_id="test" + TOPIC_NAME="column-selector-avro-test" + SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=avro&enable-tidb-extension=true&avro-enable-watermark=true" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c ${changefeed_id} --config="$CUR/conf/changefeed.toml" --schema-registry=http://127.0.0.1:8088 + + run_kafka_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml "http://127.0.0.1:8088" + + run_sql_file $CUR/data/data.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + echo "Starting build checksum checker..." + cd $CUR/../../utils/checksum_checker + if [ ! -f ./checksum_checker ]; then + GO111MODULE=on go build + fi + + check_table_exists "test.finishmark" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + ./checksum_checker --upstream-uri "root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/" --downstream-uri "root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/" --databases "test" --config="$CUR/conf/changefeed.toml" + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index 0364ef6831f..e6211a5a46b 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -14,7 +14,7 @@ mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_sui mysql_only_http="http_api http_api_tls api_v2" mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table" -kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback mq_sink_dispatcher" +kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback mq_sink_dispatcher kafka_column_selector kafka_column_selector_avro" kafka_only_protocol="canal_json_adapter_compatibility canal_json_basic multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check" kafka_only_v2="kafka_big_txn_v2 kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" diff --git a/tests/utils/checksum_checker/main.go b/tests/utils/checksum_checker/main.go new file mode 100644 index 00000000000..5e2a86e4255 --- /dev/null +++ b/tests/utils/checksum_checker/main.go @@ -0,0 +1,260 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "database/sql" + "flag" + "fmt" + "os" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer/columnselector" + cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +type options struct { + upstreamURI string + downstreamURI string + dbNames string + configFile string +} + +func (o *options) validate() error { + if o.upstreamURI == "" { + return errors.New("upstreamURI is required") + } + if o.downstreamURI == "" { + return errors.New("downstreamURI is required") + } + if len(o.dbNames) == 0 { + return errors.New("dbNames is required") + } + return nil +} + +func main() { + o := &options{} + + flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + flags.StringVar(&o.upstreamURI, "upstream-uri", "", "upstream database uri") + flags.StringVar(&o.downstreamURI, "downstream-uri", "", "downstream database uri") + flags.StringVar(&o.dbNames, "databases", "", "database names, separate by the `,`") + flags.StringVar(&o.configFile, "config", "", "config file") + if err := flags.Parse(os.Args[1:]); err != nil { + log.Panic("parse args failed", zap.Error(err)) + } + if err := o.validate(); err != nil { + log.Panic("invalid options", zap.Error(err)) + } + + upstreamDB, err := openDB(o.upstreamURI) + if err != nil { + log.Panic("cannot open db for the upstream", zap.Error(err)) + } + + downstreamDB, err := openDB(o.downstreamURI) + if err != nil { + log.Panic("cannot open db for the downstream", zap.Error(err)) + } + + replicaConfig := config.GetDefaultReplicaConfig() + if o.configFile != "" { + err = cmdUtil.StrictDecodeFile(o.configFile, "checksum checker", replicaConfig) + if err != nil { + log.Panic("cannot decode config file", zap.Error(err)) + } + } + + columnFilter, err := columnselector.New(replicaConfig) + if err != nil { + log.Panic("cannot create column filter", zap.Error(err)) + } + + dbNames := strings.Split(o.dbNames, ",") + err = compareCRC32CheckSum(upstreamDB, downstreamDB, dbNames, columnFilter) + if err != nil { + log.Panic("compare checksum failed", zap.Error(err)) + } + log.Info("compare checksum passed") +} + +func compareCRC32CheckSum( + upstream, downstream *sql.DB, dbNames []string, selector *columnselector.ColumnSelector, +) error { + start := time.Now() + source, err := getChecksum(upstream, dbNames, selector) + if err != nil { + log.Warn("get checksum for the upstream failed", zap.Error(err)) + return errors.Trace(err) + } + log.Info("get checksum for the upstream success", + zap.Duration("elapsed", time.Since(start))) + + start = time.Now() + sink, err := getChecksum(downstream, dbNames, selector) + if err != nil { + log.Warn("get checksum for the downstream failed", zap.Error(err)) + return errors.Trace(err) + } + log.Info("get checksum for the downstream success", + zap.Duration("elapsed", time.Since(start))) + + if len(source) != len(sink) { + log.Error("source and sink have different crc32 size", + zap.Int("source", len(source)), zap.Int("sink", len(sink))) + return fmt.Errorf("source and sink have different crc32 size, source: %d, sink: %d", + len(source), len(sink)) + } + + for tableName, expected := range source { + actual, ok := sink[tableName] + if !ok { + return fmt.Errorf("table not found at sink, table: %s", tableName) + } + if expected != actual { + log.Error("crc32 mismatch", + zap.String("table", tableName), zap.Uint32("source", expected), zap.Uint32("sink", actual)) + return fmt.Errorf("crc32 mismatch, table: %s, source: %d, sink: %d", tableName, expected, actual) + } + } + return nil +} + +func getChecksum( + db *sql.DB, dbNames []string, selector *columnselector.ColumnSelector, +) (map[string]uint32, error) { + result := make(map[string]uint32) + for _, dbName := range dbNames { + tables, err := getAllTables(db, dbName) + if err != nil { + return nil, err + } + for _, table := range tables { + tx, err := db.Begin() + if err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + columns, err := getColumns(tx, dbName, table, selector) + if err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + checksum, err := doChecksum(tx, dbName, table, columns) + if err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + _ = tx.Commit() + result[dbName+"."+table] = checksum + } + } + return result, nil +} + +func doChecksum(tx *sql.Tx, schema, table string, columns []string) (uint32, error) { + a := strings.Join(columns, "`,`") + + concat := fmt.Sprintf("CONCAT_WS(',', `%s`)", a) + tableName := schema + "." + table + query := fmt.Sprintf("SELECT BIT_XOR(CRC32(%s)) AS checksum FROM %s", concat, tableName) + var checkSum uint32 + rows := tx.QueryRow(query) + err := rows.Scan(&checkSum) + if err != nil { + log.Error("get crc32 checksum failed", + zap.Error(err), zap.String("table", tableName), zap.String("query", query)) + return 0, errors.Trace(err) + } + log.Info("do checkSum success", zap.String("table", tableName), zap.Uint32("checkSum", checkSum)) + return checkSum, nil +} + +func getColumns(tx *sql.Tx, schema, table string, selector *columnselector.ColumnSelector) (result []string, err error) { + rows, err := tx.Query(fmt.Sprintf("SHOW COLUMNS FROM %s", schema+"."+table)) + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + if err := rows.Close(); err != nil { + log.Warn("close rows failed", zap.Error(err)) + } + }() + + for rows.Next() { + var t columnInfo + if err := rows.Scan(&t.Field, &t.Type, &t.Null, &t.Key, &t.Default, &t.Extra); err != nil { + return result, errors.Trace(err) + } + if selector.VerifyColumn(schema, table, t.Field) { + result = append(result, t.Field) + } + } + return result, nil +} + +type columnInfo struct { + Field string + Type string + Null string + Key string + Default *string + Extra string +} + +func getAllTables(db *sql.DB, dbName string) ([]string, error) { + var result []string + dbName = strings.TrimSpace(dbName) + tx, err := db.Begin() + if err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + query := fmt.Sprintf(`show full tables from %s where table_type != "VIEW"`, dbName) + rows, err := tx.Query(query) + if err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + for rows.Next() { + var t string + var tt string + if err := rows.Scan(&t, &tt); err != nil { + _ = tx.Rollback() + return nil, errors.Trace(err) + } + result = append(result, t) + } + _ = rows.Close() + _ = tx.Commit() + return result, nil +} + +func openDB(uri string) (*sql.DB, error) { + db, err := sql.Open("mysql", uri) + if err != nil { + return nil, errors.Trace(err) + } + + if err := db.Ping(); err != nil { + return nil, errors.Trace(err) + } + return db, nil +}