Skip to content

Commit

Permalink
[CELEBORN-1008] Adjust push/fetch timeout checker thread pool and tasks
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Only push/data module needs push-timeout-checker, and data module needs fetch-timeout-checker.
Here make push-timeout-checker not to be created in Master/LifeCycleManager, and fetch-timeout-checker in Worker.
The same goes for related timeout checker schedule tasks.

### Why are the changes needed?
Ditto

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster test

Closes #1940 from onebox-li/checker-dev.

Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
onebox-li authored and waitinfuture committed Sep 28, 2023
1 parent b4dfc09 commit ef4fc51
Showing 1 changed file with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,31 +79,49 @@ public TransportResponseHandler(TransportConf conf, Channel channel) {
this.timeOfLastRequestNs = new AtomicLong(0);
this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();

String module = conf.getModuleName();
boolean checkPushTimeout = false;
boolean checkFetchTimeout = false;
if (TransportModuleConstants.DATA_MODULE.equals(module)) {
checkPushTimeout = true;
checkFetchTimeout = true;
} else if (TransportModuleConstants.PUSH_MODULE.equals(module)) {
checkPushTimeout = true;
}
synchronized (TransportResponseHandler.class) {
if (pushTimeoutChecker == null) {
pushTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
}
if (fetchTimeoutChecker == null) {
fetchTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
if (checkPushTimeout) {
if (pushTimeoutChecker == null) {
pushTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
}
if (checkFetchTimeout) {
if (fetchTimeoutChecker == null) {
fetchTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
}
}
}
}
pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredPushRequest(),
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
if (checkPushTimeout) {
pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredPushRequest(),
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
}

fetchCheckerScheduleFuture =
fetchTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredFetchRequest(),
fetchTimeoutCheckerInterval,
fetchTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
if (checkFetchTimeout) {
fetchCheckerScheduleFuture =
fetchTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredFetchRequest(),
fetchTimeoutCheckerInterval,
fetchTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
}
}

public void failExpiredPushRequest() {
Expand Down Expand Up @@ -251,8 +269,12 @@ public void channelInactive() {
remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
}
pushCheckerScheduleFuture.cancel(false);
fetchCheckerScheduleFuture.cancel(false);
if (pushCheckerScheduleFuture != null) {
pushCheckerScheduleFuture.cancel(false);
}
if (fetchCheckerScheduleFuture != null) {
fetchCheckerScheduleFuture.cancel(false);
}
}

@Override
Expand All @@ -265,8 +287,12 @@ public void exceptionCaught(Throwable cause) {
remoteAddress);
failOutstandingRequests(cause);
}
pushCheckerScheduleFuture.cancel(false);
fetchCheckerScheduleFuture.cancel(false);
if (pushCheckerScheduleFuture != null) {
pushCheckerScheduleFuture.cancel(false);
}
if (fetchCheckerScheduleFuture != null) {
fetchCheckerScheduleFuture.cancel(false);
}
}

@Override
Expand Down

0 comments on commit ef4fc51

Please sign in to comment.