Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve message sender / handler robustness in resync scenarios #13733

Merged
merged 21 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.WidgetUtil;
import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.flow.ConstantPool;
import com.vaadin.client.flow.StateNode;
import com.vaadin.client.flow.StateTree;
Expand Down Expand Up @@ -217,7 +218,19 @@ public void handleMessage(final ValueMap json) {
protected void handleJSON(final ValueMap valueMap) {
final int serverId = getServerId(valueMap);

if (isResynchronize(valueMap) && !isNextExpectedMessage(serverId)) {
boolean hasResynchronize = isResynchronize(valueMap);

if (!hasResynchronize && registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.WAITING_FOR_RESPONSE) {
Console.warn(
"Ignoring message from the server as a resync request is ongoing.");
return;
}

registry.getMessageSender()
.setResynchronizationState(ResynchronizationState.NOT_ACTIVE);

if (hasResynchronize && !isNextExpectedMessage(serverId)) {
// Resynchronize request. We must remove any old pending
// messages and ensure this is handled next. Otherwise we
// would keep waiting for an older message forever (if this
Expand Down Expand Up @@ -819,5 +832,4 @@ public void setNextResponseSessionExpiredHandler(
Command nextResponseSessionExpiredHandler) {
this.nextResponseSessionExpiredHandler = nextResponseSessionExpiredHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
*/
public class MessageSender {

public enum ResynchronizationState {
NOT_ACTIVE, SEND_TO_SERVER, WAITING_FOR_RESPONSE
}

/**
* Counter for the messages send to the server. First sent message has id 0.
*/
Expand All @@ -46,6 +50,8 @@ public class MessageSender {
private final Registry registry;
private final PushConnectionFactory pushConnectionFactory;

private ResynchronizationState resynchronizationState = ResynchronizationState.NOT_ACTIVE;

/**
* Creates a new instance connected to the given registry.
*
Expand Down Expand Up @@ -88,15 +94,17 @@ public void sendInvocationsToServer() {
private void doSendInvocationsToServer() {

ServerRpcQueue serverRpcQueue = registry.getServerRpcQueue();
if (serverRpcQueue.isEmpty()) {
if (serverRpcQueue.isEmpty()
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
return;
}

boolean showLoadingIndicator = serverRpcQueue.showLoadingIndicator();
JsonArray reqJson = serverRpcQueue.toJson();
serverRpcQueue.clear();

if (reqJson.length() == 0) {
if (reqJson.length() == 0
&& resynchronizationState != ResynchronizationState.SEND_TO_SERVER) {
// Nothing to send, all invocations were filtered out (for
// non-existing connectors)
Console.warn(
Expand All @@ -105,6 +113,11 @@ private void doSendInvocationsToServer() {
}

JsonObject extraJson = Json.createObject();
if (resynchronizationState == ResynchronizationState.SEND_TO_SERVER) {
resynchronizationState = ResynchronizationState.WAITING_FOR_RESPONSE;
Console.log("Resynchronizing from server");
extraJson.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
}
if (showLoadingIndicator) {
ConnectionIndicator.setState(ConnectionIndicator.LOADING);
}
Expand Down Expand Up @@ -218,10 +231,9 @@ public String getCommunicationMethodName() {
* state from the server
*/
public void resynchronize() {
Console.log("Resynchronizing from server");
JsonObject resyncParam = Json.createObject();
resyncParam.put(ApplicationConstants.RESYNCHRONIZE_ID, true);
send(Json.createArray(), resyncParam);
Console.log("Resynchronize from server requested");
resynchronizationState = ResynchronizationState.SEND_TO_SERVER;
sendInvocationsToServer();
}

/**
Expand Down Expand Up @@ -263,4 +275,12 @@ public void setClientToServerMessageId(int nextExpectedId, boolean force) {
// Do nothing as they will arrive eventually
}
}

void setResynchronizationState(ResynchronizationState state) {
resynchronizationState = state;
}

ResynchronizationState getResynchronizationState() {
return resynchronizationState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.web.bindery.event.shared.EventBus;
import com.google.web.bindery.event.shared.HandlerRegistration;

import com.vaadin.client.communication.MessageSender.ResynchronizationState;
import com.vaadin.client.ConnectionIndicator;
import com.vaadin.client.Registry;
import com.vaadin.client.gwt.com.google.web.bindery.event.shared.SimpleEventBus;
Expand Down Expand Up @@ -108,8 +109,10 @@ public void endRequest() {
// the call.
hasActiveRequest = false;

if (registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending()) {
if ((registry.getUILifecycle().isRunning()
&& registry.getServerRpcQueue().isFlushPending())
|| registry.getMessageSender()
.getResynchronizationState() == ResynchronizationState.SEND_TO_SERVER) {
mshabarov marked this conversation as resolved.
Show resolved Hide resolved
// Send the pending RPCs immediately.
// This might be an unnecessary optimization as ServerRpcQueue has a
// finally scheduled command which trigger the send if we do not do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.vaadin.client.UILifecycle.UIState;
import com.vaadin.client.ValueMap;
import com.vaadin.client.communication.MessageHandler;
import com.vaadin.client.communication.MessageSender;
import com.vaadin.client.communication.RequestResponseTracker;
import com.vaadin.client.communication.ServerRpcQueue;

Expand All @@ -29,6 +30,7 @@ protected void gwtSetUp() throws Exception {
set(RequestResponseTracker.class,
new RequestResponseTracker(this));
set(MessageHandler.class, new MessageHandler(this));
set(MessageSender.class, new MessageSender(this));
set(ServerRpcQueue.class, new ServerRpcQueue(this));
set(DependencyLoader.class, new DependencyLoader(this));
set(ResourceLoader.class, new ResourceLoader(this, false));
Expand Down