diff --git a/cdc/server/server.go b/cdc/server/server.go index bc9f4e8d414..4ba942e9721 100644 --- a/cdc/server/server.go +++ b/cdc/server/server.go @@ -121,7 +121,7 @@ func New(pdEndpoints []string) (*server, error) { s := &server{ pdEndpoints: pdEndpoints, - grpcService: p2p.NewServerWrapper(), + grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()), tcpServer: tcpServer, useEventSortEngine: useEventSortEngine, @@ -377,7 +377,7 @@ func (s *server) run(ctx context.Context) (err error) { } if conf.Debug.EnableNewScheduler { - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...) p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService) wg.Go(func() error { diff --git a/engine/pkg/p2p/message_sender.go b/engine/pkg/p2p/message_sender.go index 95030b58ce9..1d9eebbd4a7 100644 --- a/engine/pkg/p2p/message_sender.go +++ b/engine/pkg/p2p/message_sender.go @@ -82,8 +82,9 @@ var defaultClientConfig = &p2p.MessageClientConfig{ BatchSendInterval: 100 * time.Millisecond, // essentially disables flushing MaxBatchBytes: 8 * 1024 * 1024, // 8MB MaxBatchCount: 4096, - RetryRateLimitPerSecond: 1.0, // once per second - ClientVersion: "v5.4.0", // a fake version + RetryRateLimitPerSecond: 1.0, // once per second + ClientVersion: "v5.4.0", // a fake version + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } // NewMessageRouter creates a new MessageRouter instance via tiflow p2p API diff --git a/engine/pkg/p2p/server_integration_test.go b/engine/pkg/p2p/server_integration_test.go index 8875926ebab..98897f4ee8a 100644 --- a/engine/pkg/p2p/server_integration_test.go +++ b/engine/pkg/p2p/server_integration_test.go @@ -46,6 +46,7 @@ var clientConfigForUnitTesting = &p2pImpl.MessageClientConfig{ RetryRateLimitPerSecond: 999.0, ClientVersion: "v5.4.0", // a fake version AdvertisedAddr: "fake-addr:8300", + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func TestMessageRPCServiceBasics(t *testing.T) { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 683b8b852ce..8b52081a62d 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -205,6 +205,7 @@ func TestParseCfg(t *testing.T) { ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, Scheduler: &config.SchedulerConfig{ HeartbeatTick: 2, @@ -280,6 +281,7 @@ client-retry-rate-limit = 100.0 server-max-pending-message-count = 1024 server-ack-interval = "1s" server-worker-pool-size = 16 +max-recv-msg-size = 4 [debug.scheduler] heartbeat-tick = 3 max-task-concurrency = 11 @@ -362,6 +364,7 @@ check-balance-interval = "10s" ServerMaxPendingMessageCount: 1024, ServerAckInterval: config.TomlDuration(1 * time.Second), ServerWorkerPoolSize: 16, + MaxRecvMsgSize: 4, }, Scheduler: &config.SchedulerConfig{ HeartbeatTick: 3, @@ -511,6 +514,7 @@ cert-allowed-cn = ["dd","ee"] ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, Scheduler: &config.SchedulerConfig{ HeartbeatTick: 2, @@ -577,6 +581,7 @@ unknown3 = 3 ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: 256 * 1024 * 1024, }, Scheduler: &config.SchedulerConfig{ HeartbeatTick: 2, diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index a3fbfa4ed47..e856b6a3eca 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -130,7 +130,8 @@ const ( "client-retry-rate-limit": 1, "server-max-pending-message-count": 102400, "server-ack-interval": 100000000, - "server-worker-pool-size": 4 + "server-worker-pool-size": 4, + "max-recv-msg-size": 268435456 }, "scheduler": { "heartbeat-tick": 2, diff --git a/pkg/config/messages.go b/pkg/config/messages.go index af14190e257..d4057f72ce9 100644 --- a/pkg/config/messages.go +++ b/pkg/config/messages.go @@ -20,6 +20,8 @@ import ( "github.com/pingcap/tiflow/pkg/p2p" ) +const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB + // MessagesConfig configs MessageServer and MessageClient. type MessagesConfig struct { ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"` @@ -30,6 +32,9 @@ type MessagesConfig struct { ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"` ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"` ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"` + + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"` } // read only @@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{ ServerMaxPendingMessageCount: 102400, ServerAckInterval: TomlDuration(time.Millisecond * 100), ServerWorkerPoolSize: 4, + MaxRecvMsgSize: defaultMaxRecvMsgSize, } const ( @@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error { return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32") } + if c.MaxRecvMsgSize == 0 { + c.MaxRecvMsgSize = defaultMaxRecvMsgSize + } + if c.MaxRecvMsgSize < 0 { + return cerrors.ErrInvalidServerOption.GenWithStackByArgs( + "max-recv-msg-size must be larger than 0") + } + return nil } @@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig { ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount, ServerAckInterval: c.ServerAckInterval, ServerWorkerPoolSize: c.ServerWorkerPoolSize, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig { MaxBatchCount: c.ClientMaxBatchCount, RetryRateLimitPerSecond: c.ClientRetryRateLimit, DialTimeout: clientDialTimeout, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } @@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig { MaxPeerCount: maxPeerCount, WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout, SendRateLimitPerStream: serverSendRateLimit, + MaxRecvMsgSize: c.MaxRecvMsgSize, } } diff --git a/pkg/config/messages_test.go b/pkg/config/messages_test.go index b2fa1f9188e..53d3f068d04 100644 --- a/pkg/config/messages_test.go +++ b/pkg/config/messages_test.go @@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) { require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1) require.Greater(t, serverConfig.MaxPeerCount, 0) require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0)) + require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestDefaultMessageClientConfig(t *testing.T) { @@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) { require.Greater(t, clientConfig.MaxBatchCount, 0) require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1) require.Greater(t, clientConfig.DialTimeout, time.Duration(0)) + require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize) } func TestMessagesConfigClone(t *testing.T) { @@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) { err = illegalConfig.ValidateAndAdjust() require.Error(t, err) require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error()) + + illegalConfig = defaultMessageConfig.Clone() + illegalConfig.MaxRecvMsgSize = -1 + err = illegalConfig.ValidateAndAdjust() + require.Error(t, err) } diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 72989a67cfc..60a0b8ad89d 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -54,6 +54,8 @@ type MessageClientConfig struct { // The version of the client for compatibility check. // It should be in semver format. Empty string means no check. ClientVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int } // MessageClient is a client used to send peer messages. @@ -140,10 +142,11 @@ func (c *MessageClient) Run( } gRPCClient, release, err := c.connector.Connect(clientConnectOptions{ - network: network, - addr: addr, - credential: credential, - timeout: c.config.DialTimeout, + network: network, + addr: addr, + credential: credential, + timeout: c.config.DialTimeout, + maxRecvMsgSize: c.config.MaxRecvMsgSize, }) if err != nil { log.Warn("peer-message client: failed to connect to server", diff --git a/pkg/p2p/client_connector.go b/pkg/p2p/client_connector.go index 8cc1a15b397..a840d49df8a 100644 --- a/pkg/p2p/client_connector.go +++ b/pkg/p2p/client_connector.go @@ -35,7 +35,8 @@ type clientConnectOptions struct { // credential is used to setup the connection to the gRPC server. credential *security.Credential // timeout specifies the DialTimeout of the connection. - timeout time.Duration + timeout time.Duration + maxRecvMsgSize int } type cancelFn = func() @@ -63,6 +64,7 @@ func (c *clientConnectorImpl) Connect(opts clientConnectOptions) (proto.CDCPeerT conn, err := grpc.Dial( opts.addr, securityOption, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxRecvMsgSize)), grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { return net.DialTimeout(opts.network, s, opts.timeout) }), diff --git a/pkg/p2p/client_test.go b/pkg/p2p/client_test.go index fab827582fd..e4228cdeae3 100644 --- a/pkg/p2p/client_test.go +++ b/pkg/p2p/client_test.go @@ -67,6 +67,7 @@ var clientConfigForUnitTesting = &MessageClientConfig{ RetryRateLimitPerSecond: 999.0, ClientVersion: "v5.4.0", // a fake version AdvertisedAddr: "fake-addr:8300", + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func TestMessageClientBasics(t *testing.T) { diff --git a/pkg/p2p/message_router_test.go b/pkg/p2p/message_router_test.go index ed843042dbf..ac1196f2393 100644 --- a/pkg/p2p/message_router_test.go +++ b/pkg/p2p/message_router_test.go @@ -45,6 +45,7 @@ var clientConfig4TestingMessageRouter = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMessageRouterTestSuite() *messageRouterTestSuite { diff --git a/pkg/p2p/mock_cluster.go b/pkg/p2p/mock_cluster.go index 2315ef7994b..a679f270e25 100644 --- a/pkg/p2p/mock_cluster.go +++ b/pkg/p2p/mock_cluster.go @@ -64,6 +64,7 @@ var clientConfig4MockCluster = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } func newMockNode(t *testing.T, id NodeID) *MockNode { diff --git a/pkg/p2p/server.go b/pkg/p2p/server.go index 37fbae39306..815b11333f6 100755 --- a/pkg/p2p/server.go +++ b/pkg/p2p/server.go @@ -57,6 +57,8 @@ type MessageServerConfig struct { MaxPeerCount int // Semver of the server. Empty string means no version check. ServerVersion string + // MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive. + MaxRecvMsgSize int // The maximum time duration to wait before forcefully removing a handler. // diff --git a/pkg/p2p/server_client_integration_test.go b/pkg/p2p/server_client_integration_test.go index c446fddfe21..7f35a9b6bd4 100644 --- a/pkg/p2p/server_client_integration_test.go +++ b/pkg/p2p/server_client_integration_test.go @@ -41,6 +41,7 @@ var clientConfig4Testing = &MessageClientConfig{ MaxBatchBytes: 8192, RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing DialTimeout: time.Second * 3, + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB } type serverConfigOpt = func(config *MessageServerConfig) diff --git a/pkg/p2p/server_wrapper.go b/pkg/p2p/server_wrapper.go index da4bb50db83..7cbd80bbdae 100644 --- a/pkg/p2p/server_wrapper.go +++ b/pkg/p2p/server_wrapper.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/proto/p2p" "go.uber.org/zap" + "google.golang.org/grpc" "google.golang.org/grpc/codes" gRPCPeer "google.golang.org/grpc/peer" "google.golang.org/grpc/status" @@ -52,18 +53,25 @@ func (w *streamWrapper) Context() context.Context { type ServerWrapper struct { rwMu sync.RWMutex innerServer p2p.CDCPeerToPeerServer + cfg *MessageServerConfig wrappedStreamsMu sync.Mutex wrappedStreams map[*streamWrapper]struct{} } // NewServerWrapper creates a new ServerWrapper -func NewServerWrapper() *ServerWrapper { +func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper { return &ServerWrapper{ wrappedStreams: map[*streamWrapper]struct{}{}, + cfg: cfg, } } +// ServerOptions returns server option for creating grpc servers. +func (s *ServerWrapper) ServerOptions() []grpc.ServerOption { + return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)} +} + // SendMessage implements p2p.CDCPeerToPeerServer func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error { s.rwMu.RLock() diff --git a/pkg/p2p/server_wrapper_test.go b/pkg/p2p/server_wrapper_test.go index d83976a6c6d..af517ab24f0 100644 --- a/pkg/p2p/server_wrapper_test.go +++ b/pkg/p2p/server_wrapper_test.go @@ -68,7 +68,10 @@ func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) - server = NewServerWrapper() + cfg := &MessageServerConfig{ + MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB + } + server = NewServerWrapper(cfg) p2p.RegisterCDCPeerToPeerServer(grpcServer, server) var wg sync.WaitGroup