Skip to content

Commit

Permalink
kafka(ticdc): support columns dispatcher. (pingcap#9863)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Oct 17, 2023
1 parent 3b8d55b commit 3802006
Show file tree
Hide file tree
Showing 15 changed files with 436 additions and 22 deletions.
9 changes: 6 additions & 3 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
DispatcherRule: "",
PartitionRule: rule.PartitionRule,
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
})
}
Expand Down Expand Up @@ -554,6 +555,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
Matcher: rule.Matcher,
PartitionRule: rule.PartitionRule,
IndexName: rule.IndexName,
Columns: rule.Columns,
TopicRule: rule.TopicRule,
})
}
Expand Down Expand Up @@ -915,9 +917,10 @@ type LargeMessageHandleConfig struct {
// This is a duplicate of config.DispatchRule
type DispatchRule struct {
Matcher []string `json:"matcher,omitempty"`
PartitionRule string `json:"partition"`
IndexName string `json:"index"`
TopicRule string `json:"topic"`
PartitionRule string `json:"partition,omitempty"`
IndexName string `json:"index,omitempty"`
Columns []string `json:"columns,omitempty"`
TopicRule string `json:"topic,omitempty"`
}

// ColumnSelector represents a column selector for a table.
Expand Down
52 changes: 42 additions & 10 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,18 +316,50 @@ func (ti *TableInfo) Clone() *TableInfo {
return WrapTableInfo(ti.SchemaID, ti.TableName.Schema, ti.Version, ti.TableInfo.Clone())
}

// GetIndex return the corresponding index by the given name.
func (ti *TableInfo) GetIndex(name string) *model.IndexInfo {
for _, index := range ti.Indices {
if index != nil && index.Name.O == name {
return index
}
}
return nil
}

// IndexByName returns the index columns and offsets of the corresponding index by name
func (ti *TableInfo) IndexByName(name string) ([]string, []int, bool) {
for _, index := range ti.Indices {
if index.Name.O == name {
names := make([]string, 0, len(index.Columns))
offset := make([]int, 0, len(index.Columns))
for _, col := range index.Columns {
names = append(names, col.Name.O)
offset = append(offset, col.Offset)
}
return names, offset, true
index := ti.GetIndex(name)
if index == nil {
return nil, nil, false
}
names := make([]string, 0, len(index.Columns))
offset := make([]int, 0, len(index.Columns))
for _, col := range index.Columns {
names = append(names, col.Name.O)
offset = append(offset, col.Offset)
}
return names, offset, true
}

// ColumnsByNames returns the column offsets of the corresponding columns by names
// If any column does not exist, return false
func (ti *TableInfo) ColumnsByNames(names []string) ([]int, bool) {
// todo: optimize it
columnOffsets := make(map[string]int, len(ti.Columns))
for _, col := range ti.Columns {
if col != nil {
columnOffsets[col.Name.O] = col.Offset
}
}
return nil, nil, false

result := make([]int, 0, len(names))
for _, col := range names {
offset, ok := columnOffsets[col]
if !ok {
return nil, false
}
result = append(result, offset)
}

return result, true
}
54 changes: 53 additions & 1 deletion cdc/model/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,16 @@ func TestTableInfoClone(t *testing.T) {

func TestIndexByName(t *testing.T) {
tableInfo := &TableInfo{
TableInfo: &timodel.TableInfo{
Indices: nil,
},
}
names, offsets, ok := tableInfo.IndexByName("idx1")
require.False(t, ok)
require.Nil(t, names)
require.Nil(t, offsets)

tableInfo = &TableInfo{
TableInfo: &timodel.TableInfo{
Indices: []*timodel.IndexInfo{
{
Expand All @@ -291,7 +301,7 @@ func TestIndexByName(t *testing.T) {
},
}

names, offsets, ok := tableInfo.IndexByName("idx2")
names, offsets, ok = tableInfo.IndexByName("idx2")
require.False(t, ok)
require.Nil(t, names)
require.Nil(t, offsets)
Expand All @@ -301,3 +311,45 @@ func TestIndexByName(t *testing.T) {
require.Equal(t, []string{"col1"}, names)
require.Equal(t, []int{0}, offsets)
}

func TestColumnsByNames(t *testing.T) {
tableInfo := &TableInfo{
TableInfo: &timodel.TableInfo{
Columns: []*timodel.ColumnInfo{
{
Name: timodel.CIStr{
O: "col2",
},
Offset: 1,
},
{
Name: timodel.CIStr{
O: "col1",
},
Offset: 0,
},
{
Name: timodel.CIStr{
O: "col3",
},
Offset: 2,
},
},
},
}

names := []string{"col1", "col2", "col3"}
offsets, ok := tableInfo.ColumnsByNames(names)
require.True(t, ok)
require.Equal(t, []int{0, 1, 2}, offsets)

names = []string{"col2"}
offsets, ok = tableInfo.ColumnsByNames(names)
require.True(t, ok)
require.Equal(t, []int{1}, offsets)

names = []string{"col1", "col-not-found"}
offsets, ok = tableInfo.ColumnsByNames(names)
require.False(t, ok)
require.Nil(t, offsets)
}
33 changes: 27 additions & 6 deletions cdc/sink/dmlsink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func NewEventRouter(
f = filter.CaseInsensitive(f)
}

d := getPartitionDispatcher(ruleConfig.PartitionRule, scheme, ruleConfig.IndexName)
d := getPartitionDispatcher(
ruleConfig.PartitionRule, scheme, ruleConfig.IndexName, ruleConfig.Columns,
)
t, err := getTopicDispatcher(ruleConfig.TopicRule, defaultTopic, protocol, scheme)
if err != nil {
return nil, err
Expand Down Expand Up @@ -130,12 +132,27 @@ func (s *EventRouter) GetPartitionForRowChange(
func (s *EventRouter) VerifyTables(infos []*model.TableInfo) error {
for _, table := range infos {
_, partitionDispatcher := s.matchDispatcher(table.TableName.Schema, table.TableName.Table)
if v, ok := partitionDispatcher.(*partition.IndexValueDispatcher); ok {
_, _, ok = table.IndexByName(v.IndexName)
if !ok {
switch v := partitionDispatcher.(type) {
case *partition.IndexValueDispatcher:
index := table.GetIndex(v.IndexName)
if index == nil {
return cerror.ErrDispatcherFailed.GenWithStack(
"index not found when verify the table, table: %v, index: %s", table.TableName, v.IndexName)
}
// only allow the unique index to be set.
// For the non-unique index, if any column belongs to the index is updated,
// the event is not split, it may cause incorrect data consumption.
if !index.Unique {
return cerror.ErrDispatcherFailed.GenWithStack(
"index is not unique when verify the table, table: %v, index: %s", table.TableName, v.IndexName)
}
case *partition.ColumnsDispatcher:
_, ok := table.ColumnsByNames(v.Columns)
if !ok {
return cerror.ErrDispatcherFailed.GenWithStack(
"columns not found when verify the table, table: %v, columns: %v", table.TableName, v.Columns)
}
default:
}
}
return nil
Expand Down Expand Up @@ -191,7 +208,9 @@ func (s *EventRouter) matchDispatcher(
}

// getPartitionDispatcher returns the partition dispatcher for a specific partition rule.
func getPartitionDispatcher(rule string, scheme string, indexName string) partition.Dispatcher {
func getPartitionDispatcher(
rule string, scheme string, indexName string, columns []string,
) partition.Dispatcher {
switch strings.ToLower(rule) {
case "default":
return partition.NewDefaultDispatcher()
Expand All @@ -204,14 +223,16 @@ func getPartitionDispatcher(rule string, scheme string, indexName string) partit
case "rowid":
log.Warn("rowid is deprecated, index-value is used as the partition dispatcher.")
return partition.NewIndexValueDispatcher(indexName)
case "columns":
return partition.NewColumnsDispatcher(columns)
default:
}

if sink.IsPulsarScheme(scheme) {
return partition.NewKeyDispatcher(rule)
}

log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
log.Warn("the partition dispatch rule is not default/ts/table/index-value/columns," +
" use the default rule instead.")
return partition.NewDefaultDispatcher()
}
Expand Down
73 changes: 73 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/partition/columns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package partition

import (
"strconv"
"sync"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/hash"
"go.uber.org/zap"
)

// ColumnsDispatcher is a partition dispatcher
// which dispatches events based on the given columns.
type ColumnsDispatcher struct {
hasher *hash.PositionInertia
lock sync.Mutex

Columns []string
}

// NewColumnsDispatcher creates a ColumnsDispatcher.
func NewColumnsDispatcher(columns []string) *ColumnsDispatcher {
return &ColumnsDispatcher{
hasher: hash.NewPositionInertia(),
Columns: columns,
}
}

// DispatchRowChangedEvent returns the target partition to which
// a row changed event should be dispatched.
func (r *ColumnsDispatcher) DispatchRowChangedEvent(row *model.RowChangedEvent, partitionNum int32) (int32, string, error) {
r.lock.Lock()
defer r.lock.Unlock()
r.hasher.Reset()

r.hasher.Write([]byte(row.Table.Schema), []byte(row.Table.Table))

dispatchCols := row.Columns
if len(dispatchCols) == 0 {
dispatchCols = row.PreColumns
}

offsets, ok := row.TableInfo.ColumnsByNames(r.Columns)
if !ok {
log.Error("columns not found when dispatch event",
zap.Any("tableName", row.Table),
zap.Strings("columns", r.Columns))
return 0, "", errors.ErrDispatcherFailed.GenWithStack(
"columns not found when dispatch event, table: %v, columns: %v", row.Table, r.Columns)
}

for idx := 0; idx < len(r.Columns); idx++ {
r.hasher.Write([]byte(r.Columns[idx]), []byte(model.ColumnValueString(dispatchCols[offsets[idx]].Value)))
}

sum32 := r.hasher.Sum32()
return int32(sum32 % uint32(partitionNum)), strconv.FormatInt(int64(sum32), 10), nil
}
81 changes: 81 additions & 0 deletions cdc/sink/dmlsink/mq/dispatcher/partition/columns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package partition

import (
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)

func TestColumnsDispatcher(t *testing.T) {
t.Parallel()

event := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t1",
},
TableInfo: &model.TableInfo{
TableInfo: &timodel.TableInfo{
Columns: []*timodel.ColumnInfo{
{
Name: timodel.CIStr{
O: "col2",
},
Offset: 1,
},
{
Name: timodel.CIStr{
O: "col1",
},
Offset: 0,
},
{
Name: timodel.CIStr{
O: "col3",
},
Offset: 2,
},
},
},
},
Columns: []*model.Column{
{
Name: "col1",
Value: 11,
},
{
Name: "col2",
Value: 22,
},
{
Name: "col3",
Value: 33,
},
},
}

p := NewColumnsDispatcher([]string{"col-2", "col-not-found"})
_, _, err := p.DispatchRowChangedEvent(event, 16)
require.ErrorIs(t, err, errors.ErrDispatcherFailed)

p = NewColumnsDispatcher([]string{"col2", "col1"})
index, _, err := p.DispatchRowChangedEvent(event, 16)
require.NoError(t, err)
require.Equal(t, int32(15), index)
}
Loading

0 comments on commit 3802006

Please sign in to comment.