Skip to content

Commit

Permalink
capture(ticdc): fix the problem that openapi is blocked when pd is ab…
Browse files Browse the repository at this point in the history
…normal (pingcap#4788)

close pingcap#4778
  • Loading branch information
CharlesCheung96 committed Apr 28, 2022
1 parent 4091e2c commit 06db40b
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 36 deletions.
18 changes: 10 additions & 8 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,17 @@ func NewCapture4Test() *Capture {
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -128,13 +136,7 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

Expand Down
62 changes: 62 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/pkg/etcd"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
)

func TestReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

// init etcd mocker
clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
require.Nil(t, err)
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientURL.String()},
Context: ctx,
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
defer client.Close()

cp := NewCapture4Test()
cp.etcdClient = &client

// simulate network isolation scenarios
etcdServer.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = cp.reset(ctx)
require.Regexp(t, ".*context canceled.*", err)
wg.Done()
}()
time.Sleep(100 * time.Millisecond)
info := cp.Info()
require.NotNil(t, info)
cancel()
wg.Wait()
}
14 changes: 2 additions & 12 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -41,8 +40,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -751,16 +748,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
}
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
owner, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
56 changes: 40 additions & 16 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdClientTimeoutDuration represents the timeout duration for
// etcd client to execute a remote call
etcdClientTimeoutDuration = 30 * time.Second
)

var (
Expand Down Expand Up @@ -103,20 +103,28 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e
}

// Put delegates request to clientv3.KV.Put
func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) {
func (c *Client) Put(
ctx context.Context, key, val string, opts ...clientv3.OpOption,
) (resp *clientv3.PutResponse, err error) {
putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
var inErr error
resp, inErr = c.cli.Put(ctx, key, val, opts...)
resp, inErr = c.cli.Put(putCtx, key, val, opts...)
return inErr
})
return
}

// Get delegates request to clientv3.KV.Get
func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
func (c *Client) Get(
ctx context.Context, key string, opts ...clientv3.OpOption,
) (resp *clientv3.GetResponse, err error) {
getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
var inErr error
resp, inErr = c.cli.Get(ctx, key, opts...)
resp, inErr = c.cli.Get(getCtx, key, opts...)
return inErr
})
return
Expand All @@ -127,14 +135,18 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
if metric, ok := c.metrics[EtcdDel]; ok {
metric.Inc()
}
delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
// We don't retry on delete operation. It's dangerous.
return c.cli.Delete(ctx, key, opts...)
return c.cli.Delete(delCtx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
func (c *Client) Txn(
ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op,
) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
Expand All @@ -145,10 +157,14 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse
}

// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
func (c *Client) Grant(
ctx context.Context, ttl int64,
) (resp *clientv3.LeaseGrantResponse, err error) {
grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
var inErr error
resp, inErr = c.cli.Grant(ctx, ttl)
resp, inErr = c.cli.Grant(grantCtx, ttl)
return inErr
})
return
Expand Down Expand Up @@ -177,20 +193,28 @@ func isRetryableError(rpcName string) retry.IsRetryable {
}

// Revoke delegates request to clientv3.Lease.Revoke
func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) {
func (c *Client) Revoke(
ctx context.Context, id clientv3.LeaseID,
) (resp *clientv3.LeaseRevokeResponse, err error) {
revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.Revoke(ctx, id)
resp, inErr = c.cli.Revoke(revokeCtx, id)
return inErr
})
return
}

// TimeToLive delegates request to clientv3.Lease.TimeToLive
func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
func (c *Client) TimeToLive(
ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption,
) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.TimeToLive(ctx, lease, opts...)
resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...)
return inErr
})
return
Expand Down

0 comments on commit 06db40b

Please sign in to comment.