Skip to content

Commit

Permalink
[ISSUE #12103] Enhance ClientWorker support grpc request timeout para…
Browse files Browse the repository at this point in the history
…m. (#12619)

* [ISSUE #12103] Enhance ClientWorker support grpc request timeout param.

* test case.
  • Loading branch information
XiaZhouxx authored Sep 18, 2024
1 parent 3a9003b commit 12f474e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
2 changes: 2 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class PropertyKeyConst {

public static final String CONFIG_RETRY_TIME = "configRetryTime";

public static final String CONFIG_REQUEST_TIMEOUT = "configRequestTimeout";

public static final String CLIENT_WORKER_MAX_THREAD_COUNT = "clientWorkerMaxThreadCount";

public static final String CLIENT_WORKER_THREAD_COUNT = "clientWorkerThreadCount";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public class ClientWorker implements Closeable {

private long timeout;

private long requestTimeout;

private final ConfigRpcTransportClient agent;

private int taskPenaltyTime;
Expand Down Expand Up @@ -405,7 +407,7 @@ public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant
cache.setTaskId(taskId);
// fix issue # 1317
if (enableRemoteSyncConfig) {
ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);
ConfigResponse response = getServerConfig(dataId, group, tenant, requestTimeout, false);
cache.setEncryptedDataKey(response.getEncryptedDataKey());
cache.setContent(response.getContent());
}
Expand Down Expand Up @@ -510,6 +512,8 @@ private int initWorkerThreadCount(NacosClientProperties properties) {

private void init(NacosClientProperties properties) {

requestTimeout = ConvertUtils.toLong(properties.getProperty(PropertyKeyConst.CONFIG_REQUEST_TIMEOUT, "-1"));

timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);

Expand Down Expand Up @@ -927,7 +931,7 @@ private void refreshContentAndCheck(RpcClient rpcClient, CacheData cacheData, bo
try {

ConfigResponse response = this.queryConfigInner(rpcClient, cacheData.dataId, cacheData.group,
cacheData.tenant, 3000L, notify);
cacheData.tenant, requestTimeout, notify);
cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
cacheData.setContent(response.getContent());
if (null != response.getConfigType()) {
Expand Down Expand Up @@ -1198,7 +1202,7 @@ ConfigResponse queryConfigInner(RpcClient rpcClient, String dataId, String group
}

private Response requestProxy(RpcClient rpcClientInner, Request request) throws NacosException {
return requestProxy(rpcClientInner, request, 3000L);
return requestProxy(rpcClientInner, request, requestTimeout);
}

private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills)
Expand All @@ -1217,6 +1221,9 @@ private Response requestProxy(RpcClient rpcClientInner, Request request, long ti
throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD,
"More than client-side current limit threshold");
}
if (timeoutMills < 0) {
return rpcClientInner.request(request);
}
return rpcClientInner.request(request, timeoutMills);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ void testPublishConfigSuccess() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class)))
.thenReturn(new ConfigPublishResponse());
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
Expand Down Expand Up @@ -261,7 +261,7 @@ void testPublishConfigFail() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class)))
.thenReturn(ConfigPublishResponse.buildFailResponse(503, "over limit"));
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
Expand Down Expand Up @@ -290,7 +290,7 @@ void testPublishConfigException() throws NacosException {
String casMd5 = "1111";

String type = "properties";
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class), anyLong())).thenThrow(new NacosException());
Mockito.when(rpcClient.request(any(ConfigPublishRequest.class))).thenThrow(new NacosException());
boolean b = clientWorker.publishConfig(dataId, group, tenant, appName, tag, betaIps, content, null, casMd5,
type);
assertFalse(b);
Expand All @@ -313,7 +313,7 @@ void testRemoveConfig() throws NacosException {

String tag = "tag";
try {
Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class), anyLong()))
Mockito.when(rpcClient.request(any(ConfigRemoveRequest.class)))
.thenThrow(new NacosException(503, "overlimit"));

clientWorker.removeConfig(dataId, group, tenant, tag);
Expand Down Expand Up @@ -562,13 +562,13 @@ public void receiveConfigInfo(String configInfo) {
() -> RpcClientFactory.createClient(anyString(), any(ConnectionType.class), any(Map.class),
any(RpcClientTlsConfig.class))).thenReturn(rpcClientInner);
// mock listen and remove listen request
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class), anyLong()))
Mockito.when(rpcClientInner.request(any(ConfigBatchListenRequest.class)))
.thenReturn(response, response);
// mock query changed config
ConfigQueryResponse configQueryResponse = new ConfigQueryResponse();
configQueryResponse.setContent("content" + System.currentTimeMillis());
configQueryResponse.setContentType(ConfigType.JSON.getType());
Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class), anyLong())).thenReturn(configQueryResponse);
Mockito.when(rpcClientInner.request(any(ConfigQueryRequest.class))).thenReturn(configQueryResponse);
(clientWorker.getAgent()).executeConfigListen();
//assert
//use local cache.
Expand Down

0 comments on commit 12f474e

Please sign in to comment.