Skip to content

Commit

Permalink
[Improvement](rpc) set grpc channel's keepAliveTime and remove proxy … (
Browse files Browse the repository at this point in the history
#38380)

…on InterruptedExcep… (#37304)

## Proposed changes
1. set grpc channel's keepAliveTime
2. remove proxy on InterruptedException/TimeoutException to avoid
channel unavailable

pick from #37304
## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
BiteTheDDDDt authored Jul 26, 2024
1 parent f13363a commit 71c7c77
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,14 @@ public class Config extends ConfigBase {
@ConfField
public static int grpc_threadmgr_threads_nums = 4096;

/**
* sets the time without read activity before sending a keepalive ping
* the smaller the value, the sooner the channel is unavailable, but it will increase network io
*/
@ConfField(description = { "设置grpc连接发送 keepalive ping 之前没有数据传输的时间。",
"The time without grpc read activity before sending a keepalive ping" })
public static int grpc_keep_alive_second = 10;

/**
* Used to set minimal number of replication per tablet.
*/
Expand Down
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1001,13 +1001,15 @@ private void waitPipelineRpc(List<Triple<PipelineExecContexts, BackendServicePro
} catch (InterruptedException e) {
exception = e;
code = TStatusCode.INTERNAL_ERROR;
triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
} catch (TimeoutException e) {
exception = e;
errMsg = String.format(
"timeout when waiting for %s rpc, query timeout:%d, left timeout for this operation:%d",
operation, queryOptions.getExecutionTimeout(), timeoutMs / 1000);
LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
code = TStatusCode.TIMEOUT;
triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class BackendServiceClient {
public BackendServiceClient(TNetworkAddress address, Executor executor) {
this.address = address;
channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
.executor(executor)
.executor(executor).keepAliveTime(Config.grpc_keep_alive_second, TimeUnit.SECONDS)
.flowControlWindow(Config.grpc_max_message_size_bytes)
.keepAliveWithoutCalls(true)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
Expand Down

0 comments on commit 71c7c77

Please sign in to comment.