Skip to content

Commit

Permalink
Change SSE to use the OkHttp public API only (#8141)
Browse files Browse the repository at this point in the history
* Change SSE to use the OkHttp public API only

Previously we prevented end-users from using their own
implementations of Call.Factory because we casted down
to RealCall in RealEventSource.

With this change we're implementing SSE without depending
on any OkHttp implementation details.

This also introduces a new function in EventSources to
create an EventSource.Factory from a Call.Factory, and
hides the previous implementation that required a concrete
OkHttpClient.

Finally this fixes SSE to publish the same EventListener
events as regular HTTP calls.

* apiDump
  • Loading branch information
squarejesse authored Dec 17, 2023
1 parent 6ba49ad commit f131fae
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 17 deletions.
3 changes: 2 additions & 1 deletion okhttp-sse/api/okhttp-sse.api
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ public abstract class okhttp3/sse/EventSourceListener {

public final class okhttp3/sse/EventSources {
public static final field INSTANCE Lokhttp3/sse/EventSources;
public static final fun createFactory (Lokhttp3/OkHttpClient;)Lokhttp3/sse/EventSource$Factory;
public static final fun createFactory (Lokhttp3/Call$Factory;)Lokhttp3/sse/EventSource$Factory;
public static final synthetic fun createFactory (Lokhttp3/OkHttpClient;)Lokhttp3/sse/EventSource$Factory;
public static final fun processResponse (Lokhttp3/Response;Lokhttp3/sse/EventSourceListener;)V
}

12 changes: 10 additions & 2 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/EventSources.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,21 @@
*/
package okhttp3.sse

import okhttp3.Call
import okhttp3.OkHttpClient
import okhttp3.Response
import okhttp3.sse.internal.RealEventSource

object EventSources {
@Deprecated(
message = "required for binary-compatibility!",
level = DeprecationLevel.HIDDEN,
)
@JvmStatic
fun createFactory(client: OkHttpClient): EventSource.Factory {
fun createFactory(client: OkHttpClient) = createFactory(client as Call.Factory)

@JvmStatic
fun createFactory(callFactory: Call.Factory): EventSource.Factory {
return EventSource.Factory { request, listener ->
val actualRequest =
if (request.header("Accept") == null) {
Expand All @@ -31,7 +39,7 @@ object EventSources {
}

RealEventSource(actualRequest, listener).apply {
connect(client)
connect(callFactory)
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@ package okhttp3.sse.internal
import java.io.IOException
import okhttp3.Call
import okhttp3.Callback
import okhttp3.EventListener
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.ResponseBody
import okhttp3.internal.connection.RealCall
import okhttp3.internal.stripBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
Expand All @@ -32,16 +29,13 @@ internal class RealEventSource(
private val request: Request,
private val listener: EventSourceListener
) : EventSource, ServerSentEventReader.Callback, Callback {
private var call: RealCall? = null
private var call: Call? = null
@Volatile private var canceled = false

fun connect(client: OkHttpClient) {
val client = client.newBuilder()
.eventListener(EventListener.NONE)
.build()
val realCall = client.newCall(request) as RealCall
call = realCall
realCall.enqueue(this)
fun connect(callFactory: Call.Factory) {
call = callFactory.newCall(request).apply {
enqueue(this@RealEventSource)
}
}

override fun onResponse(call: Call, response: Response) {
Expand All @@ -64,7 +58,7 @@ internal class RealEventSource(
}

// This is a long-lived response. Cancel full-call timeouts.
call?.timeoutEarlyExit()
call?.timeout()?.cancel()

// Replace the body with a stripped one so the callbacks can't see real data.
val response = response.stripBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import mockwebserver3.junit5.internal.MockWebServerExtension;
import okhttp3.OkHttpClient;
import okhttp3.OkHttpClientTestRule;
import okhttp3.RecordingEventListener;
import okhttp3.Request;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;
Expand All @@ -34,7 +35,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junitpioneer.jupiter.RetryingTest;

import static org.assertj.core.api.Assertions.assertThat;

@Tag("Slowish")
Expand All @@ -45,8 +45,12 @@ public final class EventSourceHttpTest {
private MockWebServer server;
@RegisterExtension public final OkHttpClientTestRule clientTestRule = new OkHttpClientTestRule();

private final RecordingEventListener eventListener = new RecordingEventListener();

private final EventSourceRecorder listener = new EventSourceRecorder();
private OkHttpClient client = clientTestRule.newClient();
private OkHttpClient client = clientTestRule.newClientBuilder()
.eventListenerFactory(clientTestRule.wrap(eventListener))
.build();

@BeforeEach public void before(MockWebServer server) {
this.server = server;
Expand Down Expand Up @@ -177,6 +181,41 @@ public void cancelInEventShortCircuits() throws IOException {
assertThat(server.takeRequest().getHeaders().get("Accept")).isEqualTo("text/event-stream");
}

@Test public void eventListenerEvents() {
server.enqueue(new MockResponse.Builder()
.body(""
+ "data: hey\n"
+ "\n").setHeader("content-type", "text/event-stream")
.build());

EventSource source = newEventSource();

assertThat(source.request().url().encodedPath()).isEqualTo("/");

listener.assertOpen();
listener.assertEvent(null, null, "hey");
listener.assertClose();

assertThat(eventListener.recordedEventTypes()).containsExactly(
"CallStart",
"ProxySelectStart",
"ProxySelectEnd",
"DnsStart",
"DnsEnd",
"ConnectStart",
"ConnectEnd",
"ConnectionAcquired",
"RequestHeadersStart",
"RequestHeadersEnd",
"ResponseHeadersStart",
"ResponseHeadersEnd",
"ResponseBodyStart",
"ResponseBodyEnd",
"ConnectionReleased",
"CallEnd"
);
}

private EventSource newEventSource() {
return newEventSource(null);
}
Expand Down

0 comments on commit f131fae

Please sign in to comment.