From 862a9aaea511bc2e9bbdb9075995b3ef49e45d88 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 14 May 2024 19:17:42 +0800 Subject: [PATCH] pdClient (ticdc): Enable the pdClient forwarding function to make cdc more stable during network isolation between the PD leader (#11076) (#11086) close pingcap/tiflow#10849 --- cdc/api/v2/api_helpers.go | 3 ++- cdc/server/server.go | 3 ++- pkg/config/server_config.go | 5 +++++ pkg/migrate/migrate.go | 4 +++- pkg/upstream/upstream.go | 4 +++- 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/cdc/api/v2/api_helpers.go b/cdc/api/v2/api_helpers.go index e9c958cf216..1813ea6fd01 100644 --- a/cdc/api/v2/api_helpers.go +++ b/cdc/api/v2/api_helpers.go @@ -464,7 +464,8 @@ func (APIV2HelpersImpl) getPDClient(ctx context.Context, }, MinConnectTimeout: 3 * time.Second, }), - )) + ), + pd.WithForwardingOption(config.EnablePDForwarding)) if err != nil { return nil, cerror.WrapError(cerror.ErrAPIGetPDClientFailed, errors.Trace(err)) } diff --git a/cdc/server/server.go b/cdc/server/server.go index dde6fdd578f..b51d29c8877 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -151,7 +151,8 @@ func (s *server) prepare(ctx context.Context) error { }, MinConnectTimeout: 3 * time.Second, }), - )) + ), + pd.WithForwardingOption(config.EnablePDForwarding)) if err != nil { return errors.Trace(err) } diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index 3b1c8fd0efd..64ef092d824 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -48,6 +48,11 @@ const ( // DisableMemoryLimit is the default max memory percentage for TiCDC server. // 0 means no memory limit. DisableMemoryLimit = 0 + + // EnablePDForwarding is the value of whether to enable PD client forwarding function. + // The PD client will forward the requests throughthe follower + // If there is a network partition problem between TiCDC and PD leader. + EnablePDForwarding = true ) var ( diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 6a082320bf0..c39aa01e87e 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -147,7 +147,9 @@ func createPDClient(ctx context.Context, }, MinConnectTimeout: 3 * time.Second, }), - )) + ), + pd.WithForwardingOption(config.EnablePDForwarding), + ) } // Note: we do not use etcd transaction to migrate key diff --git a/pkg/upstream/upstream.go b/pkg/upstream/upstream.go index 5b0e1de4c74..29d302837ef 100644 --- a/pkg/upstream/upstream.go +++ b/pkg/upstream/upstream.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/log" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tiflow/cdc/kv" + "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/etcd" @@ -152,7 +153,8 @@ func initUpstream(ctx context.Context, up *Upstream, cfg CaptureTopologyCfg) err }, MinConnectTimeout: 3 * time.Second, }), - )) + ), + pd.WithForwardingOption(config.EnablePDForwarding)) if err != nil { up.err.Store(err) return errors.Trace(err)