From 12f474e4e780a5242830cc6926db267ca5b1bef7 Mon Sep 17 00:00:00 2001 From: Xz <34960122+XiaZhouxx@users.noreply.github.com> Date: Wed, 18 Sep 2024 10:38:55 +0800 Subject: [PATCH] [ISSUE #12103] Enhance ClientWorker support grpc request timeout param. (#12619) * [ISSUE #12103] Enhance ClientWorker support grpc request timeout param. * test case. --- .../com/alibaba/nacos/api/PropertyKeyConst.java | 2 ++ .../nacos/client/config/impl/ClientWorker.java | 13 ++++++++++--- .../nacos/client/config/impl/ClientWorkerTest.java | 12 ++++++------ 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java index fe27f27ca7b..424319f7a4a 100644 --- a/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java +++ b/api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java @@ -63,6 +63,8 @@ public class PropertyKeyConst { public static final String CONFIG_RETRY_TIME = "configRetryTime"; + public static final String CONFIG_REQUEST_TIMEOUT = "configRequestTimeout"; + public static final String CLIENT_WORKER_MAX_THREAD_COUNT = "clientWorkerMaxThreadCount"; public static final String CLIENT_WORKER_THREAD_COUNT = "clientWorkerThreadCount"; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index 465d597fb5c..cb2da874065 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -138,6 +138,8 @@ public class ClientWorker implements Closeable { private long timeout; + private long requestTimeout; + private final ConfigRpcTransportClient agent; private int taskPenaltyTime; @@ -405,7 +407,7 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant cache.setTaskId(taskId); // fix issue # 1317 if (enableRemoteSyncConfig) { - ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false); + ConfigResponse response = getServerConfig(dataId, group, tenant, requestTimeout, false); cache.setEncryptedDataKey(response.getEncryptedDataKey()); cache.setContent(response.getContent()); } @@ -510,6 +512,8 @@ private int initWorkerThreadCount(NacosClientProperties properties) { private void init(NacosClientProperties properties) { + requestTimeout = ConvertUtils.toLong(properties.getProperty(PropertyKeyConst.CONFIG_REQUEST_TIMEOUT, "-1")); + timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); @@ -927,7 +931,7 @@ private void refreshContentAndCheck(RpcClient rpcClient, CacheData cacheData, bo try { ConfigResponse response = this.queryConfigInner(rpcClient, cacheData.dataId, cacheData.group, - cacheData.tenant, 3000L, notify); + cacheData.tenant, requestTimeout, notify); cacheData.setEncryptedDataKey(response.getEncryptedDataKey()); cacheData.setContent(response.getContent()); if (null != response.getConfigType()) { @@ -1198,7 +1202,7 @@ ConfigResponse queryConfigInner(RpcClient rpcClient, String dataId, String group } private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException { - return requestProxy(rpcClientInner, request, 3000L); + return requestProxy(rpcClientInner, request, requestTimeout); } private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills) @@ -1217,6 +1221,9 @@ private Response requestProxy(RpcClient rpcClientInner, Request request, long ti throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD, "More than client-side current limit threshold"); } + if (timeoutMills < 0) { + return rpcClientInner.request(request); + } return rpcClientInner.request(request, timeoutMills); } diff --git a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java index 0b60d704224..fa9687e41eb 100644 --- a/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/config/impl/ClientWorkerTest.java @@ -232,7 +232,7 @@ void testPublishConfigSuccess() throws NacosException { String casMd5 = "1111"; String type = "properties"; - Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())) + Mockito.when(rpcClient.request(any(ConfigPublishRequest.class))) .thenReturn(new ConfigPublishResponse()); boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5, type); @@ -261,7 +261,7 @@ void testPublishConfigFail() throws NacosException { String casMd5 = "1111"; String type = "properties"; - Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())) + Mockito.when(rpcClient.request(any(ConfigPublishRequest.class))) .thenReturn(ConfigPublishResponse.buildFailResponse(503, "over limit")); boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5, type); @@ -290,7 +290,7 @@ void testPublishConfigException() throws NacosException { String casMd5 = "1111"; String type = "properties"; - Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())).thenThrow(new NacosException()); + Mockito.when(rpcClient.request(any(ConfigPublishRequest.class))).thenThrow(new NacosException()); boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5, type); assertFalse(b); @@ -313,7 +313,7 @@ void testRemoveConfig() throws NacosException { String tag = "tag"; try { - Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class), anyLong())) + Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class))) .thenThrow(new NacosException(503, "overlimit")); clientWorker.removeConfig(dataId, group, tenant, tag); @@ -562,13 +562,13 @@ public void receiveConfigInfo(String configInfo) { () -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class), any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner); // mock listen and remove listen request - Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong())) + Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class))) .thenReturn(response, response); // mock query changed config ConfigQueryResponse configQueryResponse = new ConfigQueryResponse(); configQueryResponse.setContent("content" + System.currentTimeMillis()); configQueryResponse.setContentType(ConfigType.JSON.getType()); - Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse); + Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class))).thenReturn(configQueryResponse); (clientWorker.getAgent()).executeConfigListen(); //assert //use local cache.