Skip to content

Commit

Permalink
p2p(ticdc): use a larger max receive message size (pingcap#8005)
Browse files Browse the repository at this point in the history
close pingcap#8004

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jan 6, 2023
1 parent 38f1d35 commit 1bcd5bd
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 21 deletions.
25 changes: 12 additions & 13 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"

"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/sorter/unified"
Expand All @@ -48,6 +37,15 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/tcpserver"
p2pProto "github.com/pingcap/tiflow/proto/p2p"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/netutil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
)

const (
Expand Down Expand Up @@ -99,9 +97,10 @@ func NewServer(pdEndpoints []string) (*Server, error) {
return nil, errors.Trace(err)
}

debugConfig := config.GetGlobalServerConfig().Debug
s := &Server{
pdEndpoints: pdEndpoints,
grpcService: p2p.NewServerWrapper(),
grpcService: p2p.NewServerWrapper(debugConfig.Messages.ToMessageServerConfig()),
tcpServer: tcpServer,
}

Expand Down Expand Up @@ -273,7 +272,7 @@ func (s *Server) run(ctx context.Context) (err error) {

conf := config.GetGlobalServerConfig()
if conf.Debug.EnableNewScheduler {
grpcServer := grpc.NewServer()
grpcServer := grpc.NewServer(s.grpcService.ServerOptions()...)
p2pProto.RegisterCDCPeerToPeerServer(grpcServer, s.grpcService)

wg.Go(func() error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func TestParseCfg(t *testing.T) {
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -273,6 +274,7 @@ client-retry-rate-limit = 100.0
server-max-pending-message-count = 1024
server-ack-interval = "1s"
server-worker-pool-size = 16
max-recv-msg-size = 4
`, dataDir)
err := os.WriteFile(configPath, []byte(configContent), 0o644)
require.Nil(t, err)
Expand Down Expand Up @@ -354,6 +356,7 @@ server-worker-pool-size = 16
ServerMaxPendingMessageCount: 1024,
ServerAckInterval: config.TomlDuration(1 * time.Second),
ServerWorkerPoolSize: 16,
MaxRecvMsgSize: 4,
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -497,6 +500,7 @@ cert-allowed-cn = ["dd","ee"]
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
},
}, o.serverConfig)
Expand Down Expand Up @@ -557,6 +561,7 @@ unknown3 = 3
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: config.TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: 256 * 1024 * 1024,
},
}, o.serverConfig.Debug)
}
3 changes: 2 additions & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ const (
"client-retry-rate-limit": 1,
"server-max-pending-message-count": 102400,
"server-ack-interval": 100000000,
"server-worker-pool-size": 4
"server-worker-pool-size": 4,
"max-recv-msg-size": 268435456
}
}
}`
Expand Down
17 changes: 17 additions & 0 deletions pkg/config/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
)

const defaultMaxRecvMsgSize = 256 * 1024 * 1024 // 256MB

// MessagesConfig configs MessageServer and MessageClient.
type MessagesConfig struct {
ClientMaxBatchInterval TomlDuration `toml:"client-max-batch-interval" json:"client-max-batch-interval"`
Expand All @@ -30,6 +32,9 @@ type MessagesConfig struct {
ServerMaxPendingMessageCount int `toml:"server-max-pending-message-count" json:"server-max-pending-message-count"`
ServerAckInterval TomlDuration `toml:"server-ack-interval" json:"server-ack-interval"`
ServerWorkerPoolSize int `toml:"server-worker-pool-size" json:"server-worker-pool-size"`

// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int `toml:"max-recv-msg-size" json:"max-recv-msg-size"`
}

// read only
Expand All @@ -42,6 +47,7 @@ var defaultMessageConfig = &MessagesConfig{
ServerMaxPendingMessageCount: 102400,
ServerAckInterval: TomlDuration(time.Millisecond * 100),
ServerWorkerPoolSize: 4,
MaxRecvMsgSize: defaultMaxRecvMsgSize,
}

const (
Expand Down Expand Up @@ -121,6 +127,14 @@ func (c *MessagesConfig) ValidateAndAdjust() error {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs("server-worker-pool-size is larger than 32")
}

if c.MaxRecvMsgSize == 0 {
c.MaxRecvMsgSize = defaultMaxRecvMsgSize
}
if c.MaxRecvMsgSize < 0 {
return cerrors.ErrInvalidServerOption.GenWithStackByArgs(
"max-recv-msg-size must be larger than 0")
}

return nil
}

Expand All @@ -134,6 +148,7 @@ func (c *MessagesConfig) Clone() *MessagesConfig {
ServerMaxPendingMessageCount: c.ServerMaxPendingMessageCount,
ServerAckInterval: c.ServerAckInterval,
ServerWorkerPoolSize: c.ServerWorkerPoolSize,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

Expand All @@ -146,6 +161,7 @@ func (c *MessagesConfig) ToMessageClientConfig() *p2p.MessageClientConfig {
MaxBatchCount: c.ClientMaxBatchCount,
RetryRateLimitPerSecond: c.ClientRetryRateLimit,
DialTimeout: clientDialTimeout,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}

Expand All @@ -160,5 +176,6 @@ func (c *MessagesConfig) ToMessageServerConfig() *p2p.MessageServerConfig {
MaxPeerCount: maxPeerCount,
WaitUnregisterHandleTimeoutThreshold: unregisterHandleTimeout,
SendRateLimitPerStream: serverSendRateLimit,
MaxRecvMsgSize: c.MaxRecvMsgSize,
}
}
7 changes: 7 additions & 0 deletions pkg/config/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestDefaultMessageServerConfig(t *testing.T) {
require.Greater(t, serverConfig.SendRateLimitPerStream, 0.1)
require.Greater(t, serverConfig.MaxPeerCount, 0)
require.Greater(t, serverConfig.WaitUnregisterHandleTimeoutThreshold, time.Duration(0))
require.EqualValues(t, serverConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestDefaultMessageClientConfig(t *testing.T) {
Expand All @@ -43,6 +44,7 @@ func TestDefaultMessageClientConfig(t *testing.T) {
require.Greater(t, clientConfig.MaxBatchCount, 0)
require.Greater(t, clientConfig.RetryRateLimitPerSecond, 0.1)
require.Greater(t, clientConfig.DialTimeout, time.Duration(0))
require.EqualValues(t, clientConfig.MaxRecvMsgSize, defaultMaxRecvMsgSize)
}

func TestMessagesConfigClone(t *testing.T) {
Expand Down Expand Up @@ -73,4 +75,9 @@ func TestMessagesConfigValidateAndAdjust(t *testing.T) {
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
require.Regexp(t, ".*ErrInvalidServerOption.*", err.Error())

illegalConfig = defaultMessageConfig.Clone()
illegalConfig.MaxRecvMsgSize = -1
err = illegalConfig.ValidateAndAdjust()
require.Error(t, err)
}
11 changes: 7 additions & 4 deletions pkg/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type MessageClientConfig struct {
// The version of the client for compatibility check.
// It should be in semver format. Empty string means no check.
ClientVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int
}

// MessageClient is a client used to send peer messages.
Expand Down Expand Up @@ -140,10 +142,11 @@ func (c *MessageClient) Run(
}

gRPCClient, release, err := c.connector.Connect(clientConnectOptions{
network: network,
addr: addr,
credential: credential,
timeout: c.config.DialTimeout,
network: network,
addr: addr,
credential: credential,
timeout: c.config.DialTimeout,
maxRecvMsgSize: c.config.MaxRecvMsgSize,
})
if err != nil {
log.Warn("peer-message client: failed to connect to server",
Expand Down
4 changes: 3 additions & 1 deletion pkg/p2p/client_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type clientConnectOptions struct {
// credential is used to setup the connection to the gRPC server.
credential *security.Credential
// timeout specifies the DialTimeout of the connection.
timeout time.Duration
timeout time.Duration
maxRecvMsgSize int
}

type cancelFn = func()
Expand Down Expand Up @@ -63,6 +64,7 @@ func (c *clientConnectorImpl) Connect(opts clientConnectOptions) (proto.CDCPeerT
conn, err := grpc.Dial(
opts.addr,
securityOption,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(opts.maxRecvMsgSize)),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
return net.DialTimeout(opts.network, s, opts.timeout)
}),
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ var clientConfigForUnitTesting = &MessageClientConfig{
RetryRateLimitPerSecond: 999.0,
ClientVersion: "v5.4.0", // a fake version
AdvertisedAddr: "fake-addr:8300",
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func TestMessageClientBasics(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/message_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var clientConfig4TestingMessageRouter = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func newMessageRouterTestSuite() *messageRouterTestSuite {
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var clientConfig4MockCluster = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

func newMockNode(t *testing.T, id NodeID) *MockNode {
Expand Down
2 changes: 2 additions & 0 deletions pkg/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type MessageServerConfig struct {
MaxPeerCount int
// Semver of the server. Empty string means no version check.
ServerVersion string
// MaxRecvMsgSize is the maximum message size in bytes TiCDC can receive.
MaxRecvMsgSize int

// The maximum time duration to wait before forcefully removing a handler.
//
Expand Down
1 change: 1 addition & 0 deletions pkg/p2p/server_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var clientConfig4Testing = &MessageClientConfig{
MaxBatchBytes: 8192,
RetryRateLimitPerSecond: 10.0, // using 10.0 instead of 1.0 to accelerate testing
DialTimeout: time.Second * 3,
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}

type serverConfigOpt = func(config *MessageServerConfig)
Expand Down
10 changes: 9 additions & 1 deletion pkg/p2p/server_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/proto/p2p"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
gRPCPeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -52,18 +53,25 @@ func (w *streamWrapper) Context() context.Context {
type ServerWrapper struct {
rwMu sync.RWMutex
innerServer p2p.CDCPeerToPeerServer
cfg *MessageServerConfig

wrappedStreamsMu sync.Mutex
wrappedStreams map[*streamWrapper]struct{}
}

// NewServerWrapper creates a new ServerWrapper.
func NewServerWrapper() *ServerWrapper {
func NewServerWrapper(cfg *MessageServerConfig) *ServerWrapper {
return &ServerWrapper{
wrappedStreams: map[*streamWrapper]struct{}{},
cfg: cfg,
}
}

// ServerOptions returns the gRPC server options derived from the wrapper's
// MessageServerConfig. Pass the result to grpc.NewServer when creating the
// server on which this wrapper is registered.
//
// If the wrapper was built without a config, or the configured MaxRecvMsgSize
// is not positive, no option is returned so that gRPC keeps its own default
// receive limit instead of panicking or rejecting every incoming message.
func (s *ServerWrapper) ServerOptions() []grpc.ServerOption {
	if s.cfg == nil || s.cfg.MaxRecvMsgSize <= 0 {
		return nil
	}
	return []grpc.ServerOption{grpc.MaxRecvMsgSize(s.cfg.MaxRecvMsgSize)}
}

// SendMessage implements p2p.CDCPeerToPeerServer
func (s *ServerWrapper) SendMessage(stream p2p.CDCPeerToPeer_SendMessageServer) error {
s.rwMu.RLock()
Expand Down
5 changes: 4 additions & 1 deletion pkg/p2p/server_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ func newServerWrapperForTesting(t *testing.T) (server *ServerWrapper, newClient
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)

server = NewServerWrapper()
cfg := &MessageServerConfig{
MaxRecvMsgSize: 4 * 1024 * 1024, // 4MB
}
server = NewServerWrapper(cfg)
p2p.RegisterCDCPeerToPeerServer(grpcServer, server)

var wg sync.WaitGroup
Expand Down

0 comments on commit 1bcd5bd

Please sign in to comment.