Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788) #5110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -93,11 +101,7 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}

c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

Expand Down
14 changes: 2 additions & 12 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -42,8 +41,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -698,16 +695,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
}
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
owner, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
56 changes: 40 additions & 16 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdClientTimeoutDuration represents the timeout duration for
// etcd client to execute a remote call
etcdClientTimeoutDuration = 30 * time.Second
)

var (
Expand Down Expand Up @@ -106,20 +106,28 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e
}

// Put delegates request to clientv3.KV.Put
func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) {
func (c *Client) Put(
ctx context.Context, key, val string, opts ...clientv3.OpOption,
) (resp *clientv3.PutResponse, err error) {
putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
var inErr error
resp, inErr = c.cli.Put(ctx, key, val, opts...)
resp, inErr = c.cli.Put(putCtx, key, val, opts...)
return inErr
})
return
}

// Get delegates request to clientv3.KV.Get
func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
func (c *Client) Get(
ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.GetResponse, err error) {
getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
var inErr error
resp, inErr = c.cli.Get(ctx, key, opts...)
resp, inErr = c.cli.Get(getCtx, key, opts...)
return inErr
})
return
Expand All @@ -130,8 +138,10 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
if metric, ok := c.metrics[EtcdDel]; ok {
metric.Inc()
}
delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
// We don't retry on delete operation. It's dangerous.
return c.cli.Delete(ctx, key, opts...)
return c.cli.Delete(delCtx, key, opts...)
}

// TxnWithoutRetry delegates request to clientv3.KV.Txn
Expand All @@ -144,8 +154,10 @@ func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn {

// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
func (c *Client) Txn(
ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op,
) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
Expand All @@ -156,10 +168,14 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse
}

// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
func (c *Client) Grant(
ctx context.Context, ttl int64,
) (resp *clientv3.LeaseGrantResponse, err error) {
grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
var inErr error
resp, inErr = c.cli.Grant(ctx, ttl)
resp, inErr = c.cli.Grant(grantCtx, ttl)
return inErr
})
return
Expand Down Expand Up @@ -188,20 +204,28 @@ func isRetryableError(rpcName string) retry.IsRetryable {
}

// Revoke delegates request to clientv3.Lease.Revoke
func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) {
func (c *Client) Revoke(
ctx context.Context, id clientv3.LeaseID,
) (resp *clientv3.LeaseRevokeResponse, err error) {
revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.Revoke(ctx, id)
resp, inErr = c.cli.Revoke(revokeCtx, id)
return inErr
})
return
}

// TimeToLive delegates request to clientv3.Lease.TimeToLive
func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
func (c *Client) TimeToLive(
ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption,
) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.TimeToLive(ctx, lease, opts...)
resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...)
return inErr
})
return
Expand Down