Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve message sender / handler robustness in resync scenarios #13733

Merged
merged 21 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.WidgetUtil;
import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.flow.ConstantPool;
import com.vaadin.client.flow.StateNode;
import com.vaadin.client.flow.StateTree;
Expand Down Expand Up @@ -217,7 +218,18 @@ public void handleMessage(final ValueMap json) {
protected void handleJSON(final ValueMap valueMap) {
final int serverId = getServerId(valueMap);

if (isResynchronize(valueMap) && !isNextExpectedMessage(serverId)) {
boolean hasResynchronize = isResynchronize(valueMap);

if (!hasResynchronize && registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.WAITING_FOR_RESPONSE) {
Console.warn(
"Ignoring message from the server as a resync request is ongoing.");
return;
}

registry.getMessageSender().clearResynchronizationState();

if (hasResynchronize && !isNextExpectedMessage(serverId)) {
// Resynchronize request. We must remove any old pending
// messages and ensure this is handled next. Otherwise we
// would keep waiting for an older message forever (if this
Expand Down Expand Up @@ -566,13 +578,6 @@ private int getExpectedServerId() {
}

private void forceMessageHandling() {
// Clear previous request if it exists. Otherwise resyncrhonize can
// trigger
// "Trying to start a new request while another is active" exception and
// fail.
if (registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().endRequest();
}
if (!responseHandlingLocks.isEmpty()) {
// Lock which was never release -> bug in locker or things just
// too slow
Expand All @@ -593,6 +598,19 @@ private void forceMessageHandling() {
// has been lost
// Drop pending messages and resynchronize
pendingUIDLMessages.clear();

// Inform the message sender that resynchronize is desired already
// since endRequest may already send out a next request
registry.getMessageSender().requestResynchronize();

// Clear previous request if it exists.
if (registry.getRequestResponseTracker().hasActiveRequest()) {
registry.getRequestResponseTracker().endRequest();
}

// Call resynchronize to make sure a resynchronize request is sent
// in
// case endRequest did not already do this.
registry.getMessageSender().resynchronize();
}
}
Expand Down Expand Up @@ -819,5 +837,4 @@ public void setNextResponseSessionExpiredHandler(
Command nextResponseSessionExpiredHandler) {
this.nextResponseSessionExpiredHandler = nextResponseSessionExpiredHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
*/
public class MessageSender {

public enum ResynchronizationState {
NOT_ACTIVE, SEND_TO_SERVER, WAITING_FOR_RESPONSE
}

/**
* Counter for the messages send to the server. First sent message has id 0.
*/
Expand All @@ -46,6 +50,8 @@ public class MessageSender {
private final Registry registry;
private final PushConnectionFactory pushConnectionFactory;

private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -88,15 +94,17 @@ public void sendInvocationsToServer() {
private void doSendInvocationsToServer() {

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()) {
if (serverRpcQueue.isEmpty()
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
return;
}

boolean showLoadingIndicator = serverRpcQueue.showLoadingIndicator();
JsonArray reqJson = serverRpcQueue.toJson();
serverRpcQueue.clear();

if (reqJson.length() == 0) {
if (reqJson.length() == 0
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
// Nothing to send, all invocations were filtered out (for
// non-existing connectors)
Console.warn(
Expand All @@ -105,6 +113,11 @@ private void doSendInvocationsToServer() {
}

JsonObject extraJson = Json.createObject();
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.log("Resynchronizing from server");
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
ConnectionIndicator.setState(ConnectionIndicator.LOADING);
}
Expand Down Expand Up @@ -218,10 +231,9 @@ public String getCommunicationMethodName() {
* state from the server
*/
public void resynchronize() {
Console.log("Resynchronizing from server");
JsonObject resyncParam = Json.createObject();
resyncParam.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
send(Json.createArray(), resyncParam);
if (requestResynchronize()) {
sendInvocationsToServer();
}
}

/**
Expand Down Expand Up @@ -263,4 +275,37 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
// Do nothing as they will arrive eventually
}
}

/**
* Modifies the resynchronize state to indicate that resynchronization is
* desired
*
* @return true if the resynchronize request still needs to be sent; false
* otherwise
*/
boolean requestResynchronize() {
switch (resynchronizationState) {
case NOT_ACTIVE:
Console.log("Resynchronize from server requested");
resynchronizationState = ResynchronizationState.SEND_TO_SERVER;
return true;
case SEND_TO_SERVER:
// Resynchronize has already been requested, but hasn't been sent
// yet
return true;
case WAITING_FOR_RESPONSE:
default:
// Resynchronize has already been requested, but response hasn't
// been received yet
return false;
}
}

void clearResynchronizationState() {
resynchronizationState = ResynchronizationState.NOT_ACTIVE;
}

ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.web.bindery.event.shared.EventBus;
import com.google.web.bindery.event.shared.HandlerRegistration;

import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Registry;
import com.vaadin.client.gwt.com.google.web.bindery.event.shared.SimpleEventBus;
Expand Down Expand Up @@ -108,8 +109,10 @@ public void endRequest() {
// the call.
hasActiveRequest = false;

if (registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending()) {
if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
mshabarov marked this conversation as resolved.
Show resolved Hide resolved
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.communication.MessageHandler;
import com.vaadin.client.communication.MessageSender;
import com.vaadin.client.communication.RequestResponseTracker;
import com.vaadin.client.communication.ServerRpcQueue;

Expand All @@ -29,6 +30,7 @@ protected void gwtSetUp() throws Exception {
set(RequestResponseTracker.class,
new RequestResponseTracker(this));
set(MessageHandler.class, new MessageHandler(this));
set(MessageSender.class, new MessageSender(this));
set(ServerRpcQueue.class, new ServerRpcQueue(this));
set(DependencyLoader.class, new DependencyLoader(this));
set(ResourceLoader.class, new ResourceLoader(this, false));
Expand Down