diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 7c412f30bd7..cd91d679a3c 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -326,6 +326,8 @@ func (c *captureImpl) run(stdCtx context.Context) error { }() g, stdCtx := errgroup.WithContext(stdCtx) + stdCtx, cancel := context.WithCancel(stdCtx) + ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ CaptureInfo: c.info, EtcdClient: c.EtcdClient, @@ -335,7 +337,6 @@ func (c *captureImpl) run(stdCtx context.Context) error { SorterSystem: c.sorterSystem, SortEngineFactory: c.sortEngineFactory, }) - g.Go(func() error { // when the campaignOwner returns an error, it means that the owner throws // an unrecoverable serious errors (recoverable errors are intercepted in the owner tick) @@ -351,6 +352,17 @@ func (c *captureImpl) run(stdCtx context.Context) error { }) g.Go(func() error { + // Processor manager should be closed as soon as possible to prevent double write issue. + defer func() { + if cancel != nil { + // Propagate the cancel signal to the owner and other goroutines. + cancel() + } + if c.processorManager != nil { + c.processorManager.Close() + } + log.Info("processor manager closed", zap.String("captureID", c.info.ID)) + }() processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval) globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()) @@ -419,7 +431,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { } // Campaign to be the owner, it blocks until it been elected. if err := c.campaign(ctx); err != nil { - rootErr := errors.Cause(err) if rootErr == context.Canceled { return nil @@ -491,21 +502,23 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { c.owner.AsyncStop() c.setOwner(nil) - // if owner exits, resign the owner key, - // use a new context to prevent the context from being cancelled. - resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if resignErr := c.resign(resignCtx); resignErr != nil { - if errors.Cause(resignErr) != context.DeadlineExceeded { - log.Info("owner resign failed", zap.String("captureID", c.info.ID), + if !cerror.ErrNotOwner.Equal(err) { + // if owner exits, resign the owner key, + // use a new context to prevent the context from being cancelled. + resignCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if resignErr := c.resign(resignCtx); resignErr != nil { + if errors.Cause(resignErr) != context.DeadlineExceeded { + log.Info("owner resign failed", zap.String("captureID", c.info.ID), + zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) + cancel() + return errors.Trace(resignErr) + } + + log.Warn("owner resign timeout", zap.String("captureID", c.info.ID), zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) - cancel() - return errors.Trace(resignErr) } - - log.Warn("owner resign timeout", zap.String("captureID", c.info.ID), - zap.Error(resignErr), zap.Int64("ownerRev", ownerRev)) + cancel() } - cancel() log.Info("owner resigned successfully", zap.String("captureID", c.info.ID), zap.Int64("ownerRev", ownerRev)) @@ -622,10 +635,13 @@ func (c *captureImpl) AsyncClose() { c.captureMu.Lock() defer c.captureMu.Unlock() +<<<<<<< HEAD if c.processorManager != nil { c.processorManager.AsyncClose() } log.Info("processor manager closed", zap.String("captureID", c.info.ID)) +======= +>>>>>>> 80aa4a2426 (owner(ticdc): do not resign owner when ErrNotOwner is encountered (#9396)) c.grpcService.Reset(nil) if c.MessageRouter != nil { diff --git a/cdc/capture/election.go b/cdc/capture/election.go index 9012d78e596..6388b1f0696 100644 --- a/cdc/capture/election.go +++ b/cdc/capture/election.go @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election { } } -func (e *electionImpl) campaign(ctx context.Context, key string) error { +func (e *electionImpl) campaign(ctx context.Context, val string) error { failpoint.Inject("capture-campaign-compacted-error", func() { failpoint.Return(errors.Trace(mvcc.ErrCompacted)) }) - return e.election.Campaign(ctx, key) + return e.election.Campaign(ctx, val) } func (e *electionImpl) resign(ctx context.Context) error { diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 9b4756d551c..96d80ba934b 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -411,14 +411,6 @@ func (o *ownerImpl) updateMetrics() { changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID). Set(float64(cf.state.Info.State.ToInt())) } - - // The InfoProvider is a proxy object returning information - // from the scheduler. - infoProvider := cf.GetInfoProvider() - if infoProvider == nil { - // The scheduler has not been initialized yet. - continue - } } } diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index f71f425ac1c..e8c252feaff 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -232,7 +232,7 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if err != nil { // This error means owner is resigned by itself, // and we should exit etcd worker and campaign owner again. - return nil + return err } }