Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#9396
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Jul 19, 2023
1 parent aa97333 commit f8ba913
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 25 deletions.
44 changes: 30 additions & 14 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error {
}()

g, stdCtx := errgroup.WithContext(stdCtx)
stdCtx, cancel := context.WithCancel(stdCtx)

ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
Expand All @@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error {
SorterSystem: c.sorterSystem,
SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
// when the campaignOwner returns an error, it means that the owner throws
// an unrecoverable serious errors (recoverable errors are intercepted in the owner tick)
Expand All @@ -351,6 +352,17 @@ func (c *captureImpl) run(stdCtx context.Context) error {
})

g.Go(func() error {
// Processor manager should be closed as soon as possible to prevent double write issue.
defer func() {
if cancel != nil {
// Propagate the cancel signal to the owner and other goroutines.
cancel()
}
if c.processorManager != nil {
c.processorManager.Close()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))
}()
processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
Expand Down Expand Up @@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner, it blocks until it been elected.
if err := c.campaign(ctx); err != nil {

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
Expand Down Expand Up @@ -491,21 +502,23 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
c.owner.AsyncStop()
c.setOwner(nil)

// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
if !cerror.ErrNotOwner.Equal(err) {
// if owner exits, resign the owner key,
// use a new context to prevent the context from being cancelled.
resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if resignErr := c.resign(resignCtx); resignErr != nil {
if errors.Cause(resignErr) != context.DeadlineExceeded {
log.Info("owner resign failed", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
return errors.Trace(resignErr)
}

log.Warn("owner resign timeout", zap.String("captureID", c.info.ID),
zap.Error(resignErr), zap.Int64("ownerRev", ownerRev))
cancel()
}
cancel()

log.Info("owner resigned successfully",
zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev))
Expand Down Expand Up @@ -622,10 +635,13 @@ func (c *captureImpl) AsyncClose() {

c.captureMu.Lock()
defer c.captureMu.Unlock()
<<<<<<< HEAD
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
log.Info("processor manager closed", zap.String("captureID", c.info.ID))
=======
>>>>>>> 80aa4a2426 (owner(ticdc): do not resign owner when ErrNotOwner is encountered (#9396))

c.grpcService.Reset(nil)
if c.MessageRouter != nil {
Expand Down
4 changes: 2 additions & 2 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election {
}
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
func (e *electionImpl) campaign(ctx context.Context, val string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
return e.election.Campaign(ctx, val)
}

func (e *electionImpl) resign(ctx context.Context) error {
Expand Down
8 changes: 0 additions & 8 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() {
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/etcd_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session,
if err != nil {
// This error means owner is resigned by itself,
// and we should exit etcd worker and campaign owner again.
return nil
return err
}
}

Expand Down

0 comments on commit f8ba913

Please sign in to comment.