diff --git a/build.gradle b/build.gradle index acac74318..3fdc6704a 100644 --- a/build.gradle +++ b/build.gradle @@ -75,7 +75,7 @@ ext.versions = [ "launchdarklyJavaSdkCommon": "1.3.0", "launchdarklyLogging": "1.1.0", "okhttp": "4.9.3", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource - "okhttpEventsource": "2.6.2", + "okhttpEventsource": "4.0.0", "slf4j": "1.7.21", "snakeyaml": "1.32", "jedis": "2.9.0" diff --git a/contract-tests/service/build.gradle b/contract-tests/service/build.gradle index 634428f64..5793f10be 100644 --- a/contract-tests/service/build.gradle +++ b/contract-tests/service/build.gradle @@ -6,9 +6,15 @@ plugins { repositories { mavenCentral() + mavenLocal() maven { url "https://oss.sonatype.org/content/groups/public/" } } +configurations.all { + // check for updates every build for dependencies with: 'changing: true' + resolutionStrategy.cacheChangingModulesFor 0, 'seconds' +} + allprojects { sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java index a5b731784..030859965 100644 --- a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java @@ -3,12 +3,18 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.JsonParseException; import com.google.gson.stream.JsonReader; -import com.launchdarkly.eventsource.ConnectionErrorHandler; -import com.launchdarkly.eventsource.ConnectionErrorHandler.Action; -import com.launchdarkly.eventsource.EventHandler; +import com.launchdarkly.eventsource.ConnectStrategy; +import com.launchdarkly.eventsource.ErrorStrategy; import com.launchdarkly.eventsource.EventSource; +import com.launchdarkly.eventsource.FaultEvent; +import com.launchdarkly.eventsource.HttpConnectStrategy; import com.launchdarkly.eventsource.MessageEvent; -import com.launchdarkly.eventsource.UnsuccessfulResponseException; +import com.launchdarkly.eventsource.StreamClosedByCallerException; +import com.launchdarkly.eventsource.StreamClosedByServerException; +import com.launchdarkly.eventsource.StreamEvent; +import com.launchdarkly.eventsource.StreamException; +import com.launchdarkly.eventsource.StreamHttpErrorException; +import com.launchdarkly.eventsource.StreamIOException; import com.launchdarkly.logging.LDLogger; import com.launchdarkly.logging.LogValues; import com.launchdarkly.sdk.server.StreamProcessorEvents.DeleteData; @@ -31,6 +37,7 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -41,7 +48,6 @@ import static com.launchdarkly.sdk.server.Util.httpErrorDescription; import okhttp3.Headers; -import okhttp3.OkHttpClient; /** * Implementation of the streaming data source, not including the lower-level SSE implementation which is in @@ -89,8 +95,6 @@ final class StreamProcessor implements DataSource { private volatile boolean lastStoreUpdateFailed = false; private final LDLogger logger; - ConnectionErrorHandler connectionErrorHandler = createDefaultConnectionErrorHandler(); // exposed for testing - StreamProcessor( HttpConfiguration httpConfig, DataSourceUpdates dataSourceUpdates, @@ -129,52 +133,16 @@ private void onStoreStatusChanged(DataStoreStatusProvider.Status newStatus) { EventSource stream = es; if (stream != null) { logger.warn("Restarting stream to refresh data after data store outage"); - stream.restart(); + stream.interrupt(); } } } } - private ConnectionErrorHandler createDefaultConnectionErrorHandler() { - return (Throwable t) -> { - recordStreamInit(true); - - if (t instanceof UnsuccessfulResponseException) { - int status = ((UnsuccessfulResponseException)t).getCode(); - ErrorInfo errorInfo = ErrorInfo.fromHttpError(status); - - boolean recoverable = checkIfErrorIsRecoverableAndLog(logger, httpErrorDescription(status), - ERROR_CONTEXT_MESSAGE, status, WILL_RETRY_MESSAGE); - if (recoverable) { - dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); - esStarted = System.currentTimeMillis(); - return Action.PROCEED; - } else { - dataSourceUpdates.updateStatus(State.OFF, errorInfo); - return Action.SHUTDOWN; - } - } - - checkIfErrorIsRecoverableAndLog(logger, t.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE); - ErrorInfo errorInfo = ErrorInfo.fromException(t instanceof IOException ? ErrorKind.NETWORK_ERROR : ErrorKind.UNKNOWN, t); - dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); - return Action.PROCEED; - }; - } - @Override public Future start() { final CompletableFuture initFuture = new CompletableFuture<>(); - ConnectionErrorHandler wrappedConnectionErrorHandler = (Throwable t) -> { - Action result = connectionErrorHandler.onConnectionError(t); - if (result == Action.SHUTDOWN) { - initFuture.complete(null); // if client is initializing, make it stop waiting; has no effect if already inited - } - return result; - }; - - EventHandler handler = new StreamEventHandler(initFuture); URI endpointUri = concatenateUriPath(streamUri, StandardEndpoints.STREAMING_REQUEST_PATH); // Notes about the configuration of the EventSource below: @@ -188,26 +156,45 @@ public Future start() { // smaller one there because we don't expect long delays within any *non*-streaming response that the // LD client gets. A read timeout on the stream will result in the connection being cycled, so we set // this to be slightly more than the expected interval between heartbeat signals. - - EventSource.Builder builder = new EventSource.Builder(handler, endpointUri) - .threadPriority(threadPriority) - .logger(new EventSourceLoggerAdapter()) + + HttpConnectStrategy eventSourceHttpConfig = ConnectStrategy.http(endpointUri) + .headers(headers) + .clientBuilderActions(clientBuilder -> { + configureHttpClientBuilder(httpConfig, clientBuilder); + }) + // Set readTimeout last, to ensure that this hard-coded value overrides any other read + // timeout that might have been set by httpProperties (see comment about readTimeout above). + .readTimeout(DEAD_CONNECTION_INTERVAL.toMillis(), TimeUnit.MILLISECONDS); + EventSource.Builder builder = new EventSource.Builder(eventSourceHttpConfig) + .errorStrategy(ErrorStrategy.alwaysContinue()) + // alwaysContinue means we want EventSource to give us a FaultEvent rather + // than throwing an exception if the stream fails + .logger(logger) .readBufferSize(5000) .streamEventData(true) - .expectFields("event") - .clientBuilderActions(new EventSource.Builder.ClientConfigurer() { - public void configure(OkHttpClient.Builder builder) { - configureHttpClientBuilder(httpConfig, builder); - } - }) - .connectionErrorHandler(wrappedConnectionErrorHandler) - .headers(headers) - .reconnectTime(initialReconnectDelay) - .readTimeout(DEAD_CONNECTION_INTERVAL); - + .expectFields("event") + .retryDelay(initialReconnectDelay.toMillis(), TimeUnit.MILLISECONDS); es = builder.build(); - esStarted = System.currentTimeMillis(); - es.start(); + + Thread thread = new Thread(() -> { + esStarted = System.currentTimeMillis(); + + // We are deliberately not calling es.start() here, but just iterating over es.anyEvents(). + // EventSource will start the stream connection either way, but if we called start(), it + // would swallow any FaultEvents that happened during the initial conection attempt; we + // want to know about those. + for (StreamEvent event: es.anyEvents()) { + if (!handleEvent(event, initFuture)) { + // handleEvent returns false if we should fall through and end the thread + break; + } + } + }); + thread.setName("LaunchDarkly-streaming"); + thread.setDaemon(true); + thread.setPriority(threadPriority); + thread.start(); + return initFuture; } @@ -234,118 +221,138 @@ public boolean isInitialized() { return initialized.get(); } - private class StreamEventHandler implements EventHandler { - private final CompletableFuture initFuture; - - StreamEventHandler(CompletableFuture initFuture) { - this.initFuture = initFuture; - } - - @Override - public void onOpen() throws Exception { - } - - @Override - public void onClosed() throws Exception { + // Handles a single StreamEvent and returns true if we should keep the stream alive, + // or false if we should shut down permanently. + private boolean handleEvent(StreamEvent event, CompletableFuture initFuture) { + logger.debug("Received StreamEvent: {}", event); + if (event instanceof MessageEvent) { + handleMessage((MessageEvent)event, initFuture); + } else if (event instanceof FaultEvent) { + return handleError(((FaultEvent)event).getCause(), initFuture); } - - @Override - public void onMessage(String eventName, MessageEvent event) throws Exception { - try { - switch (eventName) { - case PUT: - handlePut(event.getDataReader()); - break; - - case PATCH: - handlePatch(event.getDataReader()); - break; - - case DELETE: - handleDelete(event.getDataReader()); - break; - - default: - logger.warn("Unexpected event found in stream: {}", eventName); - break; - } - lastStoreUpdateFailed = false; - dataSourceUpdates.updateStatus(State.VALID, null); - } catch (StreamInputException e) { - logger.error("LaunchDarkly service request failed or received invalid data: {}", - LogValues.exceptionSummary(e)); - logger.debug(LogValues.exceptionTrace(e)); - - ErrorInfo errorInfo = new ErrorInfo( - e.getCause() instanceof IOException ? ErrorKind.NETWORK_ERROR : ErrorKind.INVALID_DATA, - 0, - e.getCause() == null ? e.getMessage() : e.getCause().toString(), - Instant.now() - ); - dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); + return true; + } + + private void handleMessage(MessageEvent event, CompletableFuture initFuture) { + try { + switch (event.getEventName()) { + case PUT: + handlePut(event.getDataReader(), initFuture); + break; + + case PATCH: + handlePatch(event.getDataReader()); + break; + + case DELETE: + handleDelete(event.getDataReader()); + break; + + default: + logger.warn("Unexpected event found in stream: {}", event.getEventName()); + break; + } + lastStoreUpdateFailed = false; + dataSourceUpdates.updateStatus(State.VALID, null); + } catch (StreamInputException e) { + logger.error("LaunchDarkly service request failed or received invalid data: {}", + LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); - es.restart(); - } catch (StreamStoreException e) { - // See item 2 in error handling comments at top of class - if (statusListener == null) { - if (!lastStoreUpdateFailed) { - logger.warn("Restarting stream to ensure that we have the latest data"); - } - es.restart(); + ErrorInfo errorInfo = new ErrorInfo( + e.getCause() instanceof IOException ? ErrorKind.NETWORK_ERROR : ErrorKind.INVALID_DATA, + 0, + e.getCause() == null ? e.getMessage() : e.getCause().toString(), + Instant.now() + ); + dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); + + es.interrupt(); + } catch (StreamStoreException e) { + // See item 2 in error handling comments at top of class + if (statusListener == null) { + if (!lastStoreUpdateFailed) { + logger.warn("Restarting stream to ensure that we have the latest data"); } - lastStoreUpdateFailed = true; - } catch (Exception e) { - logger.warn("Unexpected error from stream processor: {}", LogValues.exceptionSummary(e)); - logger.debug(LogValues.exceptionTrace(e)); + es.interrupt(); } + lastStoreUpdateFailed = true; + } catch (Exception e) { + logger.warn("Unexpected error from stream processor: {}", LogValues.exceptionSummary(e)); + logger.debug(LogValues.exceptionTrace(e)); } + } - private void handlePut(Reader eventData) throws StreamInputException, StreamStoreException { - recordStreamInit(false); - esStarted = 0; - PutData putData = parseStreamJson(StreamProcessorEvents::parsePutData, eventData); - if (!dataSourceUpdates.init(putData.data)) { - throw new StreamStoreException(); - } - if (!initialized.getAndSet(true)) { - initFuture.complete(null); - logger.info("Initialized LaunchDarkly client."); - } + private void handlePut(Reader eventData, CompletableFuture initFuture) + throws StreamInputException, StreamStoreException { + recordStreamInit(false); + esStarted = 0; + PutData putData = parseStreamJson(StreamProcessorEvents::parsePutData, eventData); + if (!dataSourceUpdates.init(putData.data)) { + throw new StreamStoreException(); + } + if (!initialized.getAndSet(true)) { + initFuture.complete(null); + logger.info("Initialized LaunchDarkly client."); } + } - private void handlePatch(Reader eventData) throws StreamInputException, StreamStoreException { - PatchData data = parseStreamJson(StreamProcessorEvents::parsePatchData, eventData); - if (data.kind == null) { - return; - } - if (!dataSourceUpdates.upsert(data.kind, data.key, data.item)) { - throw new StreamStoreException(); - } + private void handlePatch(Reader eventData) throws StreamInputException, StreamStoreException { + PatchData data = parseStreamJson(StreamProcessorEvents::parsePatchData, eventData); + if (data.kind == null) { + return; + } + if (!dataSourceUpdates.upsert(data.kind, data.key, data.item)) { + throw new StreamStoreException(); } + } - private void handleDelete(Reader eventData) throws StreamInputException, StreamStoreException { - DeleteData data = parseStreamJson(StreamProcessorEvents::parseDeleteData, eventData); - if (data.kind == null) { - return; - } - ItemDescriptor placeholder = new ItemDescriptor(data.version, null); - if (!dataSourceUpdates.upsert(data.kind, data.key, placeholder)) { - throw new StreamStoreException(); - } + private void handleDelete(Reader eventData) throws StreamInputException, StreamStoreException { + DeleteData data = parseStreamJson(StreamProcessorEvents::parseDeleteData, eventData); + if (data.kind == null) { + return; + } + ItemDescriptor placeholder = new ItemDescriptor(data.version, null); + if (!dataSourceUpdates.upsert(data.kind, data.key, placeholder)) { + throw new StreamStoreException(); } + } - @Override - public void onComment(String comment) { - logger.debug("Received a heartbeat"); + private boolean handleError(StreamException e, CompletableFuture initFuture) { + boolean streamFailed = true; + if (e instanceof StreamClosedByCallerException) { + // This indicates that we ourselves deliberately restarted the stream, so we don't + // treat that as a failure in our analytics. + streamFailed = false; + } else { + logger.warn("Encountered EventSource error: {}", LogValues.exceptionSummary(e)); } + recordStreamInit(streamFailed); + + if (e instanceof StreamHttpErrorException) { + int status = ((StreamHttpErrorException)e).getCode(); + ErrorInfo errorInfo = ErrorInfo.fromHttpError(status); - @Override - public void onError(Throwable throwable) { - logger.warn("Encountered EventSource error: {}", LogValues.exceptionSummary(throwable)); - logger.debug(LogValues.exceptionTrace(throwable)); - } - } + boolean recoverable = checkIfErrorIsRecoverableAndLog(logger, httpErrorDescription(status), + ERROR_CONTEXT_MESSAGE, status, WILL_RETRY_MESSAGE); + if (recoverable) { + dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); + esStarted = System.currentTimeMillis(); + return true; // allow reconnect + } else { + dataSourceUpdates.updateStatus(State.OFF, errorInfo); + initFuture.complete(null); // if client is initializing, make it stop waiting; has no effect if already inited + return false; // don't reconnect + } + } + boolean isNetworkError = e instanceof StreamIOException || e instanceof StreamClosedByServerException; + checkIfErrorIsRecoverableAndLog(logger, e.toString(), ERROR_CONTEXT_MESSAGE, 0, WILL_RETRY_MESSAGE); + ErrorInfo errorInfo = ErrorInfo.fromException(isNetworkError ? ErrorKind.NETWORK_ERROR : ErrorKind.UNKNOWN, e); + dataSourceUpdates.updateStatus(State.INTERRUPTED, errorInfo); + return true; // allow reconnect + } + private static T parseStreamJson(Function parser, Reader r) throws StreamInputException { try { try (JsonReader jr = new JsonReader(r)) { @@ -372,31 +379,4 @@ public StreamInputException(Throwable cause) { // This exception class indicates that the data store failed to persist an update. @SuppressWarnings("serial") private static final class StreamStoreException extends Exception {} - - private final class EventSourceLoggerAdapter implements com.launchdarkly.eventsource.Logger { - @Override - public void debug(String format, Object param) { - logger.debug(format, param); - } - - @Override - public void debug(String format, Object param1, Object param2) { - logger.debug(format, param1, param2); - } - - @Override - public void info(String message) { - logger.info(message); - } - - @Override - public void warn(String message) { - logger.warn(message); - } - - @Override - public void error(String message) { - logger.error(message); - } - } } diff --git a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java index a8b4cd356..1078b653c 100644 --- a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java @@ -1,6 +1,5 @@ package com.launchdarkly.sdk.server; -import com.launchdarkly.eventsource.ConnectionErrorHandler; import com.launchdarkly.eventsource.MessageEvent; import com.launchdarkly.sdk.server.DataModel.FeatureFlag; import com.launchdarkly.sdk.server.DataModel.Segment; @@ -19,6 +18,7 @@ import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.interfaces.DataStoreTypes.ItemDescriptor; import com.launchdarkly.sdk.server.interfaces.HttpConfiguration; +import com.launchdarkly.testhelpers.ConcurrentHelpers; import com.launchdarkly.testhelpers.httptest.Handler; import com.launchdarkly.testhelpers.httptest.Handlers; import com.launchdarkly.testhelpers.httptest.HttpServer; @@ -28,7 +28,6 @@ import org.junit.Before; import org.junit.Test; -import java.io.EOFException; import java.net.URI; import java.time.Duration; import java.util.Map; @@ -57,7 +56,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -93,6 +91,18 @@ private static Handler closableStreamResponse(String data, Semaphore closeSignal ); } + private static Handler streamThatSendsEventsAndThenStaysOpen(String... events) { + return Handlers.all( + Handlers.SSE.start(), + ctx -> { + for (String event: events) { + Handlers.SSE.event(event).apply(ctx); + } + Handlers.SSE.leaveOpen().apply(ctx); + } + ); + } + private static Handler streamResponseFromQueue(BlockingQueue events) { return Handlers.all( Handlers.SSE.start(), @@ -147,9 +157,11 @@ public void builderHasDefaultConfiguration() throws Exception { public void builderCanSpecifyConfiguration() throws Exception { URI streamUri = URI.create("http://fake"); DataSourceFactory f = Components.streamingDataSource() - .baseURI(streamUri) .initialReconnectDelay(Duration.ofMillis(5555)); - try (StreamProcessor sp = (StreamProcessor)f.createDataSource(clientContext(SDK_KEY, LDConfig.DEFAULT), + LDConfig config = new LDConfig.Builder() + .serviceEndpoints(Components.serviceEndpoints().streaming(streamUri)) + .build(); + try (StreamProcessor sp = (StreamProcessor)f.createDataSource(clientContext(SDK_KEY, config), dataSourceUpdates(dataStore))) { assertThat(sp.initialReconnectDelay, equalTo(Duration.ofMillis(5555))); assertThat(sp.streamUri, equalTo(streamUri)); @@ -609,16 +621,21 @@ private void verifyEventCausesStreamRestart(String eventName, String eventData, BlockingQueue events = new LinkedBlockingQueue<>(); events.add(EMPTY_DATA_EVENT); - try (HttpServer server = HttpServer.start(streamResponseFromQueue(events))) { + Handler responses = Handlers.sequential( + streamResponseFromQueue(events), // use a queue for the first request so we can control it below + streamThatSendsEventsAndThenStaysOpen(EMPTY_DATA_EVENT) // second request just gets a "put" + ); + try (HttpServer server = HttpServer.start(responses)) { try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { sp.start(); dataSourceUpdates.awaitInit(); server.getRecorder().requireRequest(); + // first connection succeeds and gets the "put" requireDataSourceStatus(statuses, State.VALID); - + + // now, cause a problematic event to appear events.add(makeEvent(eventName, eventData)); - events.add(EMPTY_DATA_EVENT); server.getRecorder().requireRequest(); dataSourceUpdates.awaitInit(); @@ -635,44 +652,39 @@ private void verifyEventCausesStreamRestart(String eventName, String eventData, @Test public void testSpecialHttpConfigurations() throws Exception { Handler handler = streamResponse(EMPTY_DATA_EVENT); - + + BlockingQueue statuses = new LinkedBlockingQueue<>(); + dataSourceUpdates.register(statuses::add); + TestHttpUtil.testWithSpecialHttpConfigurations(handler, (targetUri, goodHttpConfig) -> { LDConfig config = new LDConfig.Builder().http(goodHttpConfig).build(); - ConnectionErrorSink errorSink = new ConnectionErrorSink(); + + statuses.clear(); try (StreamProcessor sp = createStreamProcessor(config, targetUri)) { - sp.connectionErrorHandler = errorSink; - startAndWait(sp); - assertNull(errorSink.errors.peek()); + sp.start(); + + Status status = ConcurrentHelpers.awaitValue(statuses, 1, TimeUnit.SECONDS); + assertEquals(State.VALID, status.getState()); } }, (targetUri, badHttpConfig) -> { LDConfig config = new LDConfig.Builder().http(badHttpConfig).build(); - ConnectionErrorSink errorSink = new ConnectionErrorSink(); + + statuses.clear(); try (StreamProcessor sp = createStreamProcessor(config, targetUri)) { - sp.connectionErrorHandler = errorSink; - startAndWait(sp); + sp.start(); - Throwable error = errorSink.errors.peek(); - assertNotNull(error); + Status status = ConcurrentHelpers.awaitValue(statuses, 1, TimeUnit.SECONDS); + assertNotNull(status.getLastError()); + assertEquals(ErrorKind.NETWORK_ERROR, status.getLastError().getKind()); } } ); } - static class ConnectionErrorSink implements ConnectionErrorHandler { - final BlockingQueue errors = new LinkedBlockingQueue<>(); - - public Action onConnectionError(Throwable t) { - if (!(t instanceof EOFException)) { - errors.add(t); - } - return Action.SHUTDOWN; - } - } - private void testUnrecoverableHttpError(int statusCode) throws Exception { Handler errorResp = Handlers.status(statusCode);