From 1bcd5bddfe1260f63730f96ea054c99080047778 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 3 Jan 2023 17:12:19 +0800 Subject: [PATCH] p2p(ticdc): use a larger max receive message size (#8005) close pingcap/tiflow#8004 Signed-off-by: Neil Shen --- cdc/server.go | 25 +++++++++++------------ pkg/cmd/server/server_test.go | 5 +++++ pkg/config/config_test_data.go | 3 ++- pkg/config/messages.go | 17 +++++++++++++++ pkg/config/messages_test.go | 7 +++++++ pkg/p2p/client.go | 11 ++++++---- pkg/p2p/client_connector.go | 4 +++- pkg/p2p/client_test.go | 1 + pkg/p2p/message_router_test.go | 1 + pkg/p2p/mock_cluster.go | 1 + pkg/p2p/server.go | 2 ++ pkg/p2p/server_client_integration_test.go | 1 + pkg/p2p/server_wrapper.go | 10 ++++++++- pkg/p2p/server_wrapper_test.go | 5 ++++- 14 files changed, 72 insertions(+), 21 deletions(-) diff --git a/cdc/server.go b/cdc/server.go index 54010a54df6..555cd2b0143 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -26,17 +26,6 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/errors" "github.com/pingcap/log" - - "github.com/prometheus/client_golang/prometheus" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "golang.org/x/net/netutil" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" - "github.com/pingcap/tiflow/cdc/capture" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/sorter/unified" @@ -48,6 +37,15 @@ import ( "github.com/pingcap/tiflow/pkg/p2p" "github.com/pingcap/tiflow/pkg/tcpserver" p2pProto "github.com/pingcap/tiflow/proto/p2p" + "github.com/prometheus/client_golang/prometheus" + "go.etcd.io/etcd/client/pkg/v3/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "golang.org/x/net/netutil" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" ) const ( @@ -99,9 +97,10 @@ func NewServer(pdEndpoints []string) (*Server, error) { return nil, errors.Trace(err) } + debugConfig := config.GetGlobalServerConfig().Debug s := &Server{ pdEndpoints: pdEndpoints, - grpcService: p2p.NewServerWrapper(), + grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), tcpServer: tcpServer, } @@ -273,7 +272,7 @@ func (s *Server) run(ctx context.Context) (err error) { conf := config.GetGlobalServerConfig() if conf.Debug.EnableNewScheduler { - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) wg.Go(func() error { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 5012be7c59d..9d114915b65 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -208,6 +208,7 @@ func TestParseCfg(t *testing.T) { ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, }, o.serverConfig) @@ -273,6 +274,7 @@ client-retry-rate-limit = 100.0 server-max-pending-message-count = 1024 server-ack-interval = "1s" server-worker-pool-size = 16 +max-recv-msg-size = 4 `, dataDir) err := os.WriteFile(configPath, []byte(configContent), 0o644) require.Nil(t, err) @@ -354,6 +356,7 @@ server-worker-pool-size = 16 ServerMaxPendingMessageCount: 1024, ServerAckInterval: config.TomlDuration(1 * time.Second), ServerWorkerPoolSize: 16, + MaxRecvMsgSize: 4, }, }, }, o.serverConfig) @@ -497,6 +500,7 @@ cert-allowed-cn = ["dd","ee"] ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, }, o.serverConfig) @@ -557,6 +561,7 @@ unknown3 = 3 ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, }, o.serverConfig.Debug) } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 46f2fca3cbb..50a5ebef18d 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -136,7 +136,8 @@ const ( "client-retry-rate-limit": 1, "server-max-pending-message-count": 102400, "server-ack-interval": 100000000, - "server-worker-pool-size": 4 + "server-worker-pool-size": 4, + "max-recv-msg-size": 268435456 } } }` diff --git a/pkg/config/messages.go b/pkg/config/messages.go index af14190e257..d4057f72ce9 100644 --- a/pkg/config/messages.go +++ b/pkg/config/messages.go @@ -20,6 +20,8 @@ import ( "github.com/pingcap/tiflow/pkg/p2p" ) +const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB + // MessagesConfig configs MessageServer and MessageClient. type MessagesConfig struct { ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"` @@ -30,6 +32,9 @@ type MessagesConfig struct { ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"` ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"` ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"` + + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"` } // read only @@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{ ServerMaxPendingMessageCount: 102400, ServerAckInterval: TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: defaultMaxRecvMsgSize, } const ( @@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error { return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32") } + if c.MaxRecvMsgSize == 0 { + c.MaxRecvMsgSize = defaultMaxRecvMsgSize + } + if c.MaxRecvMsgSize < 0 { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "max-recv-msg-size must be larger than 0") + } + return nil } @@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig { ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount, ServerAckInterval: c.ServerAckInterval, ServerWorkerPoolSize: c.ServerWorkerPoolSize, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig { MaxBatchCount: c.ClientMaxBatchCount, RetryRateLimitPerSecond: c.ClientRetryRateLimit, DialTimeout: clientDialTimeout, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig { MaxPeerCount: maxPeerCount, WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout, SendRateLimitPerStream: serverSendRateLimit, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } diff --git a/pkg/config/messages_test.go b/pkg/config/messages_test.go index b2fa1f9188e..53d3f068d04 100644 --- a/pkg/config/messages_test.go +++ b/pkg/config/messages_test.go @@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) { require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1) require.Greater(t, serverConfig.MaxPeerCount, 0) require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0)) + require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestDefaultMessageClientConfig(t *testing.T) { @@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) { require.Greater(t, clientConfig.MaxBatchCount, 0) require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1) require.Greater(t, clientConfig.DialTimeout, time.Duration(0)) + require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestMessagesConfigClone(t *testing.T) { @@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) { err = illegalConfig.ValidateAndAdjust() require.Error(t, err) require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error()) + + illegalConfig = defaultMessageConfig.Clone() + illegalConfig.MaxRecvMsgSize = -1 + err = illegalConfig.ValidateAndAdjust() + require.Error(t, err) } diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index da0586706fa..9f5833537a8 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -54,6 +54,8 @@ type MessageClientConfig struct { // The version of the client for compatibility check. // It should be in semver format. Empty string means no check. ClientVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int } // MessageClient is a client used to send peer messages. @@ -140,10 +142,11 @@ func (c *MessageClient) Run( } gRPCClient, release, err := c.connector.Connect(clientConnectOptions{ - network: network, - addr: addr, - credential: credential, - timeout: c.config.DialTimeout, + network: network, + addr: addr, + credential: credential, + timeout: c.config.DialTimeout, + maxRecvMsgSize: c.config.MaxRecvMsgSize, }) if err != nil { log.Warn("peer-message client: failed to connect to server", diff --git a/pkg/p2p/client_connector.go b/pkg/p2p/client_connector.go index 8cc1a15b397..a840d49df8a 100644 --- a/pkg/p2p/client_connector.go +++ b/pkg/p2p/client_connector.go @@ -35,7 +35,8 @@ type clientConnectOptions struct { // credential is used to setup the connection to the gRPC server. credential *security.Credential // timeout specifies the DialTimeout of the connection. - timeout time.Duration + timeout time.Duration + maxRecvMsgSize int } type cancelFn = func() @@ -63,6 +64,7 @@ func (c *clientConnectorImpl) Connect(opts clientConnectOptions) (proto.CDCPeerT conn, err := grpc.Dial( opts.addr, securityOption, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxRecvMsgSize)), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return net.DialTimeout(opts.network, s, opts.timeout) }), diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index fab827582fd..e4228cdeae3 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -67,6 +67,7 @@ var clientConfigForUnitTesting = &MessageClientConfig{ RetryRateLimitPerSecond: 999.0, ClientVersion: "v5.4.0", // a fake version AdvertisedAddr: "fake-addr:8300", + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func TestMessageClientBasics(t *testing.T) { diff --git a/pkg/p2p/message_router_test.go b/pkg/p2p/message_router_test.go index ed843042dbf..ac1196f2393 100644 --- a/pkg/p2p/message_router_test.go +++ b/pkg/p2p/message_router_test.go @@ -45,6 +45,7 @@ var clientConfig4TestingMessageRouter = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMessageRouterTestSuite() *messageRouterTestSuite { diff --git a/pkg/p2p/mock_cluster.go b/pkg/p2p/mock_cluster.go index 2315ef7994b..a679f270e25 100644 --- a/pkg/p2p/mock_cluster.go +++ b/pkg/p2p/mock_cluster.go @@ -64,6 +64,7 @@ var clientConfig4MockCluster = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMockNode(t *testing.T, id NodeID) *MockNode { diff --git a/pkg/p2p/server.go b/pkg/p2p/server.go index e80ecb47dff..b40da50c8fc 100755 --- a/pkg/p2p/server.go +++ b/pkg/p2p/server.go @@ -58,6 +58,8 @@ type MessageServerConfig struct { MaxPeerCount int // Semver of the server. Empty string means no version check. ServerVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int // The maximum time duration to wait before forcefully removing a handler. // diff --git a/pkg/p2p/server_client_integration_test.go b/pkg/p2p/server_client_integration_test.go index b5f300414d8..99cfc912f88 100644 --- a/pkg/p2p/server_client_integration_test.go +++ b/pkg/p2p/server_client_integration_test.go @@ -41,6 +41,7 @@ var clientConfig4Testing = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } type serverConfigOpt = func(config *MessageServerConfig) diff --git a/pkg/p2p/server_wrapper.go b/pkg/p2p/server_wrapper.go index bf9b5b803a1..596c824d29f 100644 --- a/pkg/p2p/server_wrapper.go +++ b/pkg/p2p/server_wrapper.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/proto/p2p" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" gRPCPeer "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -52,18 +53,25 @@ func (w *streamWrapper) Context() context.Context { type ServerWrapper struct { rwMu sync.RWMutex innerServer p2p.CDCPeerToPeerServer + cfg *MessageServerConfig wrappedStreamsMu sync.Mutex wrappedStreams map[*streamWrapper]struct{} } // NewServerWrapper creates a new ServerWrapper -func NewServerWrapper() *ServerWrapper { +func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper { return &ServerWrapper{ wrappedStreams: map[*streamWrapper]struct{}{}, + cfg: cfg, } } +// ServerOptions returns server option for creating grpc servers. +func (s *ServerWrapper) ServerOptions() []grpc.ServerOption { + return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)} +} + // SendMessage implements p2p.CDCPeerToPeerServer func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error { s.rwMu.RLock() diff --git a/pkg/p2p/server_wrapper_test.go b/pkg/p2p/server_wrapper_test.go index d83976a6c6d..af517ab24f0 100644 --- a/pkg/p2p/server_wrapper_test.go +++ b/pkg/p2p/server_wrapper_test.go @@ -68,7 +68,10 @@ func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) - server = NewServerWrapper() + cfg := &MessageServerConfig{ + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB + } + server = NewServerWrapper(cfg) p2p.RegisterCDCPeerToPeerServer(grpcServer, server) var wg sync.WaitGroup