From 654ea58406cc7d44f0af0c9cc18c3697c0d53e15 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 9 Nov 2016 10:57:19 -0800 Subject: [PATCH 1/5] Automatically reconnect to the stream if a heartbeat is not received in time --- build.gradle | 2 +- .../launchdarkly/client/StreamProcessor.java | 45 ++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 4b78d6980..37bf67a31 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ dependencies { compile "com.google.guava:guava:19.0" compile "joda-time:joda-time:2.9.3" compile "org.slf4j:slf4j-api:1.7.21" - compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "0.2.3", changing: true + compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "1.0.0-SNAPSHOT", changing: true compile "redis.clients:jedis:2.8.1" testCompile "org.easymock:easymock:3.4" testCompile 'junit:junit:4.12' diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index bea59696a..690bdc5e4 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -4,13 +4,18 @@ import com.launchdarkly.eventsource.EventHandler; import com.launchdarkly.eventsource.EventSource; import com.launchdarkly.eventsource.MessageEvent; +import com.launchdarkly.eventsource.ReadyState; import okhttp3.Headers; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; class StreamProcessor implements UpdateProcessor { @@ -20,12 +25,15 @@ class StreamProcessor implements UpdateProcessor { private static final String INDIRECT_PUT = "indirect/put"; private static final String INDIRECT_PATCH = "indirect/patch"; private static final Logger logger = LoggerFactory.getLogger(StreamProcessor.class); + private static final int DEAD_CONNECTION_INTERVAL_SECONDS = 300; private final FeatureStore store; private final LDConfig config; private final String sdkKey; private final FeatureRequestor requestor; - private EventSource es; + private final ScheduledExecutorService heartbeatDetectorService; + private DateTime lastHeartbeat; + private volatile EventSource es; private AtomicBoolean initialized = new AtomicBoolean(false); @@ -34,6 +42,8 @@ class StreamProcessor implements UpdateProcessor { this.config = config; this.sdkKey = sdkKey; this.requestor = requestor; + this.heartbeatDetectorService = Executors.newScheduledThreadPool(1); + heartbeatDetectorService.scheduleAtFixedRate(new HeartbeatDetector(), 1, 1, TimeUnit.MINUTES); } @Override @@ -50,7 +60,7 @@ public Future start() { @Override public void onOpen() throws Exception { - + lastHeartbeat = DateTime.now(); } @Override @@ -100,6 +110,12 @@ public void onMessage(String name, MessageEvent event) throws Exception { } } + @Override + public void onComment(String comment) { + logger.debug("Received a heartbeat"); + lastHeartbeat = DateTime.now(); + } + @Override public void onError(Throwable throwable) { logger.error("Encountered EventSource error: " + throwable.getMessage()); @@ -125,6 +141,14 @@ public void close() throws IOException { if (store != null) { store.close(); } + if (heartbeatDetectorService != null) { + heartbeatDetectorService.shutdownNow(); + try { + heartbeatDetectorService.awaitTermination(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.error("Encountered an exception terminating heartbeat detector: " + e.getMessage()); + } + } } @Override @@ -171,4 +195,21 @@ int version() { } } + + private final class HeartbeatDetector implements Runnable { + + @Override + public void run() { + DateTime fiveMinutesAgo = DateTime.now().minusSeconds(DEAD_CONNECTION_INTERVAL_SECONDS); + if (lastHeartbeat.isBefore(fiveMinutesAgo) && es.getState() == ReadyState.OPEN) { + try { + logger.info("Stream stopped receiving heartbeats- reconnecting."); + es.close(); + start(); + } catch (IOException e) { + logger.warn("Encountered exception closing stream connection: " + e.getMessage()); + } + } + } + } } From 3a35a773189e685d7455ec832cc7b61cf66bbbf5 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 9 Nov 2016 11:32:38 -0800 Subject: [PATCH 2/5] Bump to a non-snapshot release. Name the heartbeat thread. --- build.gradle | 2 +- .../launchdarkly/client/StreamProcessor.java | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/build.gradle b/build.gradle index 37bf67a31..57d04406b 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ dependencies { compile "com.google.guava:guava:19.0" compile "joda-time:joda-time:2.9.3" compile "org.slf4j:slf4j-api:1.7.21" - compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "1.0.0-SNAPSHOT", changing: true + compile group: "com.launchdarkly", name: "okhttp-eventsource", version: "1.0.0", changing: true compile "redis.clients:jedis:2.8.1" testCompile "org.easymock:easymock:3.4" testCompile 'junit:junit:4.12' diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 690bdc5e4..4aa797c77 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -1,5 +1,6 @@ package com.launchdarkly.client; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import com.launchdarkly.eventsource.EventHandler; import com.launchdarkly.eventsource.EventSource; @@ -12,10 +13,7 @@ import java.io.IOException; import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; class StreamProcessor implements UpdateProcessor { @@ -32,7 +30,7 @@ class StreamProcessor implements UpdateProcessor { private final String sdkKey; private final FeatureRequestor requestor; private final ScheduledExecutorService heartbeatDetectorService; - private DateTime lastHeartbeat; + private volatile DateTime lastHeartbeat; private volatile EventSource es; private AtomicBoolean initialized = new AtomicBoolean(false); @@ -42,7 +40,10 @@ class StreamProcessor implements UpdateProcessor { this.config = config; this.sdkKey = sdkKey; this.requestor = requestor; - this.heartbeatDetectorService = Executors.newScheduledThreadPool(1); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("LaunchDarkly-HeartbeatDetector-%d") + .build(); + this.heartbeatDetectorService = Executors.newSingleThreadScheduledExecutor(threadFactory); heartbeatDetectorService.scheduleAtFixedRate(new HeartbeatDetector(), 1, 1, TimeUnit.MINUTES); } @@ -200,8 +201,10 @@ private final class HeartbeatDetector implements Runnable { @Override public void run() { - DateTime fiveMinutesAgo = DateTime.now().minusSeconds(DEAD_CONNECTION_INTERVAL_SECONDS); - if (lastHeartbeat.isBefore(fiveMinutesAgo) && es.getState() == ReadyState.OPEN) { + DateTime reconnectThresholdTime = DateTime.now().minusSeconds(DEAD_CONNECTION_INTERVAL_SECONDS); + // We only want to force the reconnect if the ES connection is open. If not, it's already trying to + // connect anyway, or this processor was shut down + if (lastHeartbeat.isBefore(reconnectThresholdTime) && es.getState() == ReadyState.OPEN) { try { logger.info("Stream stopped receiving heartbeats- reconnecting."); es.close(); From c195b64e1d5618361d051757b65ee5ebff9dfc14 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 9 Nov 2016 12:16:54 -0800 Subject: [PATCH 3/5] Start the new ES connection in a finally block --- src/main/java/com/launchdarkly/client/StreamProcessor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 4aa797c77..2efbf26a6 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -211,6 +211,12 @@ public void run() { start(); } catch (IOException e) { logger.warn("Encountered exception closing stream connection: " + e.getMessage()); + } finally { + if (es.getState() == ReadyState.SHUTDOWN) { + start(); + } else { + logger.warn("Expected ES to be in state SHUTDOWN, but it's currently in state " + es.getState().toString()); + } } } } From 78084f6161bc90967059f6ccdf96c531f39c7ed4 Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 9 Nov 2016 14:17:55 -0800 Subject: [PATCH 4/5] Log at error level, set the last heartbeat whenever a message is received --- CHANGELOG.md | 4 ++++ build.gradle | 2 +- src/main/java/com/launchdarkly/client/StreamProcessor.java | 7 +++---- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eb4ca16f..4e7a493b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to the LaunchDarkly Java SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [2.0.5] - 2016-11-09 +### Changed +- The StreamProcessor now listens for heartbeats from the streaming API, and will automatically reconnect if heartbeats are not received. + ## [2.0.4] - 2016-10-12 ### Changed - Updated GSON dependency version to 2.7 diff --git a/build.gradle b/build.gradle index 57d04406b..6dde6d1e5 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "2.0.4" + version = "2.0.5" sourceCompatibility = 1.7 targetCompatibility = 1.7 } diff --git a/src/main/java/com/launchdarkly/client/StreamProcessor.java b/src/main/java/com/launchdarkly/client/StreamProcessor.java index 2efbf26a6..20c7d58be 100644 --- a/src/main/java/com/launchdarkly/client/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/client/StreamProcessor.java @@ -61,11 +61,11 @@ public Future start() { @Override public void onOpen() throws Exception { - lastHeartbeat = DateTime.now(); } @Override public void onMessage(String name, MessageEvent event) throws Exception { + lastHeartbeat = DateTime.now(); Gson gson = new Gson(); switch (name) { case PUT: @@ -208,14 +208,13 @@ public void run() { try { logger.info("Stream stopped receiving heartbeats- reconnecting."); es.close(); - start(); } catch (IOException e) { - logger.warn("Encountered exception closing stream connection: " + e.getMessage()); + logger.error("Encountered exception closing stream connection: " + e.getMessage()); } finally { if (es.getState() == ReadyState.SHUTDOWN) { start(); } else { - logger.warn("Expected ES to be in state SHUTDOWN, but it's currently in state " + es.getState().toString()); + logger.error("Expected ES to be in state SHUTDOWN, but it's currently in state " + es.getState().toString()); } } } From 3b8098caaa3bed1f618d5b567af7d5be1fc8a62c Mon Sep 17 00:00:00 2001 From: John Kodumal Date: Wed, 9 Nov 2016 14:59:02 -0800 Subject: [PATCH 5/5] Bump version to snapshot --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 6dde6d1e5..c0521cead 100644 --- a/build.gradle +++ b/build.gradle @@ -19,7 +19,7 @@ repositories { allprojects { group = 'com.launchdarkly' - version = "2.0.5" + version = "2.0.5-SNAPSHOT" sourceCompatibility = 1.7 targetCompatibility = 1.7 }