Skip to content

Commit

Permalink
cdc: retry on resign the owner, if old owner in power again. (#2036)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Sep 13, 2022
1 parent 3671d82 commit fb81eac
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
17 changes: 16 additions & 1 deletion pkg/cluster/api/cdcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,18 @@ func (c *CDCOpenAPIClient) DrainCapture(target string, apiTimeoutSeconds int) er
}

// ResignOwner resigns the cdc owner and waits for a new owner to be found
func (c *CDCOpenAPIClient) ResignOwner() error {
// address is the current owner's address; it is forwarded to resignOwner each
// time utils.Retry invokes the attempt.
func (c *CDCOpenAPIClient) ResignOwner(address string) error {
	// Retry policy handed to utils.Retry: a Delay of 2s and a Timeout of 10s.
	retryOpt := utils.RetryOption{
		Delay:   2 * time.Second,
		Timeout: 10 * time.Second,
	}

	// A single resign attempt against the given owner address.
	attempt := func() error {
		return resignOwner(c, address)
	}

	return utils.Retry(attempt, retryOpt)
}

func resignOwner(c *CDCOpenAPIClient, addr string) error {
api := "api/v1/owner/resign"
endpoints := c.getEndpoints(api)
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
Expand All @@ -155,6 +166,10 @@ func (c *CDCOpenAPIClient) ResignOwner() error {
return err
}

if owner.AdvertiseAddr == addr {
return fmt.Errorf("old owner in power again, resign again, owner: %+v", owner)
}

c.l().Debugf("cdc resign owner successfully, and new owner found, owner: %+v", owner)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,12 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
// this may happen if the capture crashed right away.
if !found {
logger.Debugf("cdc pre-restart finished, cannot found the capture, trigger hard restart, "+
"addr: %s, captureID: %s, elapsed: %+v", address, captureID, time.Since(start))
"addr: %s, elapsed: %+v", address, time.Since(start))
return nil
}

if isOwner {
if err := client.ResignOwner(); err != nil {
if err := client.ResignOwner(address); err != nil {
// if resigning the owner failed, there is no need to drain the current capture,
// since it's not allowed by the cdc.
// return nil to trigger hard restart.
Expand Down

0 comments on commit fb81eac

Please sign in to comment.