diff --git a/build.gradle b/build.gradle index 3fdc6704a..f81289a15 100644 --- a/build.gradle +++ b/build.gradle @@ -75,7 +75,7 @@ ext.versions = [ "launchdarklyJavaSdkCommon": "1.3.0", "launchdarklyLogging": "1.1.0", "okhttp": "4.9.3", // specify this for the SDK build instead of relying on the transitive dependency from okhttp-eventsource - "okhttpEventsource": "4.0.0", + "okhttpEventsource": "4.1.0", "slf4j": "1.7.21", "snakeyaml": "1.32", "jedis": "2.9.0" diff --git a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java index bd78bd6c5..6ba5304ed 100644 --- a/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java +++ b/src/main/java/com/launchdarkly/sdk/server/StreamProcessor.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.JsonParseException; +import com.google.gson.JsonSyntaxException; import com.google.gson.stream.JsonReader; import com.launchdarkly.eventsource.ConnectStrategy; import com.launchdarkly.eventsource.ErrorStrategy; @@ -11,6 +12,7 @@ import com.launchdarkly.eventsource.MessageEvent; import com.launchdarkly.eventsource.StreamClosedByCallerException; import com.launchdarkly.eventsource.StreamClosedByServerException; +import com.launchdarkly.eventsource.StreamClosedWithIncompleteMessageException; import com.launchdarkly.eventsource.StreamEvent; import com.launchdarkly.eventsource.StreamException; import com.launchdarkly.eventsource.StreamHttpErrorException; @@ -276,6 +278,15 @@ private void handleMessage(MessageEvent event, CompletableFuture initFutur lastStoreUpdateFailed = false; dataSourceUpdates.updateStatus(State.VALID, null); } catch (StreamInputException e) { + if (exceptionHasCause(e, StreamClosedWithIncompleteMessageException.class)) { + // JSON parsing failed because the event was cut off prematurely-- because the + // stream got closed. In this case we should simply throw the event away; the + // closing of the stream will be handled separately on our next pass through + // the loop, and is logged separately. There's no point in logging an error + // about invalid JSON when the real problem is a broken connection; invalid + // JSON is significant only if we think we have a complete message. + return; + } logger.error("LaunchDarkly service request failed or received invalid data: {}", LogValues.exceptionSummary(e)); logger.debug(LogValues.exceptionTrace(e)); @@ -304,6 +315,13 @@ private void handleMessage(MessageEvent event, CompletableFuture initFutur } } + private static boolean exceptionHasCause(Throwable e, Class c) { + if (c.isAssignableFrom(e.getClass())) { + return true; + } + return e.getCause() != null && exceptionHasCause(e.getCause(), c); + } + private void handlePut(Reader eventData, CompletableFuture initFuture) throws StreamInputException, StreamStoreException { recordStreamInit(false); diff --git a/src/test/java/com/launchdarkly/sdk/server/EvalResultTest.java b/src/test/java/com/launchdarkly/sdk/server/EvalResultTest.java index cc7808f45..71ef5dec8 100644 --- a/src/test/java/com/launchdarkly/sdk/server/EvalResultTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/EvalResultTest.java @@ -1,13 +1,13 @@ package com.launchdarkly.sdk.server; -import java.util.function.Function; - import com.launchdarkly.sdk.EvaluationDetail; import com.launchdarkly.sdk.EvaluationReason; import com.launchdarkly.sdk.LDValue; import org.junit.Test; +import java.util.function.Function; + import static com.launchdarkly.sdk.EvaluationDetail.NO_VARIATION; import static com.launchdarkly.sdk.EvaluationReason.ErrorKind.WRONG_TYPE; import static org.hamcrest.MatcherAssert.assertThat; diff --git a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java index 945dae0e2..6b0f60b2a 100644 --- a/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java +++ b/src/test/java/com/launchdarkly/sdk/server/StreamProcessorTest.java @@ -32,6 +32,7 @@ import java.net.URI; import java.time.Duration; +import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; @@ -721,6 +722,26 @@ public void closingStreamProcessorDoesNotLogNetworkError() throws Exception { } } } + + @Test + public void streamFailingWithIncompleteEventDoesNotLogJsonError() throws Exception { + String incompleteEvent = "event: put\ndata: {\"flags\":"; + Handler stream1 = Handlers.all( + Handlers.SSE.start(), + Handlers.writeChunkString(incompleteEvent) + ); + Handler stream2 = streamResponse(EMPTY_DATA_EVENT); + Handler stream1Then2 = Handlers.sequential(stream1, stream2); + + try (HttpServer server = HttpServer.start(stream1Then2)) { + try (StreamProcessor sp = createStreamProcessor(null, server.getUri())) { + sp.start(); + dataSourceUpdates.awaitInit(); + + assertThat(logCapture.awaitMessage(LDLogLevel.ERROR, 0), nullValue()); + } + } + } private void testUnrecoverableHttpError(int statusCode) throws Exception { Handler errorResp = Handlers.status(statusCode);