From 808a55494e0025a7c7b073fe3a87622dc7a4a75d Mon Sep 17 00:00:00 2001 From: CharlesCheung Date: Wed, 19 Jul 2023 09:42:00 +0800 Subject: [PATCH] fail fast in capture --- cdc/capture/capture.go | 19 +++++++++++++------ cdc/capture/election.go | 4 ++-- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 16439e1edf8..80b96583aee 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -336,6 +336,8 @@ func (c *captureImpl) run(stdCtx context.Context) error { }() g, stdCtx := errgroup.WithContext(stdCtx) + stdCtx, cancel := context.WithCancel(stdCtx) + ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{ CaptureInfo: c.info, EtcdClient: c.EtcdClient, @@ -343,7 +345,6 @@ func (c *captureImpl) run(stdCtx context.Context) error { MessageRouter: c.MessageRouter, SortEngineFactory: c.sortEngineFactory, }) - g.Go(func() error { // when the campaignOwner returns an error, it means that the owner throws // an unrecoverable serious errors (recoverable errors are intercepted in the owner tick) @@ -359,6 +360,17 @@ func (c *captureImpl) run(stdCtx context.Context) error { }) g.Go(func() error { + // Processor manager should be closed as soon as possible. + defer func() { + if cancel != nil { + // Propagate the cancel signal to the owner and other goroutines. + cancel() + } + if c.processorManager != nil { + c.processorManager.Close() + } + log.Info("processor manager closed", zap.String("captureID", c.info.ID)) + }() processorFlushInterval := time.Duration(c.config.ProcessorFlushInterval) globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID()) @@ -427,7 +439,6 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error { } // Campaign to be the owner, it blocks until it been elected. if err := c.campaign(ctx); err != nil { - rootErr := errors.Cause(err) if rootErr == context.Canceled { return nil @@ -668,10 +679,6 @@ func (c *captureImpl) Close() { c.captureMu.Lock() defer c.captureMu.Unlock() - if c.processorManager != nil { - c.processorManager.Close() - } - log.Info("processor manager closed", zap.String("captureID", c.info.ID)) c.grpcService.Reset(nil) if c.MessageRouter != nil { diff --git a/cdc/capture/election.go b/cdc/capture/election.go index 9012d78e596..6388b1f0696 100644 --- a/cdc/capture/election.go +++ b/cdc/capture/election.go @@ -39,11 +39,11 @@ func newElection(sess *concurrency.Session, key string) election { } } -func (e *electionImpl) campaign(ctx context.Context, key string) error { +func (e *electionImpl) campaign(ctx context.Context, val string) error { failpoint.Inject("capture-campaign-compacted-error", func() { failpoint.Return(errors.Trace(mvcc.ErrCompacted)) }) - return e.election.Campaign(ctx, key) + return e.election.Campaign(ctx, val) } func (e *electionImpl) resign(ctx context.Context) error {