Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27491: do not clear cache on RejectedExecutionException #4914

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ public void run() {
} catch (IOException e) {
// The service itself failed . It may be an error coming from the communication
// layer, but, as well, a functional error raised by the server.
receiveGlobalFailure(multiAction, server, numAttempt, e);
receiveGlobalFailure(multiAction, server, numAttempt, e, true);
return;
} catch (Throwable t) {
// This should not happen. Let's log & retry anyway.
LOG.error("id=" + asyncProcess.id + ", caught throwable. Unexpected."
+ " Retrying. Server=" + server + ", tableName=" + tableName, t);
receiveGlobalFailure(multiAction, server, numAttempt, t);
receiveGlobalFailure(multiAction, server, numAttempt, t, true);
return;
}
if (res.type() == AbstractResponse.ResponseType.MULTI) {
Expand Down Expand Up @@ -520,6 +520,7 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache)
*/
void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttempt,
List<Action> actionsForReplicaThread, boolean reuseThread) {
boolean clearServerCache = true;
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
int actionsRemaining = actionsByServer.size();
Expand Down Expand Up @@ -554,14 +555,16 @@ void sendMultiAction(Map<ServerName, MultiAction> actionsByServer, int numAttemp
// let's secure this a little.
LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + " Server="
+ server.getServerName(), t);
// Do not update cache if exception is from failing to submit action to thread pool
clearServerCache = false;
} else {
// see #HBASE-14359 for more details
LOG.warn("Caught unexpected exception/error: ", t);
}
asyncProcess.decTaskCounters(multiAction.getRegions(), server);
// We're likely to fail again, but this will increment the attempt counter,
// so it will finish.
receiveGlobalFailure(multiAction, server, numAttempt, t);
receiveGlobalFailure(multiAction, server, numAttempt, t, clearServerCache);
}
}
}
Expand Down Expand Up @@ -711,21 +714,28 @@ private void failAll(MultiAction actions, ServerName server, int numAttempt,
* @param t the throwable (if any) that caused the resubmit
*/
private void receiveGlobalFailure(MultiAction rsActions, ServerName server, int numAttempt,
Throwable t) {
Throwable t, boolean clearServerCache) {
errorsByServer.reportServerError(server);
Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

cleanServerCache(server, t);
// Do not update cache if exception is from failing to submit action to thread pool
if (clearServerCache) {
cleanServerCache(server, t);
}

int failed = 0;
int stopped = 0;
List<Action> toReplay = new ArrayList<>();
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
byte[] row = e.getValue().get(0).getAction().getRow();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
updateCachedLocations(server, regionName, row,
ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
// any of the regions in the MultiAction and do not update cache if exception is
// from failing to submit action to thread pool
if (clearServerCache) {
updateCachedLocations(server, regionName, row,
ClientExceptionsUtil.isMetaClearingException(t) ? null : t);
}
for (Action action : e.getValue()) {
Retry retry =
manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server);
Expand Down