Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan committed Nov 21, 2024
1 parent c2f69f8 commit a481163
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 68 deletions.
33 changes: 3 additions & 30 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/tiflow/pkg/spanz"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,7 +80,7 @@ type Dispatcher struct {
// componentStatus is the status of the dispatcher, such as working, removing, stopped.
componentStatus *ComponentStateWithMutex
// the config of filter
filterConfig *config.FilterConfig
filterConfig *eventpb.FilterConfig

// tableInfo is the latest table info of the dispatcher
tableInfo *common.TableInfo
Expand Down Expand Up @@ -139,7 +138,7 @@ func NewDispatcher(
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
syncPointConfig *syncpoint.SyncPointConfig,
filterConfig *config.FilterConfig,
filterConfig *eventpb.FilterConfig,
currentPdTs uint64,
errCh chan error) *Dispatcher {
dispatcher := &Dispatcher{
Expand Down Expand Up @@ -499,33 +498,7 @@ func (d *Dispatcher) EnableSyncPoint() bool {
}

func (d *Dispatcher) GetFilterConfig() *eventpb.FilterConfig {
return toFilterConfigPB(d.filterConfig)
}

func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig {
filterConfig := &eventpb.FilterConfig{
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)),
}

for _, eventFilter := range filter.EventFilters {
ignoreEvent := make([]string, len(eventFilter.IgnoreEvent))
for _, event := range eventFilter.IgnoreEvent {
ignoreEvent = append(ignoreEvent, string(event))
}
filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{
Matcher: eventFilter.Matcher,
IgnoreEvent: ignoreEvent,
IgnoreSql: eventFilter.IgnoreSQL,
IgnoreInsertValueExpr: eventFilter.IgnoreInsertValueExpr,
IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: eventFilter.IgnoreDeleteValueExpr,
})
}

return filterConfig
return d.filterConfig
}

func (d *Dispatcher) GetSyncPointInterval() time.Duration {
Expand Down
33 changes: 31 additions & 2 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/node"

Expand Down Expand Up @@ -54,7 +55,8 @@ type EventDispatcherManager struct {
changefeedID common.ChangeFeedID
maintainerID node.ID

config *config.ChangefeedConfig
config *config.ChangefeedConfig
filterConfig *eventpb.FilterConfig
// only not nil when enable sync point
// TODO: changefeed update config
syncPointConfig *syncpoint.SyncPointConfig
Expand Down Expand Up @@ -122,6 +124,7 @@ func NewEventDispatcherManager(
errCh: make(chan error, 1),
cancel: cancel,
config: cfConfig,
filterConfig: toFilterConfigPB(cfConfig.Filter),
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()),
Expand Down Expand Up @@ -331,7 +334,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er
schemaIds[idx],
e.schemaIDToDispatchers,
e.syncPointConfig,
e.config.Filter,
e.filterConfig,
pdTsList[idx],
e.errCh)

Expand Down Expand Up @@ -657,3 +660,29 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp
return true
})
}

func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig {
filterConfig := &eventpb.FilterConfig{
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)),
}

for _, eventFilter := range filter.EventFilters {
ignoreEvent := make([]string, len(eventFilter.IgnoreEvent))
for _, event := range eventFilter.IgnoreEvent {
ignoreEvent = append(ignoreEvent, string(event))
}
filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{
Matcher: eventFilter.Matcher,
IgnoreEvent: ignoreEvent,
IgnoreSql: eventFilter.IgnoreSQL,
IgnoreInsertValueExpr: eventFilter.IgnoreInsertValueExpr,
IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: eventFilter.IgnoreDeleteValueExpr,
})
}

return filterConfig
}
8 changes: 2 additions & 6 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,13 +720,9 @@ func (c *eventBroker) getDispatcher(id common.DispatcherID) (*dispatcherStat, bo
}

