Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4788
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Apr 2, 2022
1 parent 2817747 commit 3ba67f6
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 12 deletions.
13 changes: 12 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -93,11 +101,14 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
<<<<<<< HEAD
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
=======
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

Expand Down
68 changes: 68 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/pkg/etcd"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

// init etcd mocker
clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
require.Nil(t, err)
logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientURL.String()},
Context: ctx,
LogConfig: &logConfig,
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
defer client.Close()

cp := NewCapture4Test(nil)
cp.EtcdClient = &client

// simulate network isolation scenarios
etcdServer.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = cp.reset(ctx)
require.Regexp(t, ".*context canceled.*", err)
wg.Done()
}()
time.Sleep(100 * time.Millisecond)
info := cp.Info()
require.NotNil(t, info)
cancel()
wg.Wait()
}
8 changes: 5 additions & 3 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -42,8 +41,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -698,6 +695,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
<<<<<<< HEAD:cdc/capture/http_handler.go
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
Expand All @@ -707,7 +705,11 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
=======
owner, err := h.capture.GetOwnerCaptureInfo(ctx)
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)):cdc/api/open.go
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
12 changes: 12 additions & 0 deletions cdc/capture/main_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
<<<<<<< HEAD
// Copyright 2021 PingCAP, Inc.
=======
// Copyright 2022 PingCAP, Inc.
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,6 +20,7 @@ package capture
import (
"testing"

<<<<<<< HEAD
"go.uber.org/goleak"
)

Expand All @@ -27,4 +32,11 @@ func TestMain(m *testing.M) {
}

goleak.VerifyTestMain(m, opts...)
=======
"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
}
75 changes: 67 additions & 8 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdClientTimeoutDuration represents the timeout duration for
// etcd client to execute a remote call
etcdClientTimeoutDuration = 30 * time.Second
)

var (
Expand Down Expand Up @@ -105,21 +105,39 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryableError(rpcName)))
}

<<<<<<< HEAD
// Put delegates request to clientv3.KV.Put
func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) {
=======
// Put delegates request to clientV3.KV.Put
func (c *Client) Put(
ctx context.Context, key, val string, opts ...clientV3.OpOption,
) (resp *clientV3.PutResponse, err error) {
putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
var inErr error
resp, inErr = c.cli.Put(ctx, key, val, opts...)
resp, inErr = c.cli.Put(putCtx, key, val, opts...)
return inErr
})
return
}

<<<<<<< HEAD
// Get delegates request to clientv3.KV.Get
func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
=======
// Get delegates request to clientV3.KV.Get
func (c *Client) Get(
ctx context.Context, key string, opts ...clientV3.OpOption,
) (resp *clientV3.GetResponse, err error) {
getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
var inErr error
resp, inErr = c.cli.Get(ctx, key, opts...)
resp, inErr = c.cli.Get(getCtx, key, opts...)
return inErr
})
return
Expand All @@ -130,8 +148,15 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
<<<<<<< HEAD
// We don't retry on delete operatoin. It's dangerous.
return c.cli.Delete(ctx, key, opts...)
=======
delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
// We don't retry on delete operation. It's dangerous.
return c.cli.Delete(delCtx, key, opts...)
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
}

// TxnWithoutRetry delegates request to clientv3.KV.Txn
Expand All @@ -144,8 +169,15 @@ func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn {

// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
<<<<<<< HEAD
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
=======
func (c *Client) Txn(
ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op,
) (resp *clientV3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
Expand All @@ -155,11 +187,20 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse
return
}

<<<<<<< HEAD
// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
=======
// Grant delegates request to clientV3.Lease.Grant
func (c *Client) Grant(
ctx context.Context, ttl int64,
) (resp *clientV3.LeaseGrantResponse, err error) {
grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
var inErr error
resp, inErr = c.cli.Grant(ctx, ttl)
resp, inErr = c.cli.Grant(grantCtx, ttl)
return inErr
})
return
Expand Down Expand Up @@ -187,21 +228,39 @@ func isRetryableError(rpcName string) retry.IsRetryable {
}
}

<<<<<<< HEAD
// Revoke delegates request to clientv3.Lease.Revoke
func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) {
=======
// Revoke delegates request to clientV3.Lease.Revoke
func (c *Client) Revoke(
ctx context.Context, id clientV3.LeaseID,
) (resp *clientV3.LeaseRevokeResponse, err error) {
revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.Revoke(ctx, id)
resp, inErr = c.cli.Revoke(revokeCtx, id)
return inErr
})
return
}

<<<<<<< HEAD
// TimeToLive delegates request to clientv3.Lease.TimeToLive
func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
=======
// TimeToLive delegates request to clientV3.Lease.TimeToLive
func (c *Client) TimeToLive(
ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption,
) (resp *clientV3.LeaseTimeToLiveResponse, err error) {
timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.TimeToLive(ctx, lease, opts...)
resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...)
return inErr
})
return
Expand Down

0 comments on commit 3ba67f6

Please sign in to comment.