getAll(DataStoreTypes.Category category) {
- return null;
+ return ImmutableMap.of();
}
@Override
diff --git a/src/main/java/co/featureflags/server/Status.java b/src/main/java/co/featureflags/server/Status.java
index 8f8f5a2..0ed608f 100644
--- a/src/main/java/co/featureflags/server/Status.java
+++ b/src/main/java/co/featureflags/server/Status.java
@@ -20,13 +20,14 @@ public abstract class Status {
public static final String RUNTIME_ERROR = "Runtime error";
public static final String UNKNOWN_ERROR = "Unknown error";
public static final String UNKNOWN_CLOSE_CODE = "Unknown close code";
+ public static final String WEBSOCKET_ERROR = "WebSocket error";
/**
* possible values for {@link co.featureflags.server.exterior.UpdateProcessor}
*/
public enum StateType {
/**
- * The initial state of the data source when the SDK is being initialized.
+ * The initial state of the update processing when the SDK is being initialized.
*
* If it encounters an error that requires it to retry initialization, the state will remain at
* {@link #INITIALIZING} until it either succeeds and becomes {@link #OK}, or permanently fails and
@@ -215,15 +216,14 @@ public interface DataUpdator {
*
* If {@code newState} is different from the previous state, and/or {@code newError} is non-null, the
* SDK will start returning the new status (adding a timestamp for the change) from
- * {@link DataUpdateStatusProvider#getState()}, and will trigger status change events to any
- * registered listeners.
+ * {@link DataUpdateStatusProvider#getState()}.
*
* A special case is that if {@code newState} is {@link StateType#INTERRUPTED},
* but the previous state was {@link StateType#INITIALIZING}, the state will remain at {@link StateType#INITIALIZING}
* because {@link StateType#INTERRUPTED} is only meaningful after a successful startup.
*
- * @param newState the data storage state
- * @param message the data source state
+ * @param newState the new state of {@link co.featureflags.server.exterior.UpdateProcessor}
+ * @param message an error message or null
*/
void updateStatus(StateType newState, ErrorInfo message);
@@ -298,14 +298,15 @@ public void updateStatus(StateType newState, ErrorInfo message) {
}
synchronized (lockObject) {
StateType oldOne = currentState.getStateType();
+ StateType newState1 = newState;
// interruped state is only meaningful after initialization
- if (newState == StateType.INTERRUPTED && oldOne == StateType.INITIALIZING) {
- newState = StateType.INITIALIZING;
+ if (newState1 == StateType.INTERRUPTED && oldOne == StateType.INITIALIZING) {
+ newState1 = StateType.INITIALIZING;
}
- if (newState != oldOne || message != null) {
- Instant stateSince = newState == oldOne ? currentState.getStateSince() : Instant.now();
- currentState = new State(newState, stateSince, message);
+ if (newState1 != oldOne || message != null) {
+ Instant stateSince = newState1 == oldOne ? currentState.getStateSince() : Instant.now();
+ currentState = new State(newState1, stateSince, message);
lockObject.notifyAll();
}
}
@@ -374,8 +375,7 @@ public interface DataUpdateStatusProvider {
* whenever they successfully initialize, encounter an error, or recover after an error.
*
* For a custom implementation, it is the responsibility of the data source to report its status via {@link DataUpdator};
- * if it does not do so, the status will always be reported as
- * {@link StateType#INITIALIZING}.
+ * if it does not do so, the status will always be reported as {@link StateType#INITIALIZING}.
*
* @return the latest status; will never be null
*/
diff --git a/src/main/java/co/featureflags/server/Streaming.java b/src/main/java/co/featureflags/server/Streaming.java
index b7d522b..7e3d270 100644
--- a/src/main/java/co/featureflags/server/Streaming.java
+++ b/src/main/java/co/featureflags/server/Streaming.java
@@ -21,12 +21,12 @@
import org.slf4j.Logger;
import java.io.EOFException;
+import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -42,6 +42,7 @@
import static co.featureflags.server.Status.RUNTIME_ERROR;
import static co.featureflags.server.Status.UNKNOWN_CLOSE_CODE;
import static co.featureflags.server.Status.UNKNOWN_ERROR;
+import static co.featureflags.server.Status.WEBSOCKET_ERROR;
final class Streaming implements UpdateProcessor {
@@ -176,43 +177,40 @@ private OkHttpClient buildWebOkHttpClient() {
return builder.build();
}
- private Callable processDateAsync(final DataModel.Data data) {
- return () -> {
- boolean opOK = false;
- String eventType = data.getEventType();
- Long version = data.getTimestamp();
- Map> updatedData = data.toStorageType();
- if (FULL_OPS.equalsIgnoreCase(eventType)) {
- boolean fullOK = updator.init(updatedData, version);
- opOK = fullOK;
- } else if (PATCH_OPS.equalsIgnoreCase(eventType)) {
- // streaming patch is a real time update
- // patch data contains only one item in just one category.
- // no data update is considered as a good operation
- boolean patchOK = true;
- for (Map.Entry> entry : updatedData.entrySet()) {
- DataStoreTypes.Category category = entry.getKey();
- for (Map.Entry keyItem : entry.getValue().entrySet()) {
- patchOK = updator.upsert(category, keyItem.getKey(), keyItem.getValue(), version);
- }
+ private Boolean processDateAsync(final DataModel.Data data) {
+ boolean opOK = false;
+ String eventType = data.getEventType();
+ Long version = data.getTimestamp();
+ Map> updatedData = data.toStorageType();
+ if (FULL_OPS.equalsIgnoreCase(eventType)) {
+ boolean fullOK = updator.init(updatedData, version);
+ opOK = fullOK;
+ } else if (PATCH_OPS.equalsIgnoreCase(eventType)) {
+ // streaming patch is a real time update
+ // patch data contains only one item in just one category.
+ // no data update is considered as a good operation
+ boolean patchOK = true;
+ for (Map.Entry> entry : updatedData.entrySet()) {
+ DataStoreTypes.Category category = entry.getKey();
+ for (Map.Entry keyItem : entry.getValue().entrySet()) {
+ patchOK = updator.upsert(category, keyItem.getKey(), keyItem.getValue(), version);
}
- opOK = patchOK;
}
- if (opOK) {
- if (initialized.compareAndSet(false, true)) {
- initFuture.complete(true);
- }
- logger.info("processing data is well done");
- updator.updateStatus(Status.StateType.OK, null);
- } else {
- // reconnect to server to get back data after data storage failed
- // the reason is gathered by DataUpdator
- // close code 1001 means peer going away
- webSocket.close(GOING_AWAY_CLOSE, JUST_RECONN_REASON_REGISTERED);
+ opOK = patchOK;
+ }
+ if (opOK) {
+ if (initialized.compareAndSet(false, true)) {
+ initFuture.complete(true);
}
- permits.release();
- return opOK;
- };
+ logger.info("processing data is well done");
+ updator.updateStatus(Status.StateType.OK, null);
+ } else {
+ // reconnect to server to get back data after data storage failed
+ // the reason is gathered by DataUpdator
+ // close code 1001 means peer going away
+ webSocket.close(GOING_AWAY_CLOSE, JUST_RECONN_REASON_REGISTERED);
+ }
+ return opOK;
}
private final class DefaultWebSocketListener extends StreamingWebSocketListener {
@@ -225,7 +223,9 @@ public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
if (all.isProcessData()) {
try {
permits.acquire();
- storageUpdateExecutor.submit(processDateAsync(all.data()));
+ CompletableFuture
+ .supplyAsync(() -> processDateAsync(all.data()), storageUpdateExecutor)
+ .whenComplete((res, exception) -> permits.release());
} catch (InterruptedException ignore) {
}
}
@@ -304,7 +304,10 @@ public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t,
}
}
}
- if (errorType == null) {
+ if(!isReconn && t instanceof IOException){
+ errorType = WEBSOCKET_ERROR;
+ }
+ else if (errorType == null) {
errorType = UNKNOWN_ERROR;
}
}
diff --git a/src/main/java/co/featureflags/server/exterior/DataStoreTypes.java b/src/main/java/co/featureflags/server/exterior/DataStoreTypes.java
index 262bd82..5b930d4 100644
--- a/src/main/java/co/featureflags/server/exterior/DataStoreTypes.java
+++ b/src/main/java/co/featureflags/server/exterior/DataStoreTypes.java
@@ -27,6 +27,10 @@ public abstract class DataStoreTypes {
"/api/public/sdk/latest-feature-flags",
"/streaming");
+ public final static Category SEGMENTS = new Category("segments",
+ "/api/public/sdk/latest-feature-flags",
+ "/streaming");
+
/**
* An enumeration of all supported {@link Category} types.
*
diff --git a/src/test/java/co/featureflags/server/Demos.java b/src/test/java/co/featureflags/server/Demos.java
index 63616ad..520adce 100644
--- a/src/test/java/co/featureflags/server/Demos.java
+++ b/src/test/java/co/featureflags/server/Demos.java
@@ -56,7 +56,7 @@ public static void main(String[] args) throws IOException {
}
try {
String[] words = line.split("/");
- user = new FFCUser.Builder(words[0]).build();
+ user = new FFCUser.Builder(words[0]).userName(words[0]).build();
Instant start = Instant.now();
FlagState res = client.variationDetail(words[1], user, "Not Found");
@@ -109,7 +109,7 @@ public static void main(String[] args) throws InterruptedException, IOException
}
try {
String[] words = line.split("/");
- user = new FFCUser.Builder(words[0]).build();
+ user = new FFCUser.Builder(words[0]).userName(words[0]).build();
Instant start = Instant.now();
FlagState res = client.variationDetail(words[1], user, "Not Found");
Instant end = Instant.now();
@@ -147,7 +147,7 @@ public static void main(String[] args) throws IOException {
}
try {
String[] words = line.split("/");
- user = new FFCUser.Builder(words[0]).build();
+ user = new FFCUser.Builder(words[0]).userName(words[0]).build();
VariationParams params = VariationParams.of(words[1], user);
String jsonBody = params.jsonfy();
System.out.println(jsonBody);
@@ -182,7 +182,7 @@ public static void main(String[] args) throws IOException {
break;
}
try {
- user = new FFCUser.Builder(userkey).build();
+ user = new FFCUser.Builder(userkey).userName(userkey).build();
VariationParams params = VariationParams.of(null, user);
String jsonBody = params.jsonfy();
System.out.println(jsonBody);