diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java index 7282fa9130..78120a2c14 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java @@ -79,31 +79,49 @@ public TransportResponseHandler(TransportConf conf, Channel channel) { this.timeOfLastRequestNs = new AtomicLong(0); this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs(); this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs(); + + String module = conf.getModuleName(); + boolean checkPushTimeout = false; + boolean checkFetchTimeout = false; + if (TransportModuleConstants.DATA_MODULE.equals(module)) { + checkPushTimeout = true; + checkFetchTimeout = true; + } else if (TransportModuleConstants.PUSH_MODULE.equals(module)) { + checkPushTimeout = true; + } synchronized (TransportResponseHandler.class) { - if (pushTimeoutChecker == null) { - pushTimeoutChecker = - ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); - } - if (fetchTimeoutChecker == null) { - fetchTimeoutChecker = - ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + if (checkPushTimeout) { + if (pushTimeoutChecker == null) { + pushTimeoutChecker = + ThreadUtils.newDaemonThreadPoolScheduledExecutor( + "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); + } + if (checkFetchTimeout) { + if (fetchTimeoutChecker == null) { + fetchTimeoutChecker = + ThreadUtils.newDaemonThreadPoolScheduledExecutor( + "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + } + } } } - pushCheckerScheduleFuture = - pushTimeoutChecker.scheduleAtFixedRate( - () -> failExpiredPushRequest(), - pushTimeoutCheckerInterval, - pushTimeoutCheckerInterval, - TimeUnit.MILLISECONDS); + if (checkPushTimeout) { + pushCheckerScheduleFuture = + pushTimeoutChecker.scheduleAtFixedRate( + () -> failExpiredPushRequest(), + pushTimeoutCheckerInterval, + pushTimeoutCheckerInterval, + TimeUnit.MILLISECONDS); + } - fetchCheckerScheduleFuture = - fetchTimeoutChecker.scheduleAtFixedRate( - () -> failExpiredFetchRequest(), - fetchTimeoutCheckerInterval, - fetchTimeoutCheckerInterval, - TimeUnit.MILLISECONDS); + if (checkFetchTimeout) { + fetchCheckerScheduleFuture = + fetchTimeoutChecker.scheduleAtFixedRate( + () -> failExpiredFetchRequest(), + fetchTimeoutCheckerInterval, + fetchTimeoutCheckerInterval, + TimeUnit.MILLISECONDS); + } } public void failExpiredPushRequest() { @@ -251,8 +269,12 @@ public void channelInactive() { remoteAddress); failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed")); } - pushCheckerScheduleFuture.cancel(false); - fetchCheckerScheduleFuture.cancel(false); + if (pushCheckerScheduleFuture != null) { + pushCheckerScheduleFuture.cancel(false); + } + if (fetchCheckerScheduleFuture != null) { + fetchCheckerScheduleFuture.cancel(false); + } } @Override @@ -265,8 +287,12 @@ public void exceptionCaught(Throwable cause) { remoteAddress); failOutstandingRequests(cause); } - pushCheckerScheduleFuture.cancel(false); - fetchCheckerScheduleFuture.cancel(false); + if (pushCheckerScheduleFuture != null) { + pushCheckerScheduleFuture.cancel(false); + } + if (fetchCheckerScheduleFuture != null) { + fetchCheckerScheduleFuture.cancel(false); + } } @Override