Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]Pass Span as pointer #577

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/sink/util"
"github.com/pingcap/tiflow/pkg/spanz"
"go.uber.org/zap"
Expand Down Expand Up @@ -81,7 +80,7 @@ type Dispatcher struct {
// componentStatus is the status of the dispatcher, such as working, removing, stopped.
componentStatus *ComponentStateWithMutex
// the config of filter
filterConfig *config.FilterConfig
filterConfig *eventpb.FilterConfig

// tableInfo is the latest table info of the dispatcher
tableInfo *common.TableInfo
Expand Down Expand Up @@ -139,7 +138,7 @@ func NewDispatcher(
schemaID int64,
schemaIDToDispatchers *SchemaIDToDispatchers,
syncPointConfig *syncpoint.SyncPointConfig,
filterConfig *config.FilterConfig,
filterConfig *eventpb.FilterConfig,
currentPdTs uint64,
errCh chan error) *Dispatcher {
dispatcher := &Dispatcher{
Expand Down Expand Up @@ -499,33 +498,7 @@ func (d *Dispatcher) EnableSyncPoint() bool {
}

func (d *Dispatcher) GetFilterConfig() *eventpb.FilterConfig {
return toFilterConfigPB(d.filterConfig)
}

func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig {
filterConfig := &eventpb.FilterConfig{
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)),
}

for _, eventFilter := range filter.EventFilters {
ignoreEvent := make([]string, len(eventFilter.IgnoreEvent))
for _, event := range eventFilter.IgnoreEvent {
ignoreEvent = append(ignoreEvent, string(event))
}
filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{
Matcher: eventFilter.Matcher,
IgnoreEvent: ignoreEvent,
IgnoreSql: eventFilter.IgnoreSQL,
IgnoreInsertValueExpr: eventFilter.IgnoreInsertValueExpr,
IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr,
IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr,
IgnoreDeleteValueExpr: eventFilter.IgnoreDeleteValueExpr,
})
}

return filterConfig
return d.filterConfig
}

func (d *Dispatcher) GetSyncPointInterval() time.Duration {
Expand Down
33 changes: 31 additions & 2 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/ticdc/eventpb"
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/node"

Expand Down Expand Up @@ -54,7 +55,8 @@ type EventDispatcherManager struct {
changefeedID common.ChangeFeedID
maintainerID node.ID

config *config.ChangefeedConfig
config *config.ChangefeedConfig
filterConfig *eventpb.FilterConfig
// only not nil when enable sync point
// TODO: changefeed update config
syncPointConfig *syncpoint.SyncPointConfig
Expand Down Expand Up @@ -122,6 +124,7 @@ func NewEventDispatcherManager(
errCh: make(chan error, 1),
cancel: cancel,
config: cfConfig,
filterConfig: toFilterConfigPB(cfConfig.Filter),
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()),
Expand Down Expand Up @@ -331,7 +334,7 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er
schemaIds[idx],
e.schemaIDToDispatchers,
e.syncPointConfig,
e.config.Filter,
e.filterConfig,
pdTsList[idx],
e.errCh)

Expand Down Expand Up @@ -657,3 +660,29 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp
return true
})
}

