diff --git a/common/remote/rpc/grpc_client.go b/common/remote/rpc/grpc_client.go index a5373227..90372016 100644 --- a/common/remote/rpc/grpc_client.go +++ b/common/remote/rpc/grpc_client.go @@ -50,8 +50,8 @@ func NewGrpcClient(ctx context.Context, clientName string, nacosServer *nacos_se name: clientName, labels: make(map[string]string, 8), rpcClientStatus: INITIALIZED, - eventChan: make(chan ConnectionEvent), - reconnectionChan: make(chan ReconnectContext), + eventChan: make(chan ConnectionEvent, 1), + reconnectionChan: make(chan ReconnectContext, 1), nacosServer: nacosServer, mux: new(sync.Mutex), }, @@ -161,7 +161,7 @@ func (c *GrpcClient) sendConnectionSetupRequest(grpcConn *GrpcConnection) error csr.ClientAbilities = c.clientAbilities err := grpcConn.biStreamSend(convertRequest(csr)) if err != nil { - logger.Warnf("Send ConnectionSetupRequest error:%+v", err) + logger.Warnf("send connectionSetupRequest error:%v", err) } time.Sleep(100 * time.Millisecond) return err @@ -188,16 +188,16 @@ func (c *GrpcClient) bindBiRequestStream(streamClient nacos_grpc_service.BiReque abandon := grpcConn.getAbandon() if c.IsRunning() && !abandon { if err == io.EOF { - logger.Infof("%s Request stream onCompleted, switch server", grpcConn.getConnectionId()) + logger.Infof("connectionId %s request stream onCompleted, switch server", grpcConn.getConnectionId()) } else { - logger.Errorf("%s Request stream error, switch server, error=%+v", grpcConn.getConnectionId(), err) + logger.Errorf("connectionId %s request stream error, switch server, error=%v", grpcConn.getConnectionId(), err) } if atomic.CompareAndSwapInt32((*int32)(&c.rpcClientStatus), int32(RUNNING), int32(UNHEALTHY)) { c.switchServerAsync(ServerInfo{}, false) return } } else { - logger.Infof("%s received error event, isRunning:%v, isAbandon=%v, error=%+v", grpcConn.getConnectionId(), running, abandon, err) + logger.Infof("connectionId %s received error event, isRunning:%v, isAbandon=%v, error=%v", grpcConn.getConnectionId(), running, abandon, err) return } } else { diff --git a/common/remote/rpc/rpc_client.go b/common/remote/rpc/rpc_client.go index 40496ca3..d30826af 100644 --- a/common/remote/rpc/rpc_client.go +++ b/common/remote/rpc/rpc_client.go @@ -218,7 +218,7 @@ func (r *RpcClient) Start() { startUpRetryTimes-- serverInfo, err := r.nextRpcServer() if err != nil { - logger.Errorf("[RpcClient.nextRpcServer],err:%+v", err) + logger.Errorf("[RpcClient.nextRpcServer],err:%v", err) break } logger.Infof("[RpcClient.Start] %s try to connect to server on start up, server: %+v", r.name, serverInfo) @@ -235,16 +235,14 @@ func (r *RpcClient) Start() { currentConnection.getServerInfo(), currentConnection.getConnectionId()) r.currentConnection = currentConnection atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) - r.asyncNotifyConnectionChange(CONNECTED) + r.notifyConnectionChange(CONNECTED) } else { r.switchServerAsync(ServerInfo{}, false) } } -func (r *RpcClient) asyncNotifyConnectionChange(eventType ConnectionStatus) { - go func() { - r.eventChan <- ConnectionEvent{eventType: eventType} - }() +func (r *RpcClient) notifyConnectionChange(eventType ConnectionStatus) { + r.eventChan <- ConnectionEvent{eventType: eventType} } func (r *RpcClient) notifyServerSrvChange() { @@ -345,7 +343,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { } r.currentConnection = connectionNew atomic.StoreInt32((*int32)(&r.rpcClientStatus), (int32)(RUNNING)) - r.asyncNotifyConnectionChange(CONNECTED) + r.notifyConnectionChange(CONNECTED) return } if r.isShutdown() { @@ -371,7 +369,7 @@ func (r *RpcClient) reconnect(serverInfo ServerInfo, onRequestFail bool) { func (r *RpcClient) closeConnection() { if r.currentConnection != nil { r.currentConnection.close() - r.asyncNotifyConnectionChange(DISCONNECTED) + r.notifyConnectionChange(DISCONNECTED) } } @@ -381,7 +379,7 @@ func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) { if len(listeners) == 0 { return } - logger.Infof("%s notify %s event to listeners.", r.name, event.toString()) + logger.Infof("%s notify %s event to listeners , connectionId=%s", r.name, event.toString(), r.currentConnection.getConnectionId()) for _, v := range listeners { if event.isConnected() { v.OnConnected()