Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve message sender / handler robustness in resync scenarios #13733

Merged
merged 21 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class MessageHandler {
*/
private static final int UNDEFINED_SYNC_ID = -1;

private boolean resyncInProgress;
mshabarov marked this conversation as resolved.
Show resolved Hide resolved

/**
* If responseHandlingLocks contains any objects, response handling is
* suspended until the collection is empty or a timeout has occurred.
Expand Down Expand Up @@ -217,7 +219,17 @@ public void handleMessage(final ValueMap json) {
protected void handleJSON(final ValueMap valueMap) {
final int serverId = getServerId(valueMap);

if (isResynchronize(valueMap) && !isNextExpectedMessage(serverId)) {
boolean hasResynchronize = isResynchronize(valueMap);

if (!hasResynchronize && resyncInProgress) {
Console.warn(
"Dropping the response of a request before a resync request.");
TatuLund marked this conversation as resolved.
Show resolved Hide resolved
return;
}

resyncInProgress = false;

if (hasResynchronize && !isNextExpectedMessage(serverId)) {
// Resynchronize request. We must remove any old pending
// messages and ensure this is handled next. Otherwise we
// would keep waiting for an older message forever (if this
Expand Down Expand Up @@ -820,4 +832,7 @@ public void setNextResponseSessionExpiredHandler(
this.nextResponseSessionExpiredHandler = nextResponseSessionExpiredHandler;
}

public void onResynchronize() {
resyncInProgress = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class MessageSender {
private final Registry registry;
private final PushConnectionFactory pushConnectionFactory;

private boolean resynchronizeRequested = false;
mshabarov marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -88,15 +90,15 @@ public void sendInvocationsToServer() {
private void doSendInvocationsToServer() {

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()) {
if (serverRpcQueue.isEmpty() && !resynchronizeRequested) {
return;
}

boolean showLoadingIndicator = serverRpcQueue.showLoadingIndicator();
JsonArray reqJson = serverRpcQueue.toJson();
serverRpcQueue.clear();

if (reqJson.length() == 0) {
if (reqJson.length() == 0 && !resynchronizeRequested) {
// Nothing to send, all invocations were filtered out (for
// non-existing connectors)
Console.warn(
Expand All @@ -105,6 +107,12 @@ private void doSendInvocationsToServer() {
}

JsonObject extraJson = Json.createObject();
if (resynchronizeRequested) {
registry.getMessageHandler().onResynchronize();
Console.log("Resynchronizing from server");
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
resynchronizeRequested = false;
}
if (showLoadingIndicator) {
ConnectionIndicator.setState(ConnectionIndicator.LOADING);
}
Expand Down Expand Up @@ -218,10 +226,9 @@ public String getCommunicationMethodName() {
* state from the server
*/
public void resynchronize() {
Console.log("Resynchronizing from server");
JsonObject resyncParam = Json.createObject();
resyncParam.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
send(Json.createArray(), resyncParam);
Console.log("Resynchronize from server requested");
resynchronizeRequested = true;
sendInvocationsToServer();
}

/**
Expand Down