// toFilterConfigPB converts the changefeed's in-memory filter configuration
// into its protobuf form (eventpb.FilterConfig) so it can be attached to
// dispatchers and sent to the event service.
//
// Note: the slices are allocated with zero length and a capacity hint, then
// filled via append. The previous version used make(..., len(...)) followed
// by append, which left len(...) nil *eventpb.EventFilterRule placeholders
// (and empty strings in ignoreEvent) at the front of each slice.
func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig {
	filterConfig := &eventpb.FilterConfig{
		Rules:            filter.Rules,
		IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
		EventFilters:     make([]*eventpb.EventFilterRule, 0, len(filter.EventFilters)),
	}

	for _, eventFilter := range filter.EventFilters {
		// Convert the typed event list to plain strings for the protobuf message.
		ignoreEvent := make([]string, 0, len(eventFilter.IgnoreEvent))
		for _, event := range eventFilter.IgnoreEvent {
			ignoreEvent = append(ignoreEvent, string(event))
		}
		filterConfig.EventFilters = append(filterConfig.EventFilters, &eventpb.EventFilterRule{
			Matcher:                  eventFilter.Matcher,
			IgnoreEvent:              ignoreEvent,
			IgnoreSql:                eventFilter.IgnoreSQL,
			IgnoreInsertValueExpr:    eventFilter.IgnoreInsertValueExpr,
			IgnoreUpdateNewValueExpr: eventFilter.IgnoreUpdateNewValueExpr,
			IgnoreUpdateOldValueExpr: eventFilter.IgnoreUpdateOldValueExpr,
			IgnoreDeleteValueExpr:    eventFilter.IgnoreDeleteValueExpr,
		})
	}

	return filterConfig
}
2 changes: 1 addition & 1 deletion logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (e *eventStore) RegisterDispatcher(
// maxEventCommitTs may not be updated correctly and cause data loss.(lost resolved ts is harmless)
// To fix it, we need to alloc subID and initialize dispatcherStat before puller may send events.
// That is allocate subID in a separate method.
stat.subID = e.puller.Subscribe(*tableSpan, startTs, subscriptionTag{
stat.subID = e.puller.Subscribe(tableSpan, startTs, subscriptionTag{
chIndex: chIndex,
tableID: tableSpan.TableID,
uniqueKeyID: uniqueKeyID,
Expand Down
4 changes: 2 additions & 2 deletions logservice/logpuller/log_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const (
type spanProgress struct {
client *SubscriptionClient

span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan

subID SubscriptionID

Expand Down Expand Up @@ -157,7 +157,7 @@ func (p *LogPuller) Close(ctx context.Context) error {
}

func (p *LogPuller) Subscribe(
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
startTs uint64,
tag interface{},
) SubscriptionID {
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/log_puller_multi_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type LogPullerMultiSpan struct {
func NewLogPullerMultiSpan(
client *SubscriptionClient,
pdClock pdutil.Clock,
spans []heartbeatpb.TableSpan,
spans []*heartbeatpb.TableSpan,
startTs uint64,
consume func(context.Context, *common.RawKVEntry) error,
) *LogPullerMultiSpan {
Expand Down
2 changes: 1 addition & 1 deletion logservice/logpuller/region_change_event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (w *changeEventProcessor) doHandle(
zap.Any("tableID", tableID),
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.Stringer("span", &state.region.span))
zap.Stringer("span", state.region.span))

for _, cachedEvent := range state.matcher.matchCachedRow(true) {
revent, err := w.assembleRowEvent(regionID, cachedEvent)
Expand Down
6 changes: 3 additions & 3 deletions logservice/logpuller/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type regionInfo struct {
// For instance, consider region-1 with a span of [a, d).
// It contains 3 tables: t1[a, b), t2[b,c), and t3[c,d).
// If only table t1 is subscribed to, then the span of interest is [a,b).
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
rpcCtx *tikv.RPCContext

// The table that the region belongs to.
Expand All @@ -52,7 +52,7 @@ func (s *regionInfo) isStopped() bool {

func newRegionInfo(
verID tikv.RegionVerID,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
rpcCtx *tikv.RPCContext,
subscribedSpan *subscribedSpan,
) regionInfo {
Expand Down Expand Up @@ -182,6 +182,6 @@ func (s *regionFeedState) getRegionInfo() regionInfo {
return s.region
}

func (s *regionFeedState) getRegionMeta() (uint64, heartbeatpb.TableSpan, string) {
func (s *regionFeedState) getRegionMeta() (uint64, *heartbeatpb.TableSpan, string) {
return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr
}
17 changes: 9 additions & 8 deletions logservice/logpuller/regionlock/region_range_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type LockRangeResult struct {

// RetryRanges is only used when Status is LockRangeStatusStale.
// It contains the ranges that should be retried to lock.
RetryRanges []heartbeatpb.TableSpan
RetryRanges []*heartbeatpb.TableSpan
}

// LockedRangeState is used to access the real-time state changes of a locked range.
Expand Down Expand Up @@ -109,7 +109,7 @@ type RangeLock struct {
// ID to identify different RangeLock instances, so logs of different instances can be distinguished.
id uint64
// totalSpan is the total range of the table, totalSpan = unlockedRanges + lockedRanges
totalSpan heartbeatpb.TableSpan
totalSpan *heartbeatpb.TableSpan

mu sync.RWMutex
// unlockedRanges is used to store the resolvedTs of unlocked ranges.
Expand All @@ -124,12 +124,13 @@ type RangeLock struct {
// NewRangeLock creates a new RangeLock.
func NewRangeLock(
id uint64,
startKey, endKey []byte, startTs uint64,
span *heartbeatpb.TableSpan,
startTs uint64,
) *RangeLock {
return &RangeLock{
id: id,
totalSpan: heartbeatpb.TableSpan{StartKey: startKey, EndKey: endKey},
unlockedRanges: newRangeTsMap(startKey, endKey, startTs),
totalSpan: span,
unlockedRanges: newRangeTsMap(span.StartKey, span.EndKey, startTs),
lockedRanges: btree.NewG(16, rangeLockEntryLess),
regionIDToLockedRanges: make(map[uint64]*rangeLockEntry),
}
Expand Down Expand Up @@ -456,7 +457,7 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
// If the range is stale, we should return the overlapping ranges to the caller,
// so that the caller can retry to lock the rest of the range.
if isStale {
retryRanges := make([]heartbeatpb.TableSpan, 0)
retryRanges := make([]*heartbeatpb.TableSpan, 0)
currentRangeStartKey := startKey

log.Info("try lock range staled",
Expand All @@ -474,13 +475,13 @@ func (l *RangeLock) tryLockRange(startKey, endKey []byte, regionID, regionVersio
// The rest should come from range searching and is sorted in increasing order, and they
// must intersect with the current given range.
if bytes.Compare(currentRangeStartKey, r.startKey) < 0 {
retryRanges = append(retryRanges, heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: r.startKey})
retryRanges = append(retryRanges, &heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: r.startKey})
}
currentRangeStartKey = r.endKey
}

if bytes.Compare(currentRangeStartKey, endKey) < 0 {
retryRanges = append(retryRanges, heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: endKey})
retryRanges = append(retryRanges, &heartbeatpb.TableSpan{StartKey: currentRangeStartKey, EndKey: endKey})
}

return LockRangeResult{
Expand Down
4 changes: 2 additions & 2 deletions logservice/logpuller/regionlock/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
)

// CheckRegionsLeftCover checks whether the regions cover the left part of given span
func CheckRegionsLeftCover(regions []*metapb.Region, span heartbeatpb.TableSpan) bool {
func CheckRegionsLeftCover(regions []*metapb.Region, span *heartbeatpb.TableSpan) bool {
subRegions := CutRegionsLeftCoverSpan(regions, span)
return len(regions) > 0 && len(subRegions) == len(regions)
}

// CutRegionsLeftCoverSpan processes a list of regions to remove those that
// do not cover the specified span or are discontinuous with the previous region.
// It returns a new slice containing only the continuous regions that cover the span.
func CutRegionsLeftCoverSpan(regions []*metapb.Region, spanToCover heartbeatpb.TableSpan) []*metapb.Region {
func CutRegionsLeftCoverSpan(regions []*metapb.Region, spanToCover *heartbeatpb.TableSpan) []*metapb.Region {
if len(regions) == 0 {
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type resolveLockTask struct {
// rangeTask represents a task to subscribe a range span of a table.
// It can be a part of a table or a whole table, it also can be a part of a region.
type rangeTask struct {
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
subscribedSpan *subscribedSpan
}

Expand All @@ -117,7 +117,7 @@ type subscribedSpan struct {
startTs tablepb.Ts

// The target span
span heartbeatpb.TableSpan
span *heartbeatpb.TableSpan
// The range lock of the span,
// it is used to prevent duplicate requests to the same region range,
// and it also used to calculate this table's resolvedTs.
Expand Down Expand Up @@ -263,7 +263,7 @@ func (s *SubscriptionClient) initMetrics() {
// It new a subscribedSpan and store it in `s.totalSpans`,
// and send a rangeTask to `s.rangeTaskCh`.
// The rangeTask will be handled in `handleRangeTasks` goroutine.
func (s *SubscriptionClient) Subscribe(subID SubscriptionID, span heartbeatpb.TableSpan, startTs uint64) {
func (s *SubscriptionClient) Subscribe(subID SubscriptionID, span *heartbeatpb.TableSpan, startTs uint64) {
if span.TableID == 0 {
log.Panic("subscription client subscribe with zero TableID")
}
Expand Down Expand Up @@ -501,7 +501,7 @@ func (s *SubscriptionClient) handleRangeTasks(ctx context.Context) error {
// 3. Schedule a region request to subscribe the region.
func (s *SubscriptionClient) divideSpanAndScheduleRegionRequests(
ctx context.Context,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
subscribedSpan *subscribedSpan,
) error {
// Limit the number of regions loaded at a time to make the load more stable.
Expand Down Expand Up @@ -549,7 +549,7 @@ func (s *SubscriptionClient) divideSpanAndScheduleRegionRequests(
}

for _, regionMeta := range regionMetas {
regionSpan := heartbeatpb.TableSpan{
regionSpan := &heartbeatpb.TableSpan{
StartKey: regionMeta.StartKey,
EndKey: regionMeta.EndKey,
}
Expand Down Expand Up @@ -609,7 +609,7 @@ func (s *SubscriptionClient) scheduleRegionRequest(ctx context.Context, region r
}

func (s *SubscriptionClient) scheduleRangeRequest(
ctx context.Context, span heartbeatpb.TableSpan,
ctx context.Context, span *heartbeatpb.TableSpan,
subscribedSpan *subscribedSpan,
) {
select {
Expand Down Expand Up @@ -805,10 +805,10 @@ func (s *SubscriptionClient) logSlowRegions(ctx context.Context) error {

func (s *SubscriptionClient) newSubscribedSpan(
subID SubscriptionID,
span heartbeatpb.TableSpan,
span *heartbeatpb.TableSpan,
startTs uint64,
) *subscribedSpan {
rangeLock := regionlock.NewRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs)
rangeLock := regionlock.NewRangeLock(uint64(subID), span, startTs)

rt := &subscribedSpan{
subID: subID,
Expand Down
8 changes: 4 additions & 4 deletions logservice/schemastore/ddl_job_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,16 @@ const (
JobHistoryID = ddl.HistoryTableID
)

func getAllDDLSpan() []heartbeatpb.TableSpan {
spans := make([]heartbeatpb.TableSpan, 0, 2)
func getAllDDLSpan() []*heartbeatpb.TableSpan {
spans := make([]*heartbeatpb.TableSpan, 0, 2)
start, end := common.GetTableRange(JobTableID)
spans = append(spans, heartbeatpb.TableSpan{
spans = append(spans, &heartbeatpb.TableSpan{
TableID: JobTableID,
StartKey: common.ToComparableKey(start),
EndKey: common.ToComparableKey(end),
})
start, end = common.GetTableRange(JobHistoryID)
spans = append(spans, heartbeatpb.TableSpan{
spans = append(spans, &heartbeatpb.TableSpan{
TableID: JobHistoryID,
StartKey: common.ToComparableKey(start),
EndKey: common.ToComparableKey(end),
Expand Down
Loading
Loading