From fb81eacbdd5549eed85024e7bf342519834a5f2a Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Tue, 13 Sep 2022 11:30:58 +0800
Subject: [PATCH] cdc: retry on resign the owner, if old owner in power again. (#2036)

---
 pkg/cluster/api/cdcapi.go | 17 ++++++++++++++++-
 pkg/cluster/spec/cdc.go   |  4 ++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/pkg/cluster/api/cdcapi.go b/pkg/cluster/api/cdcapi.go
index c3e09886fe..15af059519 100644
--- a/pkg/cluster/api/cdcapi.go
+++ b/pkg/cluster/api/cdcapi.go
@@ -131,7 +131,18 @@ func (c *CDCOpenAPIClient) DrainCapture(target string, apiTimeoutSeconds int) er
 }
 
 // ResignOwner resign the cdc owner, and wait for a new owner be found
-func (c *CDCOpenAPIClient) ResignOwner() error {
+// address is the current owner's address
+func (c *CDCOpenAPIClient) ResignOwner(address string) error {
+	err := utils.Retry(func() error {
+		return resignOwner(c, address)
+	}, utils.RetryOption{
+		Delay:   2 * time.Second,
+		Timeout: 10 * time.Second,
+	})
+	return err
+}
+
+func resignOwner(c *CDCOpenAPIClient, addr string) error {
 	api := "api/v1/owner/resign"
 	endpoints := c.getEndpoints(api)
 	_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
@@ -155,6 +166,10 @@ func (c *CDCOpenAPIClient) ResignOwner() error {
 		return err
 	}
 
+	if owner.AdvertiseAddr == addr {
+		return fmt.Errorf("old owner in power again, resign again, owner: %+v", owner)
+	}
+
 	c.l().Debugf("cdc resign owner successfully, and new owner found, owner: %+v", owner)
 	return nil
 }
diff --git a/pkg/cluster/spec/cdc.go b/pkg/cluster/spec/cdc.go
index eafb011af8..b03f2ef956 100644
--- a/pkg/cluster/spec/cdc.go
+++ b/pkg/cluster/spec/cdc.go
@@ -269,12 +269,12 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
 	// this may happen if the capture crashed right away.
 	if !found {
 		logger.Debugf("cdc pre-restart finished, cannot found the capture, trigger hard restart, "+
-			"addr: %s, captureID: %s, elapsed: %+v", address, captureID, time.Since(start))
+			"addr: %s, elapsed: %+v", address, time.Since(start))
 		return nil
 	}
 
 	if isOwner {
-		if err := client.ResignOwner(); err != nil {
+		if err := client.ResignOwner(address); err != nil {
 			// if resign the owner failed, no more need to drain the current capture,
 			// since it's not allowed by the cdc.
 			// return nil to trigger hard restart.