Skip to content

Commit

Permalink
feat: Add async mode to flush operation and API to turn it on (#12378)
Browse files Browse the repository at this point in the history
In async mode data fetching is wrapped in future.

Fixes #1066
Fixes #10709
Fixes #12342
  • Loading branch information
TatuLund authored Jan 25, 2022
1 parent 32fe6c9 commit 03efa05
Show file tree
Hide file tree
Showing 2 changed files with 366 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -33,6 +35,7 @@

import com.vaadin.flow.component.Component;
import com.vaadin.flow.component.ComponentUtil;
import com.vaadin.flow.component.UI;
import com.vaadin.flow.data.provider.ArrayUpdater.Update;
import com.vaadin.flow.data.provider.DataChangeEvent.DataRefreshEvent;
import com.vaadin.flow.dom.Element;
Expand All @@ -44,6 +47,7 @@
import com.vaadin.flow.internal.Range;
import com.vaadin.flow.internal.StateNode;
import com.vaadin.flow.shared.Registration;
import com.vaadin.flow.shared.communication.PushMode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -126,6 +130,10 @@ public class DataCommunicator<T> implements Serializable {

private boolean fetchEnabled;

private transient Executor executor = null;
private transient CompletableFuture<Activation> future;
private UI ui;

/**
* In-memory data provider with no items.
* <p>
Expand Down Expand Up @@ -334,6 +342,26 @@ public void setRequestedRange(int start, int length) {
requestFlush();
}

/**
* Control whether DataCommunicator should push data updates to the
* component asynchronously or not. By default the executor service is not
* defined and updates are done synchronously. Setting to null will disable
* the feature.
* <p>
* Note: This works only with Grid component. If set to true, Push needs to
* be enabled and set to PushMode.AUTOMATIC in order this to work.
*
* @param executor
* The Executor used for async updates.
*/
public void enablePushUpdates(Executor executor) {
if (this.executor != null && future != null) {
future.cancel(true);
future = null;
}
this.executor = executor;
}

/**
* Resets all the data.
* <p>
Expand Down Expand Up @@ -1026,6 +1054,7 @@ private String getInvalidContractMessage(String method) {
}

private void handleAttach() {
ui = UI.getCurrent();
if (dataProviderUpdateRegistration != null) {
dataProviderUpdateRegistration.remove();
}
Expand All @@ -1048,6 +1077,11 @@ protected void handleDataRefreshEvent(DataRefreshEvent<T> event) {
}

private void handleDetach() {
ui = null;
if (future != null) {
future.cancel(true);
future = null;
}
dataGenerator.destroyAllData();
if (dataProviderUpdateRegistration != null) {
dataProviderUpdateRegistration.remove();
Expand Down Expand Up @@ -1108,9 +1142,40 @@ private void flush() {
resendEntireRange |= !(previousActive.intersects(effectiveRequested)
|| (previousActive.isEmpty() && effectiveRequested.isEmpty()));

Activation activation = collectKeysToFlush(previousActive,
effectiveRequested);
if (executor != null) {
// In async mode wrap fetching data in future, collectKeysToFlush
// will perform fetch from data provider with given range.
if (ui.getPushConfiguration().getPushMode() != PushMode.AUTOMATIC) {
throw new IllegalStateException(
"Asynchronous DataCommunicator updates require Push to be enabled and PushMode.AUTOMATIC");
}
if (future != null) {
future.cancel(true);
}
future = CompletableFuture
.supplyAsync(() -> collectKeysToFlush(previousActive,
effectiveRequested), executor);
future.thenAccept(activation -> {
if (ui == null) {
return;
}
ui.access(() -> {
performUpdate(oldActive, effectiveRequested, previousActive,
activation);
});
});
} else {

Activation activation = collectKeysToFlush(previousActive,
effectiveRequested);

performUpdate(oldActive, effectiveRequested, previousActive,
activation);
}
}

private void performUpdate(Set<String> oldActive, Range effectiveRequested,
final Range previousActive, Activation activation) {
// In case received less items than what was expected, adjust size
if (activation.isSizeRecheckNeeded()) {
if (definedSize) {
Expand All @@ -1132,7 +1197,8 @@ private void flush() {
if (assumedSize != 0 && activation.getActiveKeys().isEmpty()) {
int delta = requestedRange.length();
// Request the items from a bit behind the current range
// at the next call to backend, and check that the requested
// at the next call to backend, and check that the
// requested
// range doesn't intersect the 0 point.
requestedRange = requestedRange.offsetBy(-delta)
.restrictTo(Range.withLength(0, assumedSize));
Expand Down
Loading

0 comments on commit 03efa05

Please sign in to comment.