func (c *eventBroker) addDispatcher(info DispatcherInfo) {
filterConfig := info.GetFilterConfig()
filter, err := filter.GetSharedFilterStorage().GetOrSetFilter(info.GetChangefeedID(), filterConfig, "", false)
if err != nil {
log.Panic("create filter failed", zap.Error(err), zap.Any("filterConfig", filterConfig))
}

defer c.metricDispatcherCount.Inc()
filter := info.GetFilter()

start := time.Now()
id := info.GetID()
span := info.GetTableSpan()
Expand Down
4 changes: 2 additions & 2 deletions pkg/eventservice/event_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/messaging"
"go.uber.org/zap"
)
Expand All @@ -33,7 +33,7 @@ type DispatcherInfo interface {
GetStartTs() uint64
GetActionType() eventpb.ActionType
GetChangefeedID() common.ChangeFeedID
GetFilterConfig() *config.FilterConfig
GetFilter() filter.Filter

// sync point related
SyncPointEnabled() bool
Expand Down
32 changes: 30 additions & 2 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
timodel "github.com/pingcap/tidb/pkg/meta/model"
tfilter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
bf "github.com/pingcap/tiflow/pkg/binlog-filter"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -255,13 +257,39 @@ func GetSharedFilterStorage() *SharedFilterStorage {
return storage
}

func (s *SharedFilterStorage) GetOrSetFilter(changeFeedID common.ChangeFeedID, filterConfig *config.FilterConfig, tz string, caseSensitive bool) (Filter, error) {
func (s *SharedFilterStorage) GetOrSetFilter(
changeFeedID common.ChangeFeedID,
cfg *eventpb.FilterConfig,
tz string,
caseSensitive bool,
) (Filter, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if f, ok := s.m[changeFeedID]; ok {
return f, nil
}
f, err := NewFilter(filterConfig, tz, caseSensitive)
// convert eventpb.FilterConfig to config.FilterConfig
filterCfg := &config.FilterConfig{
Rules: cfg.Rules,
IgnoreTxnStartTs: cfg.IgnoreTxnStartTs,
}
for _, rule := range cfg.EventFilters {
f := &config.EventFilterRule{
Matcher: rule.Matcher,
IgnoreSQL: rule.IgnoreSql,
IgnoreInsertValueExpr: rule.IgnoreInsertValueExpr,
IgnoreUpdateNewValueExpr: rule.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: rule.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: rule.IgnoreDeleteValueExpr,
}
for _, e := range rule.IgnoreEvent {
f.IgnoreEvent = append(f.IgnoreEvent, bf.EventType(e))
}

filterCfg.EventFilters = append(filterCfg.EventFilters, f)
}
//generate table filter
f, err := NewFilter(filterCfg, tz, caseSensitive)
if err != nil {
return nil, err
}
Expand Down
33 changes: 7 additions & 26 deletions pkg/messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/pingcap/ticdc/logservice/logservicepb"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/node"

"github.com/pingcap/log"
Expand All @@ -14,7 +14,6 @@ import (
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
bf "github.com/pingcap/tiflow/pkg/binlog-filter"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -161,31 +160,13 @@ func (r RegisterDispatcherRequest) GetChangefeedID() common.ChangeFeedID {
return common.NewChangefeedIDFromPB(r.ChangefeedId)
}

func (r RegisterDispatcherRequest) GetFilterConfig() *config.FilterConfig {
cfg := r.RegisterDispatcherRequest.FilterConfig
if cfg == nil {
return nil
func (r RegisterDispatcherRequest) GetFilter() filter.Filter {
changefeedID := r.GetChangefeedID()
filter, err := filter.GetSharedFilterStorage().GetOrSetFilter(changefeedID, r.RegisterDispatcherRequest.FilterConfig, "", false)
if err != nil {
log.Panic("create filter failed", zap.Error(err), zap.Any("filterConfig", r.RegisterDispatcherRequest.FilterConfig))
}
filterCfg := &config.FilterConfig{
Rules: cfg.Rules,
IgnoreTxnStartTs: cfg.IgnoreTxnStartTs,
}
for _, rule := range cfg.EventFilters {
f := &config.EventFilterRule{
Matcher: rule.Matcher,
IgnoreSQL: rule.IgnoreSql,
IgnoreInsertValueExpr: rule.IgnoreInsertValueExpr,
IgnoreUpdateNewValueExpr: rule.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: rule.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: rule.IgnoreDeleteValueExpr,
}
for _, e := range rule.IgnoreEvent {
f.IgnoreEvent = append(f.IgnoreEvent, bf.EventType(e))
}

filterCfg.EventFilters = append(filterCfg.EventFilters, f)
}
return filterCfg
return filter
}

func (r RegisterDispatcherRequest) SyncPointEnabled() bool {
Expand Down

0 comments on commit a481163

Please sign in to comment.