Skip to content

Commit

Permalink
capture (ticdc): fix processor exit unexpectedly when some pd node fa…
Browse files Browse the repository at this point in the history
…il (#8884) (#8900)

ref #8868, close #8877
  • Loading branch information
ti-chi-bot authored May 8, 2023
1 parent 3c6e704 commit 09c43f4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
18 changes: 11 additions & 7 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -403,11 +404,13 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
}
// Campaign to be the owner, it blocks until it been elected.
if err := c.campaign(ctx); err != nil {
switch errors.Cause(err) {
case context.Canceled:

rootErr := errors.Cause(err)
if rootErr == context.Canceled {
return nil
case mvcc.ErrCompacted:
// the revision we requested is compacted, just retry
} else if rootErr == mvcc.ErrCompacted || isErrCompacted(rootErr) {
log.Warn("campaign owner failed due to etcd revision "+
"has been compacted, retry later", zap.Error(err))
continue
}
log.Warn("campaign owner failed",
Expand Down Expand Up @@ -550,9 +553,6 @@ func (c *captureImpl) GetOwner() (owner.Owner, error) {

// campaign to be an owner.
func (c *captureImpl) campaign(ctx context.Context) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
// TODO: `Campaign` will get stuck when send SIGSTOP to pd leader.
// For `Campaign`, when send SIGSTOP to pd leader, cdc maybe call `cancel`
// (cause by `processor routine` exit). And inside `Campaign`, the routine
Expand Down Expand Up @@ -714,3 +714,7 @@ func (c *captureImpl) StatusProvider() owner.StatusProvider {
func (c *captureImpl) IsReady() bool {
return c.migrator.IsMigrateDone()
}

func isErrCompacted(err error) bool {
return strings.Contains(err.Error(), "required revision has been compacted")
}
6 changes: 6 additions & 0 deletions cdc/capture/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ package capture
import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
)

// election wraps the owner election methods.
Expand All @@ -37,6 +40,9 @@ func newElection(sess *concurrency.Session, key string) election {
}

func (e *electionImpl) campaign(ctx context.Context, key string) error {
failpoint.Inject("capture-campaign-compacted-error", func() {
failpoint.Return(errors.Trace(mvcc.ErrCompacted))
})
return e.election.Campaign(ctx, key)
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration_tests/availability/owner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ function test_owner_cleanup_stale_tasks() {
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/capture/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
# ensure the server become the owner
Expand Down

0 comments on commit 09c43f4

Please sign in to comment.