Skip to content

Commit

Permalink
fix: Improve message sender / handler robustness in resync scenarios (#…
Browse files Browse the repository at this point in the history
…13733) (#13805)

Process re-sync messages via normal message queue and use semaphore to protect re-sync process (i.e. do not allow other messages while performing re-sync).

This PR is adopted from similar fixes for Vaadin 8.

vaadin/framework#11791
vaadin/framework#12043
vaadin/framework#12178

This also changes the method `forceMessageHandling` in a way that the desire to resynchronise is registered before calling `endRequest`. If this is not done, `endRequest` may end up sending out a request itself and that then causes the re-sync request to fail because a request is already in flight. This ends up throwing an IllegalStateException at com/vaadin/client/communication/RequestResponseTracker.java.

Fixes #13726

Co-authored-by: Artur <[email protected]>
Co-authored-by: Pepijn Van Eeckhoudt @pepijnve

Co-authored-by: Tatu Lund <[email protected]>
Co-authored-by: Artur <[email protected]>
  • Loading branch information
3 people authored and mshabarov committed Jun 10, 2022
1 parent f81024b commit 527a1d3
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.WidgetUtil;
import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.flow.ConstantPool;
import com.vaadin.client.flow.StateNode;
import com.vaadin.client.flow.StateTree;
Expand Down Expand Up @@ -217,7 +218,18 @@ public void handleMessage(final ValueMap json) {
protected void handleJSON(final ValueMap valueMap) {
final int serverId = getServerId(valueMap);

if (isResynchronize(valueMap) && !isNextExpectedMessage(serverId)) {
boolean hasResynchronize = isResynchronize(valueMap);

if (!hasResynchronize && registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.WAITING_FOR_RESPONSE) {
Console.warn(
"Ignoring message from the server as a resync request is ongoing.");
return;
}

registry.getMessageSender().clearResynchronizationState();

if (hasResynchronize && !isNextExpectedMessage(serverId)) {
// Resynchronize request. We must remove any old pending
// messages and ensure this is handled next. Otherwise we
// would keep waiting for an older message forever (if this
Expand Down Expand Up @@ -553,11 +565,6 @@ private int getExpectedServerId() {
}

private void forceMessageHandling() {
// Clear previous request if it exists. Otherwise resyncrhonize can trigger
// "Trying to start a new request while another is active" exception and fail.
if (registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().endRequest();
}
if (!responseHandlingLocks.isEmpty()) {
// Lock which was never release -> bug in locker or things just
// too slow
Expand All @@ -578,6 +585,19 @@ private void forceMessageHandling() {
// has been lost
// Drop pending messages and resynchronize
pendingUIDLMessages.clear();

// Inform the message sender that resynchronize is desired already
// since endRequest may already send out a next request
registry.getMessageSender().requestResynchronize();

// Clear previous request if it exists.
if (registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().endRequest();
}

// Call resynchronize to make sure a resynchronize request is sent
// in
// case endRequest did not already do this.
registry.getMessageSender().resynchronize();
}
}
Expand Down Expand Up @@ -804,5 +824,4 @@ public void setNextResponseSessionExpiredHandler(
Command nextResponseSessionExpiredHandler) {
this.nextResponseSessionExpiredHandler = nextResponseSessionExpiredHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
*/
public class MessageSender {

public enum ResynchronizationState {
NOT_ACTIVE, SEND_TO_SERVER, WAITING_FOR_RESPONSE
}

/**
* Counter for the messages send to the server. First sent message has id 0.
*/
Expand All @@ -45,6 +49,8 @@ public class MessageSender {
private final Registry registry;
private final PushConnectionFactory pushConnectionFactory;

private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -87,15 +93,17 @@ public void sendInvocationsToServer() {
private void doSendInvocationsToServer() {

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()) {
if (serverRpcQueue.isEmpty()
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
return;
}

boolean showLoadingIndicator = serverRpcQueue.showLoadingIndicator();
JsonArray reqJson = serverRpcQueue.toJson();
serverRpcQueue.clear();

if (reqJson.length() == 0) {
if (reqJson.length() == 0
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
// Nothing to send, all invocations were filtered out (for
// non-existing connectors)
Console.warn(
Expand All @@ -104,6 +112,11 @@ private void doSendInvocationsToServer() {
}

JsonObject extraJson = Json.createObject();
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.log("Resynchronizing from server");
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
registry.getLoadingIndicator().trigger();
}
Expand Down Expand Up @@ -217,10 +230,9 @@ public String getCommunicationMethodName() {
* state from the server
*/
public void resynchronize() {
Console.log("Resynchronizing from server");
JsonObject resyncParam = Json.createObject();
resyncParam.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
send(Json.createArray(), resyncParam);
if (requestResynchronize()) {
sendInvocationsToServer();
}
}

/**
Expand Down Expand Up @@ -262,4 +274,37 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
// Do nothing as they will arrive eventually
}
}

/**
* Modifies the resynchronize state to indicate that resynchronization is
* desired
*
* @return true if the resynchronize request still needs to be sent; false
* otherwise
*/
boolean requestResynchronize() {
switch (resynchronizationState) {
case NOT_ACTIVE:
Console.log("Resynchronize from server requested");
resynchronizationState = ResynchronizationState.SEND_TO_SERVER;
return true;
case SEND_TO_SERVER:
// Resynchronize has already been requested, but hasn't been sent
// yet
return true;
case WAITING_FOR_RESPONSE:
default:
// Resynchronize has already been requested, but response hasn't
// been received yet
return false;
}
}

void clearResynchronizationState() {
resynchronizationState = ResynchronizationState.NOT_ACTIVE;
}

ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.web.bindery.event.shared.Event;
import com.google.web.bindery.event.shared.EventBus;
import com.google.web.bindery.event.shared.HandlerRegistration;

import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.Registry;
import com.vaadin.client.gwt.com.google.web.bindery.event.shared.SimpleEventBus;

Expand Down Expand Up @@ -106,8 +108,10 @@ public void endRequest() {
// the call.
hasActiveRequest = false;

if (registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending()) {
if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.communication.MessageHandler;
import com.vaadin.client.communication.MessageSender;
import com.vaadin.client.communication.RequestResponseTracker;
import com.vaadin.client.communication.ServerRpcQueue;

Expand All @@ -29,6 +30,7 @@ protected void gwtSetUp() throws Exception {
set(RequestResponseTracker.class,
new RequestResponseTracker(this));
set(MessageHandler.class, new MessageHandler(this));
set(MessageSender.class, new MessageSender(this));
set(ServerRpcQueue.class, new ServerRpcQueue(this));
set(DependencyLoader.class, new DependencyLoader(this));
set(ResourceLoader.class, new ResourceLoader(this, false));
Expand Down

0 comments on commit 527a1d3

Please sign in to comment.