Skip to content

Commit

Permalink
streaming ping/pong
Browse files Browse the repository at this point in the history
  • Loading branch information
dsun0720 committed Apr 3, 2022
1 parent 47e0d4b commit 9bf9a65
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 52 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ install the sdk in using maven
<dependency>
<groupId>co.featureflags</groupId>
<artifactId>ffc-java-server-sdk</artifactId>
<version>1.0</version>
<version>1.1</version>
</dependency>
</dependencies>
```
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cd4325508fba265bcca897b2ecf02cd9
14b3f42fc5a5bc7011ccb7c88bee94d2
Original file line number Diff line number Diff line change
@@ -1 +1 @@
56499b7743a6330c030983eaccb1f5731756f321
41f38cf2808ef2a762dcd09d4026196ffce62802
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
737916117a1e0c2be85e03d7bdcce9c9
acf599d3b1810459d28ec446027a5818
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3ca9f22d3b6261e495e32678aa2c28604b2696ee
66965db3239416abb77b556e67fd01fa60544d97
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5a818d4220f18200fc7847c8b915144d
d0ead4f334f2397cf6cb4d6d813224ab
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4a4451d87f384b7ee837dece4b7e988916d95ac3
bcac1d0ded04c49e4cf58d90ebe46f3f56cd59db
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@
<version>1.0</version>
<version>1.1</version>
</versions>
<lastUpdated>20220403073452</lastUpdated>
<lastUpdated>20220403195457</lastUpdated>
</versioning>
</metadata>
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4e6a6be06d9a7109a68092cbdf6b3ddc
b6971a0f19dcb7757044eb8c792e3fa0
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2c2e31468977831121b027badda56bc07ca9218f
0e6dbc484637f3ba6e79b10a81e4f9e1af37ba99
52 changes: 27 additions & 25 deletions src/main/java/co/featureflags/server/DataModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,42 +91,53 @@ public Integer getType() {
}
}

static class DataSyncMessage {
final String messageType = "data-sync";
static class StreamingMessage {
static final String DATA_SYNC = "data-sync";
static final String PING = "ping";
static final String PONG = "pong";

protected final String messageType;

StreamingMessage(String messageType) {
this.messageType = messageType;
}

public String getMessageType() {
return messageType;
}
}

static class DataSyncMessage extends StreamingMessage {
final InternalData data;

DataSyncMessage(Long timestamp) {
this.data = new InternalData(timestamp);
super(timestamp == null ? PING : DATA_SYNC);
this.data = timestamp == null ? null : new InternalData(timestamp);
}

private static class InternalData {
static class InternalData {
Long timestamp;

private InternalData(Long timestamp) {
InternalData(Long timestamp) {
this.timestamp = timestamp;
}
}
}

static class All {
private final String messageType;
static class All extends StreamingMessage {
private final Data data;

All(String messageType, Data data) {
this.messageType = messageType;
super(messageType);
this.data = data;
}

public Data data() {
return data;
}

public String getMessageType() {
return messageType;
}

boolean isProcessData() {
return "data-sync".equalsIgnoreCase(messageType) && data != null && ("full".equalsIgnoreCase(data.eventType) || "patch".equalsIgnoreCase(data.eventType));
return DATA_SYNC.equalsIgnoreCase(messageType) && data != null && ("full".equalsIgnoreCase(data.eventType) || "patch".equalsIgnoreCase(data.eventType));
}
}

Expand All @@ -142,9 +153,7 @@ static class Data implements JsonHelper.AfterJsonParseDeserializable {
private final List<TimestampUserTag> userTags;
private Long timestamp;

Data(String eventType, List<FeatureFlag> featureFlags,
List<Segment> segments,
List<TimestampUserTag> userTags) {
Data(String eventType, List<FeatureFlag> featureFlags, List<Segment> segments, List<TimestampUserTag> userTags) {
this.eventType = eventType;
this.featureFlags = featureFlags;
this.segments = segments;
Expand Down Expand Up @@ -196,9 +205,7 @@ Map<DataStoreTypes.Category, Map<String, DataStoreTypes.Item>> toStorageType() {
TimestampData data = userTag.isArchived ? userTag.toArchivedTimestampData() : userTag;
userTags.put(data.getId(), new DataStoreTypes.Item(data));
}
return ImmutableMap.of(DataStoreTypes.FEATURES, flags.build(),
DataStoreTypes.SEGMENTS, segments.build(),
DataStoreTypes.USERTAGS, userTags.build());
return ImmutableMap.of(DataStoreTypes.FEATURES, flags.build(), DataStoreTypes.SEGMENTS, segments.build(), DataStoreTypes.USERTAGS, userTags.build());
}
}

Expand All @@ -210,12 +217,7 @@ static class TimestampUserTag extends UserTag implements TimestampData {

private final Long timestamp;

TimestampUserTag(String id,
Boolean isArchived,
Long timestamp,
String requestProperty,
String source,
String userProperty) {
TimestampUserTag(String id, Boolean isArchived, Long timestamp, String requestProperty, String source, String userProperty) {
super(requestProperty, source, userProperty);
this.id = id;
this.isArchived = isArchived;
Expand Down
51 changes: 34 additions & 17 deletions src/main/java/co/featureflags/server/Streaming.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static co.featureflags.server.DataModel.StreamingMessage.DATA_SYNC;
import static co.featureflags.server.Status.DATA_INVALID_ERROR;
import static co.featureflags.server.Status.NETWORK_ERROR;
import static co.featureflags.server.Status.REQUEST_INVALID_ERROR;
Expand All @@ -56,7 +58,7 @@ final class Streaming implements UpdateProcessor {
private static final Integer GOING_AWAY_CLOSE = 1001;
private static final String JUST_RECONN_REASON_REGISTERED = "reconn";
private static final int MAX_QUEUE_SIZE = 20;
private static final Duration PING_INTERVAL = Duration.ofSeconds(20);
private static final Duration PING_INTERVAL = Duration.ofSeconds(10);
private static final Duration AWAIT_TERMINATION = Duration.ofSeconds(2);
private static final String DEFAULT_STREAMING_PATH = "/streaming";
private static final String AUTH_PARAMS = "?token=%s&type=server&version=2";
Expand All @@ -71,6 +73,7 @@ final class Streaming implements UpdateProcessor {
private final CompletableFuture<Boolean> initFuture = new CompletableFuture<>();
private final StreamingWebSocketListener listener = new DefaultWebSocketListener();
private final ThreadPoolExecutor storageUpdateExecutor;
private final ScheduledThreadPoolExecutor pingScheduledExecutor;
private final Status.DataUpdator updator;
private final BasicConfig basicConfig;
private final HttpConfig httpConfig;
Expand Down Expand Up @@ -100,8 +103,10 @@ final class Streaming implements UpdateProcessor {
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
Utils.createThreadFactory("data-sync-worker-%d", true),
Utils.createThreadFactory("streaming-data-sync-worker-%d", true),
new ThreadPoolExecutor.CallerRunsPolicy());
this.pingScheduledExecutor = new ScheduledThreadPoolExecutor(1,
Utils.createThreadFactory("streaming-periodic-ping-worker-%d", true));
}

@Override
Expand All @@ -111,6 +116,7 @@ public Future<Boolean> start() {
connCount.set(0);
isWSConnected.set(false);
connect();
pingScheduledExecutor.scheduleAtFixedRate(this::ping, 0L, PING_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
return initFuture;
}

Expand All @@ -127,9 +133,18 @@ public void close() {
}
}

private void ping() {
if (webSocket != null) {
// logger.debug("ping");
String json = JsonHelper.serialize(new DataModel.DataSyncMessage(null));
webSocket.send(json);
}
}

private void clearExecutor() {
Loggers.UPDATE_PROCESSOR.debug("streaming processor clean up thread and conn pool");
Utils.shutDownThreadPool("Streaming", storageUpdateExecutor, AWAIT_TERMINATION);
Utils.shutDownThreadPool("streaming-data-sync-worker", storageUpdateExecutor, AWAIT_TERMINATION);
Utils.shutDownThreadPool("streaming-periodic-ping-worker", pingScheduledExecutor, AWAIT_TERMINATION);
Utils.shutdownOKHttpClient("Streaming", okHttpClient);
}

Expand Down Expand Up @@ -172,7 +187,7 @@ private void reconnect(boolean forceToUseMaxRetryDelay) {
@NotNull
private OkHttpClient buildWebOkHttpClient() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(httpConfig.connectTime()).pingInterval(PING_INTERVAL).retryOnConnectionFailure(false);
builder.connectTimeout(httpConfig.connectTime()).pingInterval(Duration.ZERO).retryOnConnectionFailure(false);
Utils.buildProxyAndSocketFactoryFor(builder, httpConfig);
return builder.build();
}
Expand Down Expand Up @@ -218,16 +233,19 @@ private final class DefaultWebSocketListener extends StreamingWebSocketListener
// if received data is invalid
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
logger.info("Streaming WebSocket is processing data");
logger.debug(text);
DataModel.All all = JsonHelper.deserialize(text, DataModel.All.class);
if (all.isProcessData()) {
try {
permits.acquire();
CompletableFuture
.supplyAsync(() -> processDateAsync(all.data()), storageUpdateExecutor)
.whenComplete((res, exception) -> permits.release());
} catch (InterruptedException ignore) {
// logger.debug(text);
DataModel.StreamingMessage message = JsonHelper.deserialize(text, DataModel.StreamingMessage.class);
if (DATA_SYNC.equalsIgnoreCase(message.getMessageType())) {
logger.info("Streaming WebSocket is processing data");
DataModel.All all = JsonHelper.deserialize(text, DataModel.All.class);
if (all.isProcessData()) {
try {
permits.acquire();
CompletableFuture
.supplyAsync(() -> processDateAsync(all.data()), storageUpdateExecutor)
.whenComplete((res, exception) -> permits.release());
} catch (InterruptedException ignore) {
}
}
}
}
Expand Down Expand Up @@ -305,10 +323,9 @@ public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t,
}
}
}
if(!isReconn && t instanceof IOException){
if (!isReconn && t instanceof IOException) {
errorType = WEBSOCKET_ERROR;
}
else if (errorType == null) {
} else if (errorType == null) {
errorType = UNKNOWN_ERROR;
}
}
Expand Down

0 comments on commit 9bf9a65

Please sign in to comment.