diff --git a/downstreamadapter/dispatcher/dispatcher.go b/downstreamadapter/dispatcher/dispatcher.go index a7399fd1..0b11d4ae 100644 --- a/downstreamadapter/dispatcher/dispatcher.go +++ b/downstreamadapter/dispatcher/dispatcher.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/sink/util" "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" @@ -81,7 +80,7 @@ type Dispatcher struct { // componentStatus is the status of the dispatcher, such as working, removing, stopped. componentStatus *ComponentStateWithMutex // the config of filter - filterConfig *config.FilterConfig + filterConfig *eventpb.FilterConfig // tableInfo is the latest table info of the dispatcher tableInfo *common.TableInfo @@ -139,7 +138,7 @@ func NewDispatcher( schemaID int64, schemaIDToDispatchers *SchemaIDToDispatchers, syncPointConfig *syncpoint.SyncPointConfig, - filterConfig *config.FilterConfig, + filterConfig *eventpb.FilterConfig, currentPdTs uint64, errCh chan error) *Dispatcher { dispatcher := &Dispatcher{ @@ -499,33 +498,7 @@ func (d *Dispatcher) EnableSyncPoint() bool { } func (d *Dispatcher) GetFilterConfig() *eventpb.FilterConfig { - return toFilterConfigPB(d.filterConfig) -} - -func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig { - filterConfig := &eventpb.FilterConfig{ - Rules: filter.Rules, - IgnoreTxnStartTs: filter.IgnoreTxnStartTs, - EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)), - } - - for _, eventFilter := range filter.EventFilters { - ignoreEvent := make([]string, len(eventFilter.IgnoreEvent)) - for _, event := range eventFilter.IgnoreEvent { - ignoreEvent = append(ignoreEvent, string(event)) - } - filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{ - Matcher: eventFilter.Matcher, - IgnoreEvent: ignoreEvent, - IgnoreSql: eventFilter.IgnoreSQL, - IgnoreInsertValueExpr: eventFilter.IgnoreInsertValueExpr, - IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr, - IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr, - IgnoreDeleteValueExpr: eventFilter.IgnoreDeleteValueExpr, - }) - } - - return filterConfig + return d.filterConfig } func (d *Dispatcher) GetSyncPointInterval() time.Duration { diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index 2a7abef9..73e90e90 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/node" @@ -54,7 +55,8 @@ type EventDispatcherManager struct { changefeedID common.ChangeFeedID maintainerID node.ID - config *config.ChangefeedConfig + config *config.ChangefeedConfig + filterConfig *eventpb.FilterConfig // only not nil when enable sync point // TODO: changefeed update config syncPointConfig *syncpoint.SyncPointConfig @@ -122,6 +124,7 @@ func NewEventDispatcherManager( errCh: make(chan error, 1), cancel: cancel, config: cfConfig, + filterConfig: toFilterConfigPB(cfConfig.Filter), schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(), tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()), metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()), @@ -331,7 +334,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er schemaIds[idx], e.schemaIDToDispatchers, e.syncPointConfig, - e.config.Filter, + e.filterConfig, pdTsList[idx], e.errCh) @@ -657,3 +660,29 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp return true }) } + +func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig { + filterConfig := &eventpb.FilterConfig{ + Rules: filter.Rules, + IgnoreTxnStartTs: filter.IgnoreTxnStartTs, + EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)), + } + + for _, eventFilter := range filter.EventFilters { + ignoreEvent := make([]string, len(eventFilter.IgnoreEvent)) + for _, event := range eventFilter.IgnoreEvent { + ignoreEvent = append(ignoreEvent, string(event)) + } + filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{ + Matcher: eventFilter.Matcher, + IgnoreEvent: ignoreEvent, + IgnoreSql: eventFilter.IgnoreSQL, + IgnoreInsertValueExpr: eventFilter.IgnoreInsertValueExpr, + IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr, + IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr, + IgnoreDeleteValueExpr: eventFilter.IgnoreDeleteValueExpr, + }) + } + + return filterConfig +} diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 3af3c069..2d67edf5 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -720,13 +720,9 @@ func (c *eventBroker) getDispatcher(id common.DispatcherID) (*dispatcherStat, bo } func (c *eventBroker) addDispatcher(info DispatcherInfo) { - filterConfig := info.GetFilterConfig() - filter, err := filter.GetSharedFilterStorage().GetOrSetFilter(info.GetChangefeedID(), filterConfig, "", false) - if err != nil { - log.Panic("create filter failed", zap.Error(err), zap.Any("filterConfig", filterConfig)) - } - defer c.metricDispatcherCount.Inc() + filter := info.GetFilter() + start := time.Now() id := info.GetID() span := info.GetTableSpan() diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index 3b34e760..a4a398fa 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" - "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/messaging" "go.uber.org/zap" ) @@ -33,7 +33,7 @@ type DispatcherInfo interface { GetStartTs() uint64 GetActionType() eventpb.ActionType GetChangefeedID() common.ChangeFeedID - GetFilterConfig() *config.FilterConfig + GetFilter() filter.Filter // sync point related SyncPointEnabled() bool diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 95081edf..d9b4b44e 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/pingcap/log" + "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" @@ -25,6 +26,7 @@ import ( timodel "github.com/pingcap/tidb/pkg/meta/model" tfilter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/pingcap/tiflow/cdc/model" + bf "github.com/pingcap/tiflow/pkg/binlog-filter" "go.uber.org/zap" ) @@ -255,13 +257,39 @@ func GetSharedFilterStorage() *SharedFilterStorage { return storage } -func (s *SharedFilterStorage) GetOrSetFilter(changeFeedID common.ChangeFeedID, filterConfig *config.FilterConfig, tz string, caseSensitive bool) (Filter, error) { +func (s *SharedFilterStorage) GetOrSetFilter( + changeFeedID common.ChangeFeedID, + cfg *eventpb.FilterConfig, + tz string, + caseSensitive bool, +) (Filter, error) { s.mutex.Lock() defer s.mutex.Unlock() if f, ok := s.m[changeFeedID]; ok { return f, nil } - f, err := NewFilter(filterConfig, tz, caseSensitive) + // convert eventpb.FilterConfig to config.FilterConfig + filterCfg := &config.FilterConfig{ + Rules: cfg.Rules, + IgnoreTxnStartTs: cfg.IgnoreTxnStartTs, + } + for _, rule := range cfg.EventFilters { + f := &config.EventFilterRule{ + Matcher: rule.Matcher, + IgnoreSQL: rule.IgnoreSql, + IgnoreInsertValueExpr: rule.IgnoreInsertValueExpr, + IgnoreUpdateNewValueExpr: rule.IgnoreUpdateNewValueExpr, + IgnoreUpdateOldValueExpr: rule.IgnoreUpdateOldValueExpr, + IgnoreDeleteValueExpr: rule.IgnoreDeleteValueExpr, + } + for _, e := range rule.IgnoreEvent { + f.IgnoreEvent = append(f.IgnoreEvent, bf.EventType(e)) + } + + filterCfg.EventFilters = append(filterCfg.EventFilters, f) + } + //generate table filter + f, err := NewFilter(filterCfg, tz, caseSensitive) if err != nil { return nil, err } diff --git a/pkg/messaging/message.go b/pkg/messaging/message.go index f620ff00..b889e5a6 100644 --- a/pkg/messaging/message.go +++ b/pkg/messaging/message.go @@ -5,7 +5,7 @@ import ( "time" "github.com/pingcap/ticdc/logservice/logservicepb" - "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/filter" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/log" @@ -14,7 +14,6 @@ import ( "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" - bf "github.com/pingcap/tiflow/pkg/binlog-filter" "go.uber.org/zap" ) @@ -161,31 +160,13 @@ func (r RegisterDispatcherRequest) GetChangefeedID() common.ChangeFeedID { return common.NewChangefeedIDFromPB(r.ChangefeedId) } -func (r RegisterDispatcherRequest) GetFilterConfig() *config.FilterConfig { - cfg := r.RegisterDispatcherRequest.FilterConfig - if cfg == nil { - return nil +func (r RegisterDispatcherRequest) GetFilter() filter.Filter { + changefeedID := r.GetChangefeedID() + filter, err := filter.GetSharedFilterStorage().GetOrSetFilter(changefeedID, r.RegisterDispatcherRequest.FilterConfig, "", false) + if err != nil { + log.Panic("create filter failed", zap.Error(err), zap.Any("filterConfig", r.RegisterDispatcherRequest.FilterConfig)) } - filterCfg := &config.FilterConfig{ - Rules: cfg.Rules, - IgnoreTxnStartTs: cfg.IgnoreTxnStartTs, - } - for _, rule := range cfg.EventFilters { - f := &config.EventFilterRule{ - Matcher: rule.Matcher, - IgnoreSQL: rule.IgnoreSql, - IgnoreInsertValueExpr: rule.IgnoreInsertValueExpr, - IgnoreUpdateNewValueExpr: rule.IgnoreUpdateNewValueExpr, - IgnoreUpdateOldValueExpr: rule.IgnoreUpdateOldValueExpr, - IgnoreDeleteValueExpr: rule.IgnoreDeleteValueExpr, - } - for _, e := range rule.IgnoreEvent { - f.IgnoreEvent = append(f.IgnoreEvent, bf.EventType(e)) - } - - filterCfg.EventFilters = append(filterCfg.EventFilters, f) - } - return filterCfg + return filter } func (r RegisterDispatcherRequest) SyncPointEnabled() bool {