Skip to content

Commit

Permalink
capture(ticdc): fix the problem that openapi is blocked when pd is ab…
Browse files Browse the repository at this point in the history
…normal (pingcap#4778)
  • Loading branch information
CharlesCheung96 committed Mar 14, 2022
1 parent 2b2fd58 commit cb3035e
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 19 deletions.
19 changes: 11 additions & 8 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,17 @@ func NewCapture4Test(o owner.Owner) *Capture {
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -131,13 +139,6 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

Expand Down Expand Up @@ -605,6 +606,8 @@ func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) {
if c.processorManager != nil {
fmt.Fprintf(w, "\n\n*** processors info ***:\n\n")
c.processorManager.WriteDebugInfo(ctx, w, doneM)
} else {
close(doneM)
}
// wait the debug info printed
wait(doneM)
Expand Down
68 changes: 68 additions & 0 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/pkg/etcd"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestReset(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

// init etcd mocker
clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir())
require.Nil(t, err)
logConfig := logutil.DefaultZapLoggerConfig
logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{clientURL.String()},
Context: ctx,
LogConfig: &logConfig,
DialTimeout: 3 * time.Second,
})
require.NoError(t, err)
client := etcd.NewCDCEtcdClient(ctx, etcdCli)
// Close the client before the test function exits to prevent possible
// ctx leaks.
// Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229
defer client.Close()

cp := NewCapture4Test(nil)
cp.EtcdClient = &client

// simulate network isolation scenarios
etcdServer.Close()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
err = cp.reset(ctx)
require.Regexp(t, ".*context canceled.*", err)
wg.Done()
}()
time.Sleep(100 * time.Millisecond)
info := cp.Info()
require.NotNil(t, info)
cancel()
wg.Wait()
}
24 changes: 24 additions & 0 deletions cdc/capture/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
)

func TestMain(m *testing.M) {
leakutil.SetUpLeakTest(m)
}
37 changes: 26 additions & 11 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdClientTimeoutWithRetry represents the timeout duration for
// etcd client to perform Get, Put and Txn operations
etcdClientTimeoutWithRetry = 5 * time.Second
// etcdClientTimeoutWithoutRetry represents the timeout duration for
// etcd client to perform Del operations
etcdClientTimeoutWithoutRetry = 30 * time.Second
)

var (
Expand Down Expand Up @@ -111,8 +114,10 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e
func (c *Client) Put(ctx context.Context, key, val string,
opts ...clientV3.OpOption) (resp *clientV3.PutResponse, err error) {
err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.Put(ctx, key, val, opts...)
resp, inErr = c.cli.Put(putCtx, key, val, opts...)
return inErr
})
return
Expand All @@ -122,8 +127,10 @@ func (c *Client) Put(ctx context.Context, key, val string,
func (c *Client) Get(ctx context.Context, key string,
opts ...clientV3.OpOption) (resp *clientV3.GetResponse, err error) {
err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.Get(ctx, key, opts...)
resp, inErr = c.cli.Get(getCtx, key, opts...)
return inErr
})
return
Expand All @@ -135,17 +142,19 @@ func (c *Client) Delete(ctx context.Context, key string,
if metric, ok := c.metrics[EtcdDel]; ok {
metric.Inc()
}
delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry)
defer cancel()
// We don't retry on delete operation. It's dangerous.
return c.cli.Delete(ctx, key, opts...)
return c.cli.Delete(delCtx, key, opts...)
}

// Txn delegates request to clientV3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context,
cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op) (resp *clientV3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
Expand All @@ -157,8 +166,10 @@ func (c *Client) Txn(ctx context.Context,
func (c *Client) Grant(ctx context.Context,
ttl int64) (resp *clientV3.LeaseGrantResponse, err error) {
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.Grant(ctx, ttl)
resp, inErr = c.cli.Grant(grantCtx, ttl)
return inErr
})
return
Expand Down Expand Up @@ -190,8 +201,10 @@ func isRetryableError(rpcName string) retry.IsRetryable {
func (c *Client) Revoke(ctx context.Context,
id clientV3.LeaseID) (resp *clientV3.LeaseRevokeResponse, err error) {
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.Revoke(ctx, id)
resp, inErr = c.cli.Revoke(revokeCtx, id)
return inErr
})
return
Expand All @@ -201,8 +214,10 @@ func (c *Client) Revoke(ctx context.Context,
func (c *Client) TimeToLive(ctx context.Context, lease clientV3.LeaseID,
opts ...clientV3.LeaseOption) (resp *clientV3.LeaseTimeToLiveResponse, err error) {
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry)
defer cancel()
var inErr error
resp, inErr = c.cli.TimeToLive(ctx, lease, opts...)
resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...)
return inErr
})
return
Expand Down

0 comments on commit cb3035e

Please sign in to comment.