Skip to content

Commit

Permalink
fix race
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Nov 6, 2024
1 parent 80dbc6a commit c62c882
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 9 deletions.
6 changes: 4 additions & 2 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent) (block boo
log.Info("Received a stale event, ignore it", zap.Any("event", event), zap.Any("dispatcher", d.id))
}
if event.GetType() == commonEvent.TypeDMLEvent ||
event.GetType() == commonEvent.TypeDDLEvent {
event.GetType() == commonEvent.TypeDDLEvent ||
event.GetType() == commonEvent.TypeHandshakeEvent {
if event.GetSeq() != d.lastEventSeq.Add(1) {
log.Warn("Received a out-of-order event, reset the dispatcher", zap.Any("event", event), zap.Any("dispatcher", d.id))
log.Warn("Received a out-of-order event, reset the dispatcher", zap.Any("dispatcher", d.id),
zap.Uint64("receivedSeq", event.GetSeq()), zap.Uint64("lastEventSeq", d.lastEventSeq.Load()), zap.Any("event", event))
d.reset()
return false
}
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func SetDispatcherTaskScheduler(taskScheduler threadpool.ThreadPool) {
DispatcherTaskScheduler = taskScheduler
}

// EventsHandler is used to dispatcher the events received.
// EventsHandler is used to dispatch the received events.
// If the event is a DML event, it will be added to the sink for writing to downstream.
// If the event is a resolved TS event, it will be update the resolvedTs of the dispatcher.
// If the event is a DDL event,
Expand Down
4 changes: 2 additions & 2 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (e *eventStore) updateMetrics(ctx context.Context) error {
e.dispatcherStates.RLock()
for _, subscriptionStat := range e.dispatcherStates.n {
// resolved ts lag
resolvedTs := subscriptionStat.resolvedTs
resolvedTs := atomic.LoadUint64(&subscriptionStat.resolvedTs)
resolvedPhyTs := oracle.ExtractPhysical(resolvedTs)
resolvedLag := float64(currentPhyTs-resolvedPhyTs) / 1e3
metrics.EventStoreDispatcherResolvedTsLagHist.Observe(float64(resolvedLag))
Expand Down Expand Up @@ -616,7 +616,7 @@ func (e *eventStore) batchAndWriteEvents(ctx context.Context, db *pebble.DB, inp
log.Warn("unknown subscriptionID", zap.Uint64("subID", uint64(subID)))
continue
}
subscriptionStat.resolvedTs = resolvedTs
atomic.StoreUint64(&subscriptionStat.resolvedTs, resolvedTs)
for dispatcherID := range subscriptionStat.ids {
dispatcherStat := e.dispatcherStates.m[dispatcherID]
dispatcherStat.notifier(resolvedTs)
Expand Down
6 changes: 2 additions & 4 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,11 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn {
zap.Error(err))
}
return func(id node.ID) *messaging.TargetMessage {
var ddlDispatcherID *heartbeatpb.DispatcherID
// only send dispatcher id to dispatcher manager on the same node
if id == m.selfNode.ID {
ddlDispatcherID = m.tableTriggerEventDispatcherID.ToPB()
log.Info("create table event trigger dispatcher", zap.String("changefeed", m.id.String()),
zap.String("server", id.String()),
zap.String("dispatcher id", ddlDispatcherID.String()))
zap.String("dispatcher id", m.tableTriggerEventDispatcherID.String()))
}
log.Info("send maintainer bootstrap message",
zap.String("changefeed", m.id.String()),
Expand All @@ -633,7 +631,7 @@ func (m *Maintainer) getNewBootstrapFn() bootstrap.NewBootstrapMessageFn {
ChangefeedID: m.id.ToPB(),
Config: cfgBytes,
StartTs: m.startCheckpointTs,
TableTriggerEventDispatcherId: ddlDispatcherID,
TableTriggerEventDispatcherId: m.tableTriggerEventDispatcherID.ToPB(),
})
}
}
Expand Down

0 comments on commit c62c882

Please sign in to comment.