From 7fd2b776c164507f7e07f44300a2c9104cc73ffa Mon Sep 17 00:00:00 2001 From: mmsqe Date: Thu, 16 Feb 2023 21:20:23 +0800 Subject: [PATCH] set limit when init client --- rpc/client.go | 37 ++++++++++++++++++++++--------------- rpc/server.go | 5 ++--- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 279cc1fd7575..d71605599557 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -132,7 +132,8 @@ func (c *Client) newClientConn(conn ServerCodec) *clientConn { } // SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -func (c *Client) SetBatchLimits(limit int, size int) { +// And update non-http connection with limit +func (c *Client) SetBatchLimits(limit, size int) { c.batchRequestLimit = limit c.batchResponseMaxSize = size select { @@ -254,22 +255,24 @@ func newClient(initctx context.Context, connect reconnectFunc) (*Client, error) return c, nil } -func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { +func initClientWithBatchLimits(conn ServerCodec, idgen func() ID, services *serviceRegistry, limit, size int) *Client { _, isHTTP := conn.(*httpConn) c := &Client{ - isHTTP: isHTTP, - idgen: idgen, - services: services, - writeConn: conn, - close: make(chan struct{}), - closing: make(chan struct{}), - didClose: make(chan struct{}), - reconnected: make(chan ServerCodec), - readOp: make(chan readOp), - readErr: make(chan error), - reqInit: make(chan *requestOp), - reqSent: make(chan error, 1), - reqTimeout: make(chan *requestOp), + isHTTP: isHTTP, + idgen: idgen, + services: services, + writeConn: conn, + close: make(chan struct{}), + closing: make(chan struct{}), + didClose: make(chan struct{}), + reconnected: make(chan ServerCodec), + readOp: make(chan readOp), + readErr: make(chan error), + reqInit: make(chan *requestOp), + reqSent: make(chan error, 1), + reqTimeout: make(chan *requestOp), + batchRequestLimit: limit, + batchResponseMaxSize: size, } if !isHTTP { go c.dispatch(conn) @@ -277,6 +280,10 @@ func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *C return c } +func initClient(conn ServerCodec, idgen func() ID, services *serviceRegistry) *Client { + return initClientWithBatchLimits(conn, idgen, services, 0, 0) +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the diff --git a/rpc/server.go b/rpc/server.go index 74b01dda253a..e0835ff9570c 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -68,7 +68,7 @@ func NewServer() *Server { } // SetBatchLimits set maximum number of requests in a batch and maximum number of bytes returned from calls -func (s *Server) SetBatchLimits(limit int, size int) { +func (s *Server) SetBatchLimits(limit, size int) { s.batchRequestLimit = limit s.batchResponseMaxSize = size } @@ -94,8 +94,7 @@ func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { } defer s.untrackCodec(codec) - c := initClient(codec, s.idgen, &s.services) - c.SetBatchLimits(s.batchRequestLimit, s.batchResponseMaxSize) + c := initClientWithBatchLimits(codec, s.idgen, &s.services, s.batchRequestLimit, s.batchResponseMaxSize) <-codec.closed() c.Close